In this second part of the blog post, we continue to demonstrate how to build a predictive model with Hadoop. This time we'll use Spark and ML-Lib.
Apache Spark is a relatively new entrant to the Hadoop ecosystem. Running on YARN, Apache Spark is an in-memory data processing API and execution engine that is effective for machine learning and data science use cases alongside other workloads.
In the context of our demo, we will show how to use Apache Spark via its PySpark API to generate our feature matrix and also use ML-Lib (Spark's machine learning library) to build and evaluate models.
Recall from part 1 that we are exploring a predictive model for flight delays. Our source dataset resides here: http://stat-computing.org/dataexpo/2009/the-data.html, and includes details about flights in the US from the years 1987-2008. We have also enriched the data with weather information from: http://www.ncdc.noaa.gov/cdo-web/datasets/, where we find daily temperatures (min/max), wind speed, snow conditions and precipitation.
We will build a supervised learning model to predict flight delays for flights leaving O'Hare International Airport (ORD). We will use the 2007 data to build the model, and test its validity using data from 2008.
So let's begin.
Apache Spark's basic data abstraction is that of an RDD (resilient distributed dataset), which is a fault-tolerant collection of elements that can be operated on in parallel across your Hadoop cluster.
Spark's API (available in Scala, Python or Java) supports a variety of operations such as map() and flatMap(), filter(), join(), and more. For a full description of the API please check the Spark API programming guide: http://spark.apache.org/docs/1.1.0/programming-guide.html
We will show how to perform the same pre-processing with Spark (using its Python API, PySpark) as we did with Pig previously. First, let's define a few Python functions for feature generation:
from datetime import date
holidays = [
    date(2007, 1, 1), date(2007, 1, 15), date(2007, 2, 19), date(2007, 5, 28), date(2007, 6, 7), date(2007, 7, 4),
    date(2007, 9, 3), date(2007, 10, 8), date(2007, 11, 11), date(2007, 11, 22), date(2007, 12, 25),
    date(2008, 1, 1), date(2008, 1, 21), date(2008, 2, 18), date(2008, 5, 22), date(2008, 5, 26), date(2008, 7, 4),
    date(2008, 9, 1), date(2008, 10, 13), date(2008, 11, 11), date(2008, 11, 27), date(2008, 12, 25)
]
# Extract the hour from an "hhmm" time string (e.g. "719" -> 7)
def get_hour(val):
    return int(val.zfill(4)[:2])
# Number of days between the given date and the closest holiday
def days_from_nearest_holiday(year, month, day):
    d = date(year, month, day)
    x = [(abs(d-h)).days for h in holidays]
    return min(x)
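To make the behavior of the two helpers concrete, here is a small standalone sketch. It re-declares the functions so it runs on its own, and it uses a shortened, hypothetical holiday list (`sample_holidays`) rather than the full list above.

```python
from datetime import date

# Hypothetical, shortened holiday list used only for this illustration
sample_holidays = [date(2007, 7, 4), date(2007, 12, 25)]

def get_hour(val):
    # Left-pad to four digits ("719" -> "0719"), then keep the hour part
    return int(val.zfill(4)[:2])

def days_from_nearest_holiday(year, month, day, holidays=sample_holidays):
    d = date(year, month, day)
    # Absolute distance in days to every holiday; keep the smallest
    return min(abs(d - h).days for h in holidays)

hour_early = get_hour("719")                        # departure at 07:19
hour_late = get_hour("1925")                        # departure at 19:25
gap_xmas = days_from_nearest_holiday(2007, 12, 24)  # one day before Dec 25
gap_july = days_from_nearest_holiday(2007, 7, 10)   # six days after Jul 4

print((hour_early, hour_late, gap_xmas, gap_july))
```

The `zfill(4)` call matters because departure times in the dataset are stored without leading zeros, so an early-morning time such as "719" would otherwise be read as hour 71.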
Now we use Spark to create the simple feature matrices from iteration #1.
The Python function gen_features(row) takes a row of input and generates a comma-separated string with all the features. preprocess_spark() performs the complete pre-processing task using Spark on a given file. In our case we need to do this for both the 2007 file (our training set) and the 2008 file (our validation/test set).
import pydoop.hdfs as hdfs
fields = ["Year", "Month", "Day", "DayOfWeek", "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "Carrier",
"FlightNum", "TailNum", "ActualElapsedTime", "CRSElapsedTime", "AirTime", "ArrDelay", "Delay",
"Origin", "Dest", "Distance", "TaxiIn", "TaxiOut", "Cancelled", "CancellationCode", "Diverted",
"CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay"]
def gen_features(row):
    f1 = [row[fields.index(x)] for x in ['Delay', 'Month', 'Day', 'DayOfWeek']]
    f2 = [get_hour(row[fields.index('DepTime')])]
    f3 = [row[fields.index('Distance')]]
    # f3 = [row[fields.index(x)] for x in ['Distance', 'Carrier', 'Dest']]
    year = int(row[fields.index('Year')])
    month = int(row[fields.index('Month')])
    day = int(row[fields.index('Day')])
    f4 = [days_from_nearest_holiday(year, month, day)]
    flist = f1 + f2 + f3 + f4
    return ','.join([str(x) for x in flist])
# function to do a preprocessing step for a given file
def preprocess_spark(infile):
    lines = sc.textFile(infile)
    header = ','.join(lines.take(1))
    data = lines.filter(lambda line: line != header) \
        .map(lambda line: line.split(",")) \
        .filter(lambda vals: vals[fields.index('Cancelled')] == "0") \
        .filter(lambda vals: vals[fields.index('Origin')] == 'ORD')
    results = data.map(gen_features)
    return results
data_2007 = preprocess_spark('airline/delay/2007.csv')
#outfile = 'airline/fm-spark/ord_2007_1'
#if hdfs.path.exists(outfile):
# hdfs.rmr(outfile)
#data_2007.saveAsTextFile(outfile)
#print data_2007.take(5)
data_2008 = preprocess_spark('airline/delay/2008.csv')
#outfile = 'airline/fm-spark/ord_2008_1'
#if hdfs.path.exists(outfile):
# hdfs.rmr(outfile)
#data_2008.saveAsTextFile(outfile)
['-8,1,25,4,10,719,10', '41,1,28,7,15,925,13', '45,1,29,1,20,316,14', '-9,1,17,3,18,719,2', '180,1,12,5,20,316,3']
Here we keep data_2007 and data_2008 as Spark RDDs. We could save them back to HDFS, but instead we continue to use them directly as RDDs.
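The filtering logic in preprocess_spark() can be tried out without a cluster. The following is a plain-Python sketch of the same three steps (drop the header, split on commas, keep non-cancelled ORD departures). The reduced column list `sample_fields`, the rows in `sample_lines`, and the function name `preprocess_local` are all hypothetical and exist only for this illustration; the real files have 29 columns.

```python
# Hypothetical, reduced column list (the real dataset has 29 columns)
sample_fields = ["Year", "Month", "Day", "Delay", "Origin", "Cancelled"]

# Hypothetical rows, made up for this example
sample_lines = [
    "Year,Month,Day,Delay,Origin,Cancelled",  # header row
    "2007,1,25,-8,ORD,0",
    "2007,1,25,30,SFO,0",                     # different origin: dropped
    "2007,1,26,0,ORD,1",                      # cancelled flight: dropped
    "2007,1,28,41,ORD,0",
]

def preprocess_local(lines, fields):
    header = lines[0]
    origin_inx = fields.index("Origin")
    cancelled_inx = fields.index("Cancelled")
    # Same order of operations as the RDD pipeline: filter, map, filter, filter
    rows = (line.split(",") for line in lines if line != header)
    return [r for r in rows
            if r[cancelled_inx] == "0" and r[origin_inx] == "ORD"]

kept = preprocess_local(sample_lines, sample_fields)
print(kept)
```

In the Spark version each of these steps is lazy: nothing is read from HDFS until an action such as take() or saveAsTextFile() is called on the resulting RDD.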
Now that we have the training and validation datasets ready, let's see how to build a predictive model with Spark's ML-Lib machine learning library.
MLlib is Spark’s scalable machine learning library, which includes various learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, and others.
If you compare ML-Lib to Scikit-learn, at the moment ML-Lib lacks a few important algorithms like Random Forest, Gradient Boosted Trees, and others. Nevertheless, it's a strong library that runs natively on Spark, with a strong community behind it.
Let's try a few algorithms from ML-Lib on our dataset. First we parse our feature matrices into RDDs of LabeledPoint instances for both the training and test datasets. We also define eval_metrics(), a helper function to compute precision, recall, F1 measure and accuracy.
from pyspark.mllib.regression import LabeledPoint
# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(1 if values[0]>=15 else 0, values[1:])
# Evaluating the model's performance
def eval_metrics(labelsAndPreds):
    tp = float(labelsAndPreds.filter(lambda r: r[0]==1 and r[1]==1).count())
    tn = float(labelsAndPreds.filter(lambda r: r[0]==0 and r[1]==0).count())
    fp = float(labelsAndPreds.filter(lambda r: r[0]==0 and r[1]==1).count())
    fn = float(labelsAndPreds.filter(lambda r: r[0]==1 and r[1]==0).count())
    print [tp, tn, fp, fn]
    precision = tp / (tp+fp)
    recall = tp / (tp+fn)
    F_measure = 2*precision*recall / (precision+recall)
    accuracy = (tp+tn) / (tp+tn+fp+fn)
    return {'precision': precision, 'recall': recall, 'F1': F_measure, 'accuracy': accuracy}
# Prepare training set
##train_data = sc.textFile('airline/fm-spark/ord_2007_2')
parsedTrainData = data_2007.map(parsePoint)
# Prepare test set
##test_data = sc.textFile('airline/fm-spark/ord_2008_2')
parsedTestData = data_2008.map(parsePoint)
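To see what parsePoint() does to a single row, here is a plain-Python sketch that returns a (label, features) tuple instead of a LabeledPoint, so it runs without Spark. The name `parse_row` and the constant `DELAY_THRESHOLD_MIN` are introduced only for this illustration; the two input rows are taken from the sample output shown earlier.

```python
DELAY_THRESHOLD_MIN = 15  # same cut-off used in parsePoint()

def parse_row(line):
    values = [float(x) for x in line.split(',')]
    # The first column is the delay in minutes; it becomes a binary label
    label = 1 if values[0] >= DELAY_THRESHOLD_MIN else 0
    # The remaining columns are the feature vector
    return label, values[1:]

label_a, feats_a = parse_row('-8,1,25,4,10,719,10')   # left 8 minutes early
label_b, feats_b = parse_row('41,1,28,7,15,925,13')   # left 41 minutes late

print((label_a, feats_a))
print((label_b, feats_b))
```

A flight counts as delayed (label 1) when its delay is 15 minutes or more; everything else, including early departures, is labeled 0.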
ML-Lib supports linear models, including logistic regression for classification, trained with stochastic gradient descent (SGD). Let's see how to use it:
from pyspark.mllib.classification import LogisticRegressionWithSGD
# Build the LR model
model_lr = LogisticRegressionWithSGD.train(parsedTrainData, iterations=100, regParam=25.0)
# Predict
labelsAndPreds = parsedTestData.map(lambda p: (p.label, model_lr.predict(p.features)))
m = eval_metrics(labelsAndPreds)
print ("precision = %0.2f, recall= %0.2f, F1 = %0.2f, accuracy = %0.2f" % (m['precision'], m['recall'], m['F1'], m['accuracy']))
[95436.0, 0.0, 239894.0, 0.0]
precision = 0.28, recall= 1.00, F1 = 0.44, accuracy = 0.28
And now let's try SVM with SGD:
%%time
from pyspark.mllib.classification import SVMWithSGD
# Build the SVM model
model_svm = SVMWithSGD.train(parsedTrainData, iterations=100)
# Predict
labelsAndPreds = parsedTestData.map(lambda p: (p.label, model_svm.predict(p.features)))
m = eval_metrics(labelsAndPreds)
print ("precision = %0.2f, recall= %0.2f, F1 = %0.2f, accuracy = %0.2f" % (m['precision'], m['recall'], m['F1'], m['accuracy']))
[0.0, 239894.0, 0.0, 95436.0]
ZeroDivisionError: float division by zero (raised in eval_metrics() at the line precision = tp / (tp+fp))
The SVM model predicted no delays at all, so tp and fp are both zero and the precision is undefined.
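The ZeroDivisionError above occurs whenever a model predicts only one class, which leaves one of the denominators at zero. One way to avoid it, sketched here under the assumption that an undefined ratio should simply be reported as 0.0, is to guard every division. The names `safe_div` and `metrics_from_counts` are hypothetical; the counts are the [tp, tn, fp, fn] lists printed by the two runs above.

```python
def safe_div(num, den):
    # Report 0.0 instead of raising when the denominator is zero
    return num / den if den else 0.0

def metrics_from_counts(tp, tn, fp, fn):
    precision = safe_div(tp, tp + fp)
    recall = safe_div(tp, tp + fn)
    f1 = safe_div(2 * precision * recall, precision + recall)
    accuracy = safe_div(tp + tn, tp + tn + fp + fn)
    return {'precision': precision, 'recall': recall, 'F1': f1, 'accuracy': accuracy}

# Counts printed by the logistic regression run: every flight predicted as delayed
m_lr = metrics_from_counts(95436.0, 0.0, 239894.0, 0.0)
# Counts printed by the SVM run: no flight predicted as delayed
m_svm = metrics_from_counts(0.0, 239894.0, 0.0, 95436.0)

for name, m in [('LR', m_lr), ('SVM', m_svm)]:
    print("%s: precision = %0.2f, recall = %0.2f, F1 = %0.2f, accuracy = %0.2f"
          % (name, m['precision'], m['recall'], m['F1'], m['accuracy']))
```

Both sets of counts describe a degenerate classifier: one labels every flight as delayed and the other labels none, so neither has learned anything useful from this first feature matrix.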
Recall that in part 1, we decided to enrich the dataset by integrating weather, and we've seen that modeling produces better results with the additional features. Let's implement this enhanced feature matrix generation as well here, again using Apache Spark.
Here we need to join the two datasets on the date field.
%%time
delay_fields = [ "Year", "Month", "Day", "DayOfWeek", "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "Carrier", \
"FlightNum", "TailNum", "ActualElapsedTime", "CRSElapsedTime", "AirTime", "ArrDelay", "Delay", \
"Origin", "Dest", "Distance", "TaxiIn", "TaxiOut", "Cancelled", "CancellationCode", "Diverted", \
"CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay" ]
year_inx = delay_fields.index('Year')
month_inx = delay_fields.index('Month')
day_inx = delay_fields.index('Day')
weather_fields = [ "station", "date", "metric", "value", "t1", "t2", "t3", "t4" ]
date_inx = weather_fields.index('date')
metric_inx = weather_fields.index('metric')
value_inx = weather_fields.index('value')
def gen_features(row):
    f1 = [row[delay_fields.index(x)] for x in ['Delay', 'Year', 'Month', 'Day', 'DayOfWeek']]
    f2 = [get_hour(row[delay_fields.index('DepTime')])]
    f3 = [row[delay_fields.index('Distance')]]
    year = int(row[year_inx])
    month = int(row[month_inx])
    day = int(row[day_inx])
    # is_bad_carrier() and is_bad_dest() are not shown here and are assumed to be defined elsewhere
    f4 = [is_bad_carrier(row[delay_fields.index('Carrier')]), is_bad_dest(row[delay_fields.index('Dest')]), \
          days_from_nearest_holiday(year, month, day)]
    flist = f1 + f2 + f3 + f4
    # Return a (key, value) pair keyed by date so it can be joined with the weather data
    return (to_date(year, month, day), flist)
def to_date(year, month, day):
    s = "%04d%02d%02d" % (year, month, day)
    return s
# function to do a preprocessing step for a given file
def preprocess_spark(delay_file, weather_file):
    # Read airline delay dataset
    lines = sc.textFile(delay_file)
    header = ','.join(lines.take(1))
    data = lines.filter(lambda line: line != header) \
        .map(lambda line: line.split(",")) \
        .filter(lambda vals: vals[delay_fields.index('Cancelled')] == "0") \
        .filter(lambda vals: vals[delay_fields.index('Origin')] == 'ORD')
    res_delay = data.map(gen_features)
    # Read weather data into RDDs
    lines = sc.textFile(weather_file)
    data = lines.map(lambda line: line.split(",")) \
        .filter(lambda vals: vals[weather_fields.index('station')] == 'USW00094846')
    w_tmin = data.filter(lambda vals: vals[metric_inx] == 'TMIN') \
        .map(lambda vals: (vals[date_inx], vals[value_inx]))
    w_tmax = data.filter(lambda vals: vals[metric_inx] == 'TMAX') \
        .map(lambda vals: (vals[date_inx], vals[value_inx]))
    w_prcp = data.filter(lambda vals: vals[metric_inx] == 'PRCP') \
        .map(lambda vals: (vals[date_inx], vals[value_inx]))
    w_snow = data.filter(lambda vals: vals[metric_inx] == 'SNOW') \
        .map(lambda vals: (vals[date_inx], vals[value_inx]))
    w_awnd = data.filter(lambda vals: vals[metric_inx] == 'AWND') \
        .map(lambda vals: (vals[date_inx], vals[value_inx]))
    # Join weather data with delay data
    joined = res_delay.join(w_tmin).mapValues(lambda x: x[0]+[x[1]]) \
        .join(w_tmax).mapValues(lambda x: x[0]+[x[1]]) \
        .join(w_prcp).mapValues(lambda x: x[0]+[x[1]]) \
        .join(w_snow).mapValues(lambda x: x[0]+[x[1]]) \
        .join(w_awnd).mapValues(lambda x: x[0]+[x[1]]) \
        .map(lambda pair: ','.join([str(x) for x in pair[1]]))
    return joined
data_2007 = preprocess_spark('airline/delay/2007.csv', 'airline/weather/2007.csv')
outfile = 'airline/fm-spark/ord_2007_2'
if hdfs.path.exists(outfile):
    hdfs.rmr(outfile)
data_2007.saveAsTextFile(outfile)
print "Finished pre-processing for 2007 datasets"
data_2008 = preprocess_spark('airline/delay/2008.csv', 'airline/weather/2008.csv')
outfile = 'airline/fm-spark/ord_2008_2'
if hdfs.path.exists(outfile):
    hdfs.rmr(outfile)
data_2008.saveAsTextFile(outfile)
print "Finished pre-processing for 2008 datasets"
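The chained join(...).mapValues(...) calls in preprocess_spark() can be hard to follow at first. Here is a plain-Python sketch of what each link in the chain does to a record: an inner join on the date key, followed by appending the matched weather value to the feature list. The helper `join_append` and all the sample values are hypothetical, and the sketch assumes one reading per date and metric (an RDD join would emit one output per matching pair, in no guaranteed order).

```python
def to_date(year, month, day):
    # Same key format as in the post: "yyyymmdd"
    return "%04d%02d%02d" % (year, month, day)

# Hypothetical (date key, feature list) pairs, standing in for res_delay
flights = [
    (to_date(2007, 1, 25), [-8, 2007, 1, 25]),
    (to_date(2007, 1, 26), [41, 2007, 1, 26]),
    (to_date(2007, 1, 27), [5, 2007, 1, 27]),  # no weather reading: dropped
]

# Hypothetical weather readings keyed by date, standing in for w_tmin and w_tmax
tmin = {"20070125": "-56", "20070126": "-100"}
tmax = {"20070125": "11", "20070126": "-33"}

def join_append(pairs, lookup):
    # Inner join on the key, then append the matched value to the features,
    # mirroring .join(w).mapValues(lambda x: x[0] + [x[1]])
    return [(k, feats + [lookup[k]]) for k, feats in pairs if k in lookup]

joined = join_append(join_append(flights, tmin), tmax)
# Final step: drop the key and emit a comma-separated row
rows = [",".join(str(x) for x in feats) for _, feats in joined]
print(rows)
```

Because the join is an inner join, any flight date without a matching reading for one of the metrics silently disappears from the feature matrix, which is worth checking by comparing record counts before and after the join.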
Now that we have the training and validation datasets ready, let's see how to build a predictive model with Spark's ML-Lib machine learning library.
Let's run a few predictive models with ML-Lib. First we parse our feature matrices into RDDs of LabeledPoint instances for both the training and test datasets. We also define eval_metrics(), a helper function to compute precision, recall, F1 measure and accuracy.
from pyspark.mllib.regression import LabeledPoint
# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(1 if values[0]>=15 else 0, [float(x) for x in values[1:]])
# Evaluating the model's performance
def eval_metrics(labelsAndPreds):
    tp = float(labelsAndPreds.filter(lambda r: r[0]==1 and r[1]==1).count())
    tn = float(labelsAndPreds.filter(lambda r: r[0]==0 and r[1]==0).count())
    fp = float(labelsAndPreds.filter(lambda r: r[0]==0 and r[1]==1).count())
    fn = float(labelsAndPreds.filter(lambda r: r[0]==1 and r[1]==0).count())
    precision = tp / (tp+fp)
    recall = tp / (tp+fn)
    F_measure = 2*precision*recall / (precision+recall)
    accuracy = (tp+tn) / (tp+tn+fp+fn)
    return {'precision': precision, 'recall': recall, 'F1': F_measure, 'accuracy': accuracy}
# Prepare training set
train_data = sc.textFile('airline/fm-spark/ord_2007_2')
parsedTrainData = train_data.map(parsePoint)
# Prepare test set
test_data = sc.textFile('airline/fm-spark/ord_2008_2')
parsedTestData = test_data.map(parsePoint)
ML-Lib supports linear models, including logistic regression for classification, trained with stochastic gradient descent (SGD). Let's see how to use it:
%%time
from pyspark.mllib.classification import LogisticRegressionWithSGD
# Build the LR model
model_lr = LogisticRegressionWithSGD.train(parsedTrainData, iterations=100)
# Predict
labelsAndPreds = parsedTestData.map(lambda p: (p.label, model_lr.predict(p.features)))
m = eval_metrics(labelsAndPreds)
print ("precision = %0.2f, recall= %0.2f, F1 = %0.2f, accuracy = %0.2f" % (m['precision'], m['recall'], m['F1'], m['accuracy']))
precision = 0.35, recall= 0.55, F1 = 0.43, accuracy = 0.59
CPU times: user 515 ms, sys: 216 ms, total: 731 ms
Wall time: 1min 2s
And now let's try the Support Vector Machine version of ML-Lib:
%%time
from pyspark.mllib.classification import SVMWithSGD
# Build the SVM model
model_svm = SVMWithSGD.train(parsedTrainData, iterations=200)
# Predict
labelsAndPreds = parsedTestData.map(lambda p: (p.label, model_svm.predict(p.features)))
m = eval_metrics(labelsAndPreds)
print ("precision = %0.2f, recall= %0.2f, F1 = %0.2f, accuracy = %0.2f" % (m['precision'], m['recall'], m['F1'], m['accuracy']))
precision = 0.28, recall= 1.00, F1 = 0.44, accuracy = 0.28
CPU times: user 876 ms, sys: 393 ms, total: 1.27 s
Wall time: 1min 21s