"""Per-tag duration statistics over the KDD Cup '99 10% dataset using Spark RDDs.

Downloads the gzipped CSV, loads it via the ambient SparkContext `sc`
(assumed to exist, e.g. in a pyspark shell/notebook), and computes:
  * total duration per network-interaction tag (reduceByKey)
  * record count per tag (countByKey)
  * mean duration per tag (combineByKey sum/count, then divide)

Ported to Python 3: urllib.request.urlretrieve, print(), and no
tuple-parameter lambdas (removed by PEP 3113).
"""
import urllib.request

# Fetch the dataset; urlretrieve returns (local_path, headers).
data_file, _headers = urllib.request.urlretrieve(
    "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz",
    "kddcup.data_10_percent.gz",
)

# Spark can read .gz text files directly; each record is one CSV line.
raw_data = sc.textFile(data_file)
csv_data = raw_data.map(lambda line: line.split(","))

# Field 41 is the network-interaction tag (e.g. "normal.", "smurf."),
# field 0 is the connection duration in seconds.
key_value_data = csv_data.map(lambda fields: (fields[41], fields))
key_value_data.take(1)

key_value_duration = csv_data.map(lambda fields: (fields[41], float(fields[0])))

# Total duration per tag.
durations_by_key = key_value_duration.reduceByKey(lambda a, b: a + b)
durations_by_key.collect()

# Number of records per tag (action: returns a dict on the driver).
counts_by_key = key_value_data.countByKey()
counts_by_key

# (sum, count) per tag via combineByKey:
#   createCombiner: seed accumulator from the first value of a key,
#   mergeValue:     fold one more value into a partition-local accumulator,
#   mergeCombiners: merge accumulators across partitions.
sum_counts = key_value_duration.combineByKey(
    (lambda value: (value, 1)),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])),
)
sum_counts.collectAsMap()

# Mean duration per tag, rounded to 3 decimals. Tuple-unpacking lambdas
# are gone in Python 3, so index into the (sum, count) pair explicitly.
duration_means_by_type = sum_counts.map(
    lambda kv: (kv[0], round(kv[1][0] / kv[1][1], 3))
).collectAsMap()

# Print tags sorted by descending mean duration.
for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True):
    print(tag, duration_means_by_type[tag])