After completing the setup here, this is what an example IPython Notebook session could look like.
You will notice that when a notebook first connects to the IPython kernel, the notebook server spits out all the usual PySpark output about initializing Spark.
First, confirm that a SparkContext object is available in the sc variable:
print sc
<pyspark.context.SparkContext object at 0x26e7990>
Now let's load an RDD with some interesting data. We have the GDELT event data set on our cluster as a collection of tab-delimited text files:
raw_events = sc.textFile('/user/laserson/gdelt/events').map(lambda x: x.split('\t'))
raw_events.cache()
Note how we've cached the event data in distributed memory.
Let's see what an object in the RDD looks like:
print raw_events.first()
[u'0', u'19790101', u'197901', u'1979', u'1979.0027', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'AFR', u'AFRICA', u'AFR', u'', u'', u'', u'', u'', u'', u'', u'1', u'040', u'040', u'04', u'1', u'1', u'9', u'1', u'9', u'5.52631578947368', u'', u'', u'', u'', u'0', u'0', u'0', u'', u'', u'', u'', u'0', u'0', u'0', u'', u'', u'', u'', u'0', u'0', u'', u'20130203']
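The map step above is just a tab split applied to each raw line. As a local sanity check, here is the same operation on a hypothetical three-field record (not a real GDELT row):

```python
# A hypothetical tab-delimited record with three fields (not a real GDELT row).
line = u'0\t19790101\tAFR'

# This mirrors the map step above: split each raw line on tab characters.
fields = line.split(u'\t')

# Individual fields are then addressed by position, e.g. the date in field 2.
print(fields[1])  # 19790101
```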
Let's count the number of events we have. You can follow the progress of the computation using the Spark application web server.
print raw_events.count()
61260109
We see that we have about 60 million events at our disposal.
The GDELT event data set collects geopolitical events that occur around the world. Each event is tagged with a Goldstein scale value that measures the potential for the event to destabilize the country. Let's compute and plot a histogram of the Goldstein scale values across all the events in the database. The Goldstein scale value is present in the 31st field.
First, let's make sure that plots are displayed inline (see the IPython docs):
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
Now we'll just confirm that all the Goldstein values are indeed between -10 and 10.
m = raw_events.map(lambda x: float(x[30])).min()
M = raw_events.map(lambda x: float(x[30])).max()
print "Min is %f\nMax is %f" % (m, M)
Min is -10.000000
Max is 10.000000
Here we compute the histogram. This is slightly hacky, but c'est la vie:
bins = np.arange(-10, 11)
one_hot = raw_events.map(lambda x: float(x[30])).map(lambda x: np.histogram([x], bins=bins)[0])
hist = one_hot.reduce(lambda x, y: x + y)
print hist
[ 4315780 562319 516529 938546 100780 3890452 1816766 0 4678160 1290723 6450944 10235827 4893120 6673389 4912603 1487392 1056329 5624014 1400922 415514]
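The trick works because per-element histograms sum to the histogram of the whole data set, so the reduce step can just add the count vectors. A local NumPy sketch of the same idea, with a few made-up values standing in for the RDD:

```python
import numpy as np

# Hypothetical Goldstein values standing in for the RDD contents.
values = [-10.0, -4.2, 0.0, 1.0, 5.5, 9.9]
bins = np.arange(-10, 11)  # 21 edges -> 20 bins

# One-hot: each value becomes a length-20 count vector with a single 1...
one_hot = [np.histogram([v], bins=bins)[0] for v in values]

# ...and summing the vectors (the reduce step) yields the full histogram.
hist = sum(one_hot)
print(hist)
```

The result is identical to calling np.histogram on all the values at once; the one-hot form just makes the computation associative so Spark can reduce it in parallel.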
Finally, we plot the histogram:
fig = plt.figure()
ax = fig.add_subplot(111)
t = ax.bar(bins[:-1], hist)
We can also plot the number of events each day for the 10 countries that have the most events in the year 2001.
First, let's see which country codes appear in the data. Note that we filter out events that don't list a country code.
raw_events.filter(lambda x: x[7] != '').map(lambda x: x[7]).distinct().collect()
[u'', u'SAS', u'NMR', u'IDN', u'ALB', u'KHM', u'TUN', u'COK', u'SHN', u'THA', u'ESP', u'BLZ', u'IND', u'CAS', u'TON', u'COM', u'NAF', u'UKR', u'TGO', u'AZE', u'COL', u'MEX', u'NOR', u'BGR', u'LTU', u'AFR', u'SAU', u'CRI', u'TKM', u'EUR', u'DJI', u'NER', u'LAM', u'WLF', u'JOR', u'CAU', u'FJI', u'PRK', u'MMR', u'LAO', u'EAF', u'BLR', u'LBN', u'BEN', u'TTO', u'YEM', u'GEO', u'STP', u'PRT', u'HKG', u'LSO', u'KAZ', u'FRA', u'UGA', u'BEL', u'BTN', u'NAM', u'AIA', u'GBR', u'DMA', u'SVK', u'KIR', u'BWA', u'CAN', u'IRQ', u'GHA', u'BFA', u'CYM', u'DNK', u'AUS', u'VAT', u'BLK', u'PNG', u'MDA', u'MAR', u'MLI', u'ABW', u'DZA', u'MDG', u'HND', u'AUT', u'PGS', u'VUT', u'MYS', u'VNM', u'PER', u'SLV', u'JPN', u'BMU', u'ZAF', u'TMP', u'PRY', u'CAF', u'NIC', u'SUR', u'BRN', u'NGA', u'EGY', u'HTI', u'JAM', u'CZE', u'LKA', u'PLW', u'SOM', u'BOL', u'AND', u'IRL', u'KEN', u'ISR', u'ARE', u'CHE', u'NLD', u'SDN', u'IRN', u'MLT', u'HRV', u'ARG', u'SRB', u'ATG', u'GTM', u'ZWE', u'BHS', u'MAC', u'ETH', u'TJK', u'UZB', u'BHR', u'KNA', u'MDV', u'WSM', u'CHL', u'SWE', u'QAT', u'MOZ', u'WAF', u'SMR', u'ARM', u'SLE', u'RUS', u'BRB', u'USA', u'POL', u'CHN', u'GRD', u'SEA', u'ASA', u'BRA', u'GNQ', u'MNG', u'SYC', u'LVA', u'MEA', u'LUX', u'NPL', u'GRC', u'SEN', u'TCD', u'SAM', u'SLB', u'ISL', u'LBR', u'MKD', u'SWZ', u'DOM', u'FIN', u'SGP', u'KWT', u'CMR', u'WST', u'MWI', u'MUS', u'AFG', u'LBY', u'TZA', u'BDI', u'VCT', u'AGO', u'GIN', u'SAF', u'URY', u'CUB', u'DEU', u'MHL', u'NRU', u'CPV', u'TUR', u'CYP', u'LCA', u'EEU', u'ZMB', u'GNB', u'BGD', u'VEN', u'LIE', u'ROM', u'GAB', u'GMB', u'RWA', u'ECU', u'HUN', u'TUV', u'PAK', u'ITA', u'PHL', u'MRT', u'CRB', u'MCO', u'OMN', u'FSM', u'SYR', u'CIV', u'ERI', u'COD', u'GUY', u'COG', u'PAN', u'SCN', u'KGZ', u'PSE', u'EST', u'KOR', u'NZL']
Next we restrict to events in the year 2001; we'll then aggregate counts by country and day.
events_2001 = raw_events.filter(lambda x: x[7] != '').filter(lambda x: x[1] >= '20010101').filter(lambda x: x[1] <= '20011231')
events_2001.cache()
PythonRDD[12] at RDD at PythonRDD.scala:40
events_2001.count()
3162516
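The date filters above compare strings, not numbers. That works because the dates are fixed-width YYYYMMDD strings, so lexicographic order matches chronological order:

```python
# Fixed-width YYYYMMDD strings sort the same way the dates they encode do.
dates = ['20001231', '20010101', '20010912', '20011231', '20020101']

# Same range test as the two RDD filters, applied locally.
in_2001 = [d for d in dates if '20010101' <= d <= '20011231']
print(in_2001)  # ['20010101', '20010912', '20011231']
```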
country_day_counts = events_2001.filter(lambda x: x[7] != '').map(lambda x: ((x[7], x[1]), 1)).reduceByKey(lambda x, y: x + y)
country_day_counts.cache()
PythonRDD[22] at RDD at PythonRDD.scala:40
country_day_counts.take(5)
[((u'VEN', u'20010602'), 12), ((u'GHA', u'20011224'), 15), ((u'BWA', u'20011224'), 2), ((u'PHL', u'20010223'), 63), ((u'IRN', u'20010428'), 186)]
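The map/reduceByKey pair is essentially a distributed word count keyed on (country, day) tuples. Locally, the same aggregation can be sketched with a Counter (the sample pairs below are made up):

```python
from collections import Counter

# Hypothetical (country_code, date) pairs standing in for the filtered events.
events = [('VEN', '20010602'), ('VEN', '20010602'), ('GHA', '20011224')]

# Mapping each event to ((country, day), 1) and summing per key -- which is
# what reduceByKey does -- is exactly what Counter computes.
counts = Counter(events)
print(counts[('VEN', '20010602')])  # 2
```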
from dateutil.parser import parse as parse_date
epoch = parse_date('19700101')
def td2s(td):
    return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 1000000) / 1e6
def day2unix(day):
    return td2s(parse_date(day) - epoch)
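If dateutil isn't handy, the same conversion can be done with only the standard library. This is a stdlib stand-in for the helpers above, not the code used in the notebook:

```python
from datetime import datetime

def day2unix(day):
    # Seconds since the Unix epoch for a fixed-width YYYYMMDD string.
    return (datetime.strptime(day, '%Y%m%d') - datetime(1970, 1, 1)).total_seconds()

print(day2unix('19700102'))  # 86400.0
```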
country_series = country_day_counts \
.map(lambda x: (x[0][0], (day2unix(x[0][1]), x[1]))) \
.groupByKey() \
.map(lambda x: (x[0], sorted(x[1], key=lambda y: y[0]))) \
.map(lambda x: (x[0], zip(*x[1]))) \
.map(lambda x: (x[0], x[1][0], x[1][1]))
country_series.cache()
PythonRDD[77] at RDD at PythonRDD.scala:40
country_series.first()
(u'NMR', (978307200.0, 978393600.0, 978480000.0, 978652800.0, 978739200.0, 978825600.0, 978912000.0, 978998400.0, 979084800.0, 979689600.0, 980121600.0, 980294400.0, 980467200.0, 980640000.0, 980726400.0, 980812800.0, 980899200.0, 981331200.0, 981417600.0, 981504000.0, 981590400.0, 981676800.0, 981849600.0, 982108800.0, 982195200.0, 982281600.0, 982368000.0, 982540800.0, 982713600.0, 982800000.0, 982886400.0, 982972800.0, 983059200.0, 983232000.0, 983750400.0, 983836800.0, 983923200.0, 984096000.0, 984182400.0, 984355200.0, 984441600.0, 984528000.0, 984873600.0, 985132800.0, 985219200.0, 985392000.0, 985478400.0, 985564800.0, 985651200.0, 985737600.0, 985910400.0, 986169600.0, 986342400.0, 986515200.0, 986774400.0, 986947200.0, 987292800.0, 987379200.0, 987465600.0, 987552000.0, 987638400.0, 987724800.0, 987811200.0, 987897600.0, 987984000.0, 988156800.0, 988329600.0, 988675200.0, 988848000.0, 989020800.0, 989193600.0, 989366400.0, 989625600.0, 989884800.0, 989971200.0, 990057600.0, 990316800.0, 990403200.0, 990489600.0, 990576000.0, 990662400.0, 990748800.0, 990835200.0, 991094400.0, 991180800.0, 991267200.0, 991353600.0, 991699200.0, 991872000.0, 991958400.0, 992217600.0, 992304000.0, 992390400.0, 992476800.0, 992995200.0, 993081600.0, 993340800.0, 993513600.0, 993600000.0, 993686400.0, 994032000.0, 994118400.0, 994204800.0, 994291200.0, 994550400.0, 994636800.0, 994809600.0, 994896000.0, 995414400.0, 995500800.0, 995587200.0, 995846400.0, 996019200.0, 996192000.0, 996364800.0, 996451200.0, 996710400.0, 996796800.0, 997142400.0, 997228800.0, 997401600.0, 997574400.0, 997660800.0, 997747200.0, 997833600.0, 998438400.0, 998524800.0, 998956800.0, 999043200.0, 999216000.0, 999388800.0, 999561600.0, 999648000.0, 999734400.0, 999820800.0, 1000080000.0, 1000252800.0, 1000425600.0, 1000598400.0, 1000684800.0, 1000857600.0, 1001030400.0, 1001116800.0, 1001289600.0, 1001548800.0, 1001635200.0, 1001808000.0, 1001980800.0, 1002067200.0, 1002153600.0, 1002240000.0, 
1002412800.0, 1002499200.0, 1002585600.0, 1002672000.0, 1002758400.0, 1002844800.0, 1002931200.0, 1003104000.0, 1003190400.0, 1003363200.0, 1003622400.0, 1003708800.0, 1003795200.0, 1004054400.0, 1004140800.0, 1004227200.0, 1004313600.0, 1004745600.0, 1004918400.0, 1005004800.0, 1005091200.0, 1005436800.0, 1005523200.0, 1005696000.0, 1005782400.0, 1005868800.0, 1006128000.0, 1006214400.0, 1006300800.0, 1006905600.0, 1006992000.0, 1007164800.0, 1007251200.0, 1007424000.0, 1007769600.0, 1008028800.0, 1008115200.0, 1008201600.0, 1008460800.0, 1008547200.0, 1008633600.0, 1008720000.0, 1009238400.0, 1009411200.0, 1009584000.0, 1009670400.0), (1, 2, 1, 2, 1, 2, 4, 1, 5, 1, 1, 2, 3, 2, 1, 4, 2, 1, 3, 3, 5, 1, 3, 8, 7, 12, 2, 2, 11, 7, 6, 1, 2, 1, 2, 3, 1, 1, 2, 4, 2, 4, 1, 1, 2, 5, 2, 5, 3, 3, 5, 1, 4, 2, 1, 4, 2, 9, 19, 2, 10, 1, 4, 10, 3, 1, 1, 1, 2, 8, 3, 1, 1, 2, 3, 2, 1, 1, 2, 1, 3, 6, 1, 1, 1, 2, 1, 4, 3, 1, 1, 6, 4, 1, 4, 2, 1, 1, 3, 2, 4, 3, 2, 7, 2, 7, 2, 3, 4, 1, 3, 1, 1, 16, 1, 1, 3, 1, 2, 1, 1, 2, 2, 2, 4, 4, 3, 1, 1, 2, 1, 1, 7, 5, 2, 3, 7, 4, 8, 3, 3, 1, 1, 6, 2, 5, 2, 4, 4, 3, 1, 1, 1, 4, 1, 1, 2, 1, 4, 2, 1, 2, 4, 3, 2, 2, 3, 3, 2, 1, 4, 3, 3, 3, 2, 3, 1, 2, 2, 2, 1, 3, 2, 2, 1, 4, 3, 1, 1, 1, 1, 2, 1, 1, 7, 6, 4))
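The groupByKey/sort/zip chain above is just reshaping (country, (time, count)) pairs into one sorted tuple of times and one parallel tuple of counts per country. A local sketch with made-up data:

```python
# Hypothetical (country, (unix_time, count)) pairs, as produced by the map step.
pairs = [('NMR', (978393600.0, 2)), ('NMR', (978307200.0, 1))]

# groupByKey: gather all (time, count) tuples per country.
grouped = {}
for country, tc in pairs:
    grouped.setdefault(country, []).append(tc)

# Sort each group by time, then zip(*...) to split the pairs into a tuple of
# times and a parallel tuple of counts -- ready to hand to a plotting call.
series = [(c,) + tuple(zip(*sorted(tcs))) for c, tcs in grouped.items()]
print(series[0])  # ('NMR', (978307200.0, 978393600.0), (1, 2))
```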
Now we're ready to plot. Each object in country_series has the information we need to plot a single line.
random_color = lambda: '#%02x%02x%02x' % tuple(np.random.randint(0,256,3))
fig = plt.figure()
ax = fig.add_subplot(111)
for (country, times, events) in country_series.takeOrdered(10, lambda x: -sum(x[2])):
    t = ax.plot(times, events, lw=1, c=random_color())
What's the big spike for the blue line above?
country_day_counts.reduce(lambda x, y: max(x, y, key=lambda z: z[1]))
((u'USA', u'20010912'), 2387)
Looks like it was the day after September 11th.