"""Train and evaluate binary (attack vs. normal) decision trees on KDD Cup 99.

Meant to run inside an interactive pyspark session: it assumes a SparkContext
named ``sc`` already exists (it is not created here).  The syntax used is valid
on both Python 2 and Python 3.
"""
import urllib
from time import time

from numpy import array
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

try:  # Python 3
    from urllib.request import urlretrieve
except ImportError:  # Python 2 exposes urlretrieve on the urllib package itself
    urlretrieve = urllib.urlretrieve

# --- Download and load the raw data -----------------------------------------
data_file = "./kddcup.data.gz"
urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", data_file)
raw_data = sc.textFile(data_file)
print("Train data size is {}".format(raw_data.count()))

test_data_file = "./corrected.gz"
urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", test_data_file)
test_raw_data = sc.textFile(test_data_file)
print("Test data size is {}".format(test_raw_data.count()))

csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))

# Vocabularies for the categorical columns (1: protocol, 2: service, 3: flag),
# learned from the training data only.
protocols = csv_data.map(lambda x: x[1]).distinct().collect()
services = csv_data.map(lambda x: x[2]).distinct().collect()
flags = csv_data.map(lambda x: x[3]).distinct().collect()


def _index_lookup(categories):
    """Return a dict mapping each category to its position in *categories*."""
    return {value: i for i, value in enumerate(categories)}


# Precomputed once so each row costs an O(1) dict lookup instead of a list scan.
protocol_index = _index_lookup(protocols)
service_index = _index_lookup(services)
flag_index = _index_lookup(flags)


def _encode(value, lookup):
    """Return the numeric code of *value*, or ``len(lookup)`` if it was unseen.

    NOTE(review): the fallback code equals the category count declared in
    ``categoricalFeaturesInfo``, i.e. it is outside the model's declared
    category range; it can only occur for test rows whose value never appeared
    in the training data.
    """
    return lookup.get(value, len(lookup))


def _binary_label(line_split):
    """Return 0.0 when column 41 is 'normal.' and 1.0 (attack) otherwise."""
    return 0.0 if line_split[41] == 'normal.' else 1.0


def create_labeled_point(line_split):
    """Convert one split CSV row into a LabeledPoint over columns 0-40.

    Columns 1, 2 and 3 (protocol, service, flag) are replaced by their numeric
    category codes; every column must then be parseable by ``float``.
    """
    clean_line_split = line_split[0:41]  # column 41 is the label, not a feature
    clean_line_split[1] = _encode(clean_line_split[1], protocol_index)
    clean_line_split[2] = _encode(clean_line_split[2], service_index)
    clean_line_split[3] = _encode(clean_line_split[3], flag_index)
    return LabeledPoint(_binary_label(line_split),
                        array([float(x) for x in clean_line_split]))


training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)

# --- Full-feature model -------------------------------------------------------
t0 = time()
# maxBins must be at least the largest categorical arity (here: len(services)).
tree_model = DecisionTree.trainClassifier(
    training_data, numClasses=2,
    categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
    impurity='gini', maxDepth=4, maxBins=100)
tt = time() - t0
print("Classifier trained in {} seconds".format(round(tt, 3)))

predictions = tree_model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

t0 = time()
# Pairs are (label, prediction); indexing works on Python 2 and 3 alike.
test_accuracy = (labels_and_preds.filter(lambda vp: vp[0] == vp[1]).count()
                 / float(test_data.count()))
tt = time() - t0
print("Prediction made in {} seconds. Test accuracy is {}".format(
    round(tt, 3), round(test_accuracy, 4)))

print("Learned classification tree model:")
print(tree_model.toDebugString())

# NOTE(review): indices 0 and 52 are hard-coded to interpret splits in the tree
# printed above; they depend on the order returned by collect() — confirm they
# still match the current run.
print("Service 0 is {}".format(services[0]))
print("Service 52 is {}".format(services[52]))


def create_labeled_point_minimal(line_split):
    """Convert one split CSV row into a LabeledPoint over columns 3, 5 and 22.

    Column 3 (flag) becomes feature 0 as a category code; columns 5 and 22
    (presumably dst_bytes and count in the KDD'99 schema — confirm) are used
    as numeric features 1 and 2.
    """
    clean_line_split = line_split[3:4] + line_split[5:6] + line_split[22:23]
    clean_line_split[0] = _encode(clean_line_split[0], flag_index)
    return LabeledPoint(_binary_label(line_split),
                        array([float(x) for x in clean_line_split]))


training_data_minimal = csv_data.map(create_labeled_point_minimal)
test_data_minimal = test_csv_data.map(create_labeled_point_minimal)

# --- Minimal-feature model ----------------------------------------------------
t0 = time()
tree_model_minimal = DecisionTree.trainClassifier(
    training_data_minimal, numClasses=2,
    categoricalFeaturesInfo={0: len(flags)},
    impurity='gini', maxDepth=3, maxBins=32)
tt = time() - t0
print("Classifier trained in {} seconds".format(round(tt, 3)))

predictions_minimal = tree_model_minimal.predict(
    test_data_minimal.map(lambda p: p.features))
labels_and_preds_minimal = test_data_minimal.map(lambda p: p.label).zip(predictions_minimal)

t0 = time()
test_accuracy = (labels_and_preds_minimal.filter(lambda vp: vp[0] == vp[1]).count()
                 / float(test_data_minimal.count()))
tt = time() - t0
print("Prediction made in {} seconds. Test accuracy is {}".format(
    round(tt, 3), round(test_accuracy, 4)))