# Download the KDD Cup 1999 "10 percent" dataset and load it as an RDD
import urllib

f = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

import numpy as np

def parse_interaction(line):
    line_split = line.split(",")
    # keep just numeric and logical values (drop the symbolic columns and the label)
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return np.array([float(x) for x in clean_line_split])

vector_data = raw_data.map(parse_interaction)

from pyspark.mllib.stat import Statistics
from math import sqrt

# Compute column summary statistics over the whole dataset.
summary = Statistics.colStats(vector_data)

print "Duration Statistics:"
print " Mean: {}".format(round(summary.mean()[0],3))
print " St. deviation: {}".format(round(sqrt(summary.variance()[0]),3))
print " Max value: {}".format(round(summary.max()[0],3))
print " Min value: {}".format(round(summary.min()[0],3))
print " Total value count: {}".format(summary.count())
print " Number of non-zero values: {}".format(summary.numNonzeros()[0])

# Same parsing, but keep the label as a key so statistics can be computed per attack type
def parse_interaction_with_key(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return (line_split[41], np.array([float(x) for x in clean_line_split]))

label_vector_data = raw_data.map(parse_interaction_with_key)

# Summary statistics for normal interactions only
normal_label_data = label_vector_data.filter(lambda x: x[0]=="normal.")
normal_summary = Statistics.colStats(normal_label_data.values())

print "Duration Statistics for label: {}".format("normal")
print " Mean: {}".format(round(normal_summary.mean()[0],3))
print " St. deviation: {}".format(round(sqrt(normal_summary.variance()[0]),3))
print " Max value: {}".format(round(normal_summary.max()[0],3))
print " Min value: {}".format(round(normal_summary.min()[0],3))
print " Total value count: {}".format(normal_summary.count())
print " Number of non-zero values: {}".format(normal_summary.numNonzeros()[0])

# Wrap the per-label computation in a reusable function
def summary_by_label(raw_data, label):
    label_vector_data = raw_data.map(parse_interaction_with_key).filter(lambda x: x[0]==label)
    return Statistics.colStats(label_vector_data.values())

normal_sum = summary_by_label(raw_data, "normal.")

print "Duration Statistics for label: {}".format("normal")
print " Mean: {}".format(round(normal_sum.mean()[0],3))
print " St. deviation: {}".format(round(sqrt(normal_sum.variance()[0]),3))
print " Max value: {}".format(round(normal_sum.max()[0],3))
print " Min value: {}".format(round(normal_sum.min()[0],3))
print " Total value count: {}".format(normal_sum.count())
print " Number of non-zero values: {}".format(normal_sum.numNonzeros()[0])

guess_passwd_summary = summary_by_label(raw_data, "guess_passwd.")

print "Duration Statistics for label: {}".format("guess_password")
print " Mean: {}".format(round(guess_passwd_summary.mean()[0],3))
print " St. deviation: {}".format(round(sqrt(guess_passwd_summary.variance()[0]),3))
print " Max value: {}".format(round(guess_passwd_summary.max()[0],3))
print " Min value: {}".format(round(guess_passwd_summary.min()[0],3))
print " Total value count: {}".format(guess_passwd_summary.count())
print " Number of non-zero values: {}".format(guess_passwd_summary.numNonzeros()[0])
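# The same block of print statements is repeated for each label above. As a sketch
# only (not part of the original code), a small helper on top of summary_by_label
# could avoid that duplication; print_duration_stats is a hypothetical name and
# assumes the same Python 2 / PySpark environment as the rest of this code.
def print_duration_stats(raw_data, label):
    stats = summary_by_label(raw_data, label)
    print "Duration Statistics for label: {}".format(label)
    print " Mean: {}".format(round(stats.mean()[0],3))
    print " St. deviation: {}".format(round(sqrt(stats.variance()[0]),3))
    print " Max value: {}".format(round(stats.max()[0],3))
    print " Min value: {}".format(round(stats.min()[0],3))
    print " Total value count: {}".format(stats.count())
    print " Number of non-zero values: {}".format(stats.numNonzeros()[0])

# Example usage, e.g. for the smurf attack label:
print_duration_stats(raw_data, "smurf.")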
deviation: {}".format(round(sqrt(guess_passwd_summary.variance()[0]),3)) print " Max value: {}".format(round(guess_passwd_summary.max()[0],3)) print " Min value: {}".format(round(guess_passwd_summary.min()[0],3)) print " Total value count: {}".format(guess_passwd_summary.count()) print " Number of non-zero values: {}".format(guess_passwd_summary.numNonzeros()[0]) label_list = ["back.","buffer_overflow.","ftp_write.","guess_passwd.", "imap.","ipsweep.","land.","loadmodule.","multihop.", "neptune.","nmap.","normal.","perl.","phf.","pod.","portsweep.", "rootkit.","satan.","smurf.","spy.","teardrop.","warezclient.", "warezmaster."] stats_by_label = [(label, summary_by_label(raw_data, label)) for label in label_list] duration_by_label = [ (stat[0], np.array([float(stat[1].mean()[0]), float(sqrt(stat[1].variance()[0])), float(stat[1].min()[0]), float(stat[1].max()[0]), int(stat[1].count())])) for stat in stats_by_label] import pandas as pd pd.set_option('display.max_columns', 50) stats_by_label_df = pd.DataFrame.from_items(duration_by_label, columns=["Mean", "Std Dev", "Min", "Max", "Count"], orient='index') print "Duration statistics, by label" stats_by_label_df def get_variable_stats_df(stats_by_label, column_i): column_stats_by_label = [ (stat[0], np.array([float(stat[1].mean()[column_i]), float(sqrt(stat[1].variance()[column_i])), float(stat[1].min()[column_i]), float(stat[1].max()[column_i]), int(stat[1].count())])) for stat in stats_by_label ] return pd.DataFrame.from_items(column_stats_by_label, columns=["Mean", "Std Dev", "Min", "Max", "Count"], orient='index') get_variable_stats_df(stats_by_label,0) print "src_bytes statistics, by label" get_variable_stats_df(stats_by_label,1) from pyspark.mllib.stat import Statistics correlation_matrix = Statistics.corr(vector_data, method="spearman") import pandas as pd pd.set_option('display.max_columns', 50) col_names = ["duration","src_bytes","dst_bytes","land","wrong_fragment", "urgent","hot","num_failed_logins","logged_in","num_compromised", "root_shell","su_attempted","num_root","num_file_creations", "num_shells","num_access_files","num_outbound_cmds", "is_hot_login","is_guest_login","count","srv_count","serror_rate", "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate", "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count", "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate", "dst_host_rerror_rate","dst_host_srv_rerror_rate"] corr_df = pd.DataFrame(correlation_matrix, index=col_names, columns=col_names) corr_df # get a boolean dataframe where true means that a pair of variables is highly correlated highly_correlated_df = (abs(corr_df) > .8) & (corr_df < 1.0) # get the names of the variables so we can use them to slice the dataframe correlated_vars_index = (highly_correlated_df==True).any() correlated_var_names = correlated_vars_index[correlated_vars_index==True].index # slice it highly_correlated_df.loc[correlated_var_names,correlated_var_names]