import urllib.request

# download the 10% subset of the KDD Cup 1999 dataset
url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
urllib.request.urlretrieve(url, "kddcup.data_10_percent.gz")

# load the data into an RDD; sc is the SparkContext the pyspark shell provides
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# parse each line into a list of fields
csv_data = raw_data.map(lambda x: x.split(","))

# separate into 'normal' and 'attack' RDDs using the label in the last field
normal_csv_data = csv_data.filter(lambda x: x[41] == "normal.")
attack_csv_data = csv_data.filter(lambda x: x[41] != "normal.")

# extract the duration (the first field) as an integer
normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))

# total durations via reduce
total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)

print("Total duration for 'normal' interactions is {}".format(
    total_normal_duration))
print("Total duration for 'attack' interactions is {}".format(
    total_attack_duration))

# mean durations, using a separate count() action for each RDD
normal_count = normal_duration_data.count()
attack_count = attack_duration_data.count()

print("Mean duration for 'normal' interactions is {}".format(
    round(total_normal_duration / float(normal_count), 3)))
print("Mean duration for 'attack' interactions is {}".format(
    round(total_attack_duration / float(attack_count), 3)))

# the same means in a single pass with aggregate(): the accumulator is a
# (sum, count) pair, the first lambda (seqOp) folds one value into it, and
# the second (combOp) merges the per-partition accumulators
normal_sum_count = normal_duration_data.aggregate(
    (0, 0),  # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),  # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # combine accumulators
)

print("Mean duration for 'normal' interactions is {}".format(
    round(normal_sum_count[0] / float(normal_sum_count[1]), 3)))

attack_sum_count = attack_duration_data.aggregate(
    (0, 0),  # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),  # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # combine accumulators
)

print("Mean duration for 'attack' interactions is {}".format(
    round(attack_sum_count[0] / float(attack_sum_count[1]), 3)))
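As a quick sanity check on the aggregate() results, PySpark's numeric RDDs also provide a stats() action, which returns a StatCounter carrying the count, sum, and mean from a single pass over the data. A minimal sketch, assuming the duration RDDs defined above:

# stats() computes count, sum, mean, stdev, min, and max in one action
normal_stats = normal_duration_data.stats()
attack_stats = attack_duration_data.stats()

print("Mean duration for 'normal' interactions is {}".format(
    round(normal_stats.mean(), 3)))
print("Mean duration for 'attack' interactions is {}".format(
    round(attack_stats.mean(), 3)))

The results should match the hand-rolled (sum, count) accumulator; aggregate() remains the more general tool when the summary you need is not already covered by StatCounter.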