Example IPython Notebook running with PySpark

After completing the setup here, this is what an example IPython Notebook session could look like.

When a notebook first connects to the IPython kernel, the notebook server prints all the usual PySpark output about initializing Spark.

Start by checking that a SparkContext object exists in the sc variable:

In [1]:
print sc
<pyspark.context.SparkContext object at 0x26e7990>

Now let's load an RDD with some interesting data. We have the GDELT event data set on our cluster as a collection of tab-delimited text files:

In [2]:
raw_events = sc.textFile('/user/laserson/gdelt/events').map(lambda x: x.split('\t'))
# raw_events.cache()

Note that the cache() call is commented out here; uncomment it to cache the event data in distributed memory.
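
Because the later steps address fields by position (for example x[30]), it matters that split('\t') keeps empty fields instead of collapsing them. Here is a minimal local sketch of that behaviour, using a made-up three-field line rather than a real GDELT record:

```python
# Hypothetical tab-delimited record with an empty middle field (not real GDELT data).
line = 'A\t\tC'

# split('\t') keeps the empty field, so positions stay aligned with the columns.
fields_tab = line.split('\t')

# split() with no argument splits on runs of whitespace and drops the empty field.
fields_any = line.split()
```

With the explicit '\t' separator the record still has three fields, which is why the empty strings show up in the parsed events.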

Let's see what an object in the RDD looks like:

In [7]:
print raw_events.first()
[u'0', u'19790101', u'197901', u'1979', u'1979.0027', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'AFR', u'AFRICA', u'AFR', u'', u'', u'', u'', u'', u'', u'', u'1', u'040', u'040', u'04', u'1', u'1', u'9', u'1', u'9', u'5.52631578947368', u'', u'', u'', u'', u'0', u'0', u'0', u'', u'', u'', u'', u'0', u'0', u'0', u'', u'', u'', u'', u'0', u'0', u'', u'20130203']

Let's count the number of events we have. You can follow the progress of the computation in the Spark application web UI.

In [8]:
print raw_events.count()
61260109

We see that we have about 61 million events at our disposal.

The GDELT event data set collects geopolitical events that occur around the world. Each event is tagged with a Goldstein scale value that measures the potential for the event to destabilize the country. Let's compute and plot a histogram of the Goldstein scale values across all the events in the database. The Goldstein scale value is present in the 31st field.

First, let's make sure that plots are set to be displayed inline (see the IPython docs):

In [4]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np

Now we'll just confirm that all the Goldstein values are indeed between -10 and 10.

In [6]:
m = raw_events.map(lambda x: float(x[30])).min()
M = raw_events.map(lambda x: float(x[30])).max()
print "Min is %f\nMax is %f" % (m, M)
Min is -10.000000
Max is 10.000000
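
The cell above scans the data twice, once for min() and once for max(). A single-pass alternative is sketched below on a plain Python list. The values and the helper names to_pair and merge are made up for illustration, and the assumption (not tested here) is that the same two functions could be handed to an RDD's map and reduce.

```python
from functools import reduce

# Hypothetical Goldstein values standing in for the parsed field x[30].
values = [3.4, -10.0, 7.0, 10.0, -2.0]

def to_pair(v):
    # Represent each value as a (min, max) pair of itself.
    return (v, v)

def merge(a, b):
    # Combine two (min, max) pairs into one.
    return (min(a[0], b[0]), max(a[1], b[1]))

m, M = reduce(merge, map(to_pair, values))
```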

Here we compute the histogram. This is slightly hacky, but c'est la vie:

In [7]:
bins = np.arange(-10, 11)
one_hot = raw_events.map(lambda x: float(x[30])).map(lambda x: np.histogram([x], bins=bins)[0])
hist = one_hot.reduce(lambda x, y: x + y)
print hist
[ 4315780   562319   516529   938546   100780  3890452  1816766        0
  4678160  1290723  6450944 10235827  4893120  6673389  4912603  1487392
  1056329  5624014  1400922   415514]
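
The one-hot trick can be hard to read at first: every value becomes a 20-element vector with a single 1 in its bin, and the reduce adds those vectors element-wise. The sketch below mimics that locally with plain lists. The helper names and sample values are made up, and the binning rule is written by hand to follow np.histogram, where every bin is half-open except the last one, which also includes its right edge. It assumes all values lie in [-10, 10], as confirmed above.

```python
from functools import reduce
import math

LO, HI = -10, 10          # same range as bins = np.arange(-10, 11)
N_BINS = HI - LO          # 20 unit-width bins

def one_hot(value):
    # Bins are [k, k+1) except the last, which is [9, 10].
    idx = N_BINS - 1 if value == HI else int(math.floor(value - LO))
    vec = [0] * N_BINS
    vec[idx] = 1
    return vec

def add(x, y):
    # Element-wise sum, the role played by x + y on NumPy arrays.
    return [a + b for a, b in zip(x, y)]

sample = [-10.0, -9.5, 0.0, 9.9, 10.0]   # made-up values
hist = reduce(add, [one_hot(v) for v in sample])
```

Here the first bin ends up with 2 entries, the bin starting at 0 with 1, and the last bin with 2, because 10.0 falls into the closed final bin.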

Finally we plot the histogram:

In [8]:
fig = plt.figure()
ax = fig.add_subplot(111)
t = ax.bar(bins[:-1], hist)

We can also plot the number of events each day for the 10 countries that have the most events in the year 2001.

First we can list the unique country codes that are available. Note that we filter out events that don't list a country code.

In [6]:
raw_events.filter(lambda x: x[7] != '').map(lambda x: x[7]).distinct().collect()
Out[6]:
[u'',
 u'SAS',
 u'NMR',
 u'IDN',
 u'ALB',
 u'KHM',
 u'TUN',
 u'COK',
 u'SHN',
 u'THA',
 u'ESP',
 u'BLZ',
 u'IND',
 u'CAS',
 u'TON',
 u'COM',
 u'NAF',
 u'UKR',
 u'TGO',
 u'AZE',
 u'COL',
 u'MEX',
 u'NOR',
 u'BGR',
 u'LTU',
 u'AFR',
 u'SAU',
 u'CRI',
 u'TKM',
 u'EUR',
 u'DJI',
 u'NER',
 u'LAM',
 u'WLF',
 u'JOR',
 u'CAU',
 u'FJI',
 u'PRK',
 u'MMR',
 u'LAO',
 u'EAF',
 u'BLR',
 u'LBN',
 u'BEN',
 u'TTO',
 u'YEM',
 u'GEO',
 u'STP',
 u'PRT',
 u'HKG',
 u'LSO',
 u'KAZ',
 u'FRA',
 u'UGA',
 u'BEL',
 u'BTN',
 u'NAM',
 u'AIA',
 u'GBR',
 u'DMA',
 u'SVK',
 u'KIR',
 u'BWA',
 u'CAN',
 u'IRQ',
 u'GHA',
 u'BFA',
 u'CYM',
 u'DNK',
 u'AUS',
 u'VAT',
 u'BLK',
 u'PNG',
 u'MDA',
 u'MAR',
 u'MLI',
 u'ABW',
 u'DZA',
 u'MDG',
 u'HND',
 u'AUT',
 u'PGS',
 u'VUT',
 u'MYS',
 u'VNM',
 u'PER',
 u'SLV',
 u'JPN',
 u'BMU',
 u'ZAF',
 u'TMP',
 u'PRY',
 u'CAF',
 u'NIC',
 u'SUR',
 u'BRN',
 u'NGA',
 u'EGY',
 u'HTI',
 u'JAM',
 u'CZE',
 u'LKA',
 u'PLW',
 u'SOM',
 u'BOL',
 u'AND',
 u'IRL',
 u'KEN',
 u'ISR',
 u'ARE',
 u'CHE',
 u'NLD',
 u'SDN',
 u'IRN',
 u'MLT',
 u'HRV',
 u'ARG',
 u'SRB',
 u'ATG',
 u'GTM',
 u'ZWE',
 u'BHS',
 u'MAC',
 u'ETH',
 u'TJK',
 u'UZB',
 u'BHR',
 u'KNA',
 u'MDV',
 u'WSM',
 u'CHL',
 u'SWE',
 u'QAT',
 u'MOZ',
 u'WAF',
 u'SMR',
 u'ARM',
 u'SLE',
 u'RUS',
 u'BRB',
 u'USA',
 u'POL',
 u'CHN',
 u'GRD',
 u'SEA',
 u'ASA',
 u'BRA',
 u'GNQ',
 u'MNG',
 u'SYC',
 u'LVA',
 u'MEA',
 u'LUX',
 u'NPL',
 u'GRC',
 u'SEN',
 u'TCD',
 u'SAM',
 u'SLB',
 u'ISL',
 u'LBR',
 u'MKD',
 u'SWZ',
 u'DOM',
 u'FIN',
 u'SGP',
 u'KWT',
 u'CMR',
 u'WST',
 u'MWI',
 u'MUS',
 u'AFG',
 u'LBY',
 u'TZA',
 u'BDI',
 u'VCT',
 u'AGO',
 u'GIN',
 u'SAF',
 u'URY',
 u'CUB',
 u'DEU',
 u'MHL',
 u'NRU',
 u'CPV',
 u'TUR',
 u'CYP',
 u'LCA',
 u'EEU',
 u'ZMB',
 u'GNB',
 u'BGD',
 u'VEN',
 u'LIE',
 u'ROM',
 u'GAB',
 u'GMB',
 u'RWA',
 u'ECU',
 u'HUN',
 u'TUV',
 u'PAK',
 u'ITA',
 u'PHL',
 u'MRT',
 u'CRB',
 u'MCO',
 u'OMN',
 u'FSM',
 u'SYR',
 u'CIV',
 u'ERI',
 u'COD',
 u'GUY',
 u'COG',
 u'PAN',
 u'SCN',
 u'KGZ',
 u'PSE',
 u'EST',
 u'KOR',
 u'NZL']

Here we convert each event into counts. We will aggregate by country and day, for all events in the year 2001.

In [15]:
events_2001 = raw_events.filter(lambda x: x[7] != '').filter(lambda x: x[1] >= '20010101').filter(lambda x: x[1] <= '20011231')
events_2001.cache()
Out[15]:
PythonRDD[12] at RDD at PythonRDD.scala:40
In [19]:
events_2001.count()
Out[19]:
3162516
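
The date filter above compares strings, not numbers or date objects. That works because the date field x[1] is a fixed-width YYYYMMDD string, so lexicographic order and chronological order agree. A small local sketch with made-up dates:

```python
# Made-up YYYYMMDD date strings in the same format as field x[1].
days = ['20001231', '20010101', '20010912', '20011231', '20020101']

# String comparison is lexicographic, which matches date order for this format.
in_2001 = [d for d in days if '20010101' <= d <= '20011231']
```

Only the three dates inside 2001 survive the filter.
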
In [21]:
country_day_counts = events_2001.filter(lambda x: x[7] != '').map(lambda x: ((x[7], x[1]), 1)).reduceByKey(lambda x, y: x + y)
country_day_counts.cache()
Out[21]:
PythonRDD[22] at RDD at PythonRDD.scala:40
In [22]:
country_day_counts.take(5)
Out[22]:
[((u'VEN', u'20010602'), 12),
 ((u'GHA', u'20011224'), 15),
 ((u'BWA', u'20011224'), 2),
 ((u'PHL', u'20010223'), 63),
 ((u'IRN', u'20010428'), 186)]
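
The map and reduceByKey pair above is the usual counting pattern: emit (key, 1) for every event, then sum the ones for each key. The sketch below does the same thing locally with a dictionary; the keys are made up and no Spark is involved.

```python
from collections import defaultdict

# Made-up (country, day) keys, one per event.
keys = [('USA', '20010912'), ('USA', '20010912'), ('IRN', '20010428')]

counts = defaultdict(int)
for key in keys:
    # Same effect as emitting (key, 1) and summing the ones per key.
    counts[key] += 1
```
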
In [23]:
from dateutil.parser import parse as parse_date
epoch = parse_date('19700101')
def td2s(td):
    return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 1000000) / 1e6
def day2unix(day):
    return td2s(parse_date(day) - epoch)
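
If dateutil is not available, the same conversion can be sketched with the standard library alone. The name day2unix_stdlib is hypothetical, and the sketch assumes every day string is in YYYYMMDD form and should be read as midnight UTC.

```python
import calendar
from datetime import datetime

def day2unix_stdlib(day):
    # Parse a YYYYMMDD string and treat it as midnight UTC.
    parsed = datetime.strptime(day, '%Y%m%d')
    # timegm interprets the time tuple as UTC and returns whole seconds.
    return float(calendar.timegm(parsed.timetuple()))

first_day_2001 = day2unix_stdlib('20010101')
```

For '20010101' this gives 978307200.0, which is the first timestamp in the country_series.first() output further down.
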
In [24]:
country_series = country_day_counts \
    .map(lambda x: (x[0][0], (day2unix(x[0][1]), x[1]))) \
    .groupByKey() \
    .map(lambda x: (x[0], sorted(x[1], key=lambda y: y[0]))) \
    .map(lambda x: (x[0], zip(*x[1]))) \
    .map(lambda x: (x[0], x[1][0], x[1][1]))
In [65]:
country_series.cache()
Out[65]:
PythonRDD[77] at RDD at PythonRDD.scala:40
In [25]:
country_series.first()
Out[25]:
(u'NMR',
 (978307200.0,
  978393600.0,
  978480000.0,
  978652800.0,
  978739200.0,
  978825600.0,
  978912000.0,
  978998400.0,
  979084800.0,
  979689600.0,
  980121600.0,
  980294400.0,
  980467200.0,
  980640000.0,
  980726400.0,
  980812800.0,
  980899200.0,
  981331200.0,
  981417600.0,
  981504000.0,
  981590400.0,
  981676800.0,
  981849600.0,
  982108800.0,
  982195200.0,
  982281600.0,
  982368000.0,
  982540800.0,
  982713600.0,
  982800000.0,
  982886400.0,
  982972800.0,
  983059200.0,
  983232000.0,
  983750400.0,
  983836800.0,
  983923200.0,
  984096000.0,
  984182400.0,
  984355200.0,
  984441600.0,
  984528000.0,
  984873600.0,
  985132800.0,
  985219200.0,
  985392000.0,
  985478400.0,
  985564800.0,
  985651200.0,
  985737600.0,
  985910400.0,
  986169600.0,
  986342400.0,
  986515200.0,
  986774400.0,
  986947200.0,
  987292800.0,
  987379200.0,
  987465600.0,
  987552000.0,
  987638400.0,
  987724800.0,
  987811200.0,
  987897600.0,
  987984000.0,
  988156800.0,
  988329600.0,
  988675200.0,
  988848000.0,
  989020800.0,
  989193600.0,
  989366400.0,
  989625600.0,
  989884800.0,
  989971200.0,
  990057600.0,
  990316800.0,
  990403200.0,
  990489600.0,
  990576000.0,
  990662400.0,
  990748800.0,
  990835200.0,
  991094400.0,
  991180800.0,
  991267200.0,
  991353600.0,
  991699200.0,
  991872000.0,
  991958400.0,
  992217600.0,
  992304000.0,
  992390400.0,
  992476800.0,
  992995200.0,
  993081600.0,
  993340800.0,
  993513600.0,
  993600000.0,
  993686400.0,
  994032000.0,
  994118400.0,
  994204800.0,
  994291200.0,
  994550400.0,
  994636800.0,
  994809600.0,
  994896000.0,
  995414400.0,
  995500800.0,
  995587200.0,
  995846400.0,
  996019200.0,
  996192000.0,
  996364800.0,
  996451200.0,
  996710400.0,
  996796800.0,
  997142400.0,
  997228800.0,
  997401600.0,
  997574400.0,
  997660800.0,
  997747200.0,
  997833600.0,
  998438400.0,
  998524800.0,
  998956800.0,
  999043200.0,
  999216000.0,
  999388800.0,
  999561600.0,
  999648000.0,
  999734400.0,
  999820800.0,
  1000080000.0,
  1000252800.0,
  1000425600.0,
  1000598400.0,
  1000684800.0,
  1000857600.0,
  1001030400.0,
  1001116800.0,
  1001289600.0,
  1001548800.0,
  1001635200.0,
  1001808000.0,
  1001980800.0,
  1002067200.0,
  1002153600.0,
  1002240000.0,
  1002412800.0,
  1002499200.0,
  1002585600.0,
  1002672000.0,
  1002758400.0,
  1002844800.0,
  1002931200.0,
  1003104000.0,
  1003190400.0,
  1003363200.0,
  1003622400.0,
  1003708800.0,
  1003795200.0,
  1004054400.0,
  1004140800.0,
  1004227200.0,
  1004313600.0,
  1004745600.0,
  1004918400.0,
  1005004800.0,
  1005091200.0,
  1005436800.0,
  1005523200.0,
  1005696000.0,
  1005782400.0,
  1005868800.0,
  1006128000.0,
  1006214400.0,
  1006300800.0,
  1006905600.0,
  1006992000.0,
  1007164800.0,
  1007251200.0,
  1007424000.0,
  1007769600.0,
  1008028800.0,
  1008115200.0,
  1008201600.0,
  1008460800.0,
  1008547200.0,
  1008633600.0,
  1008720000.0,
  1009238400.0,
  1009411200.0,
  1009584000.0,
  1009670400.0),
 (1,
  2,
  1,
  2,
  1,
  2,
  4,
  1,
  5,
  1,
  1,
  2,
  3,
  2,
  1,
  4,
  2,
  1,
  3,
  3,
  5,
  1,
  3,
  8,
  7,
  12,
  2,
  2,
  11,
  7,
  6,
  1,
  2,
  1,
  2,
  3,
  1,
  1,
  2,
  4,
  2,
  4,
  1,
  1,
  2,
  5,
  2,
  5,
  3,
  3,
  5,
  1,
  4,
  2,
  1,
  4,
  2,
  9,
  19,
  2,
  10,
  1,
  4,
  10,
  3,
  1,
  1,
  1,
  2,
  8,
  3,
  1,
  1,
  2,
  3,
  2,
  1,
  1,
  2,
  1,
  3,
  6,
  1,
  1,
  1,
  2,
  1,
  4,
  3,
  1,
  1,
  6,
  4,
  1,
  4,
  2,
  1,
  1,
  3,
  2,
  4,
  3,
  2,
  7,
  2,
  7,
  2,
  3,
  4,
  1,
  3,
  1,
  1,
  16,
  1,
  1,
  3,
  1,
  2,
  1,
  1,
  2,
  2,
  2,
  4,
  4,
  3,
  1,
  1,
  2,
  1,
  1,
  7,
  5,
  2,
  3,
  7,
  4,
  8,
  3,
  3,
  1,
  1,
  6,
  2,
  5,
  2,
  4,
  4,
  3,
  1,
  1,
  1,
  4,
  1,
  1,
  2,
  1,
  4,
  2,
  1,
  2,
  4,
  3,
  2,
  2,
  3,
  3,
  2,
  1,
  4,
  3,
  3,
  3,
  2,
  3,
  1,
  2,
  2,
  2,
  1,
  3,
  2,
  2,
  1,
  4,
  3,
  1,
  1,
  1,
  1,
  2,
  1,
  1,
  7,
  6,
  4))
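
The chain that builds country_series groups the (timestamp, count) pairs by country, sorts them by time, and then transposes them into one tuple of times and one tuple of counts. Here is a local sketch of the same reshaping on made-up records, with a dictionary standing in for groupByKey:

```python
from collections import defaultdict

# Made-up (country, (timestamp, count)) records, deliberately out of order.
records = [('USA', (3.0, 7)), ('USA', (1.0, 5)), ('IRN', (2.0, 4))]

grouped = defaultdict(list)
for country, point in records:
    grouped[country].append(point)                # plays the role of groupByKey

series = {}
for country, points in grouped.items():
    points = sorted(points, key=lambda p: p[0])   # sort by time
    times, events = zip(*points)                  # transpose pairs into two tuples
    series[country] = (times, events)
```

The zip(*points) call is what turns a list of (time, count) pairs into the two parallel sequences that a line plot expects.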

Now we're ready to plot. Each object in country_series has the information we need to plot a single line.

In [27]:
random_color = lambda: '#%02x%02x%02x' % tuple(np.random.randint(0,256,3))
fig = plt.figure()
ax = fig.add_subplot(111)
for (country, times, events) in country_series.takeOrdered(10, lambda x: -sum(x[2])):
    t = ax.plot(times, events, lw=1, c=random_color())

What's the big spike for the blue line above?

In [28]:
country_day_counts.reduce(lambda x, y: max(x, y, key=lambda z: z[1]))
Out[28]:
((u'USA', u'20010912'), 2387)

Looks like it was the day after September 11th.
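
The reduce used to find the spike keeps, for each pair of records, whichever one has the larger count. The same lambda can be tried locally with functools.reduce; the three records below are copied from the outputs shown earlier.

```python
from functools import reduce

# ((country, day), count) records taken from the outputs above.
counts = [
    (('VEN', '20010602'), 12),
    (('USA', '20010912'), 2387),
    (('GHA', '20011224'), 15),
]

# Keep whichever record of each pair has the larger count.
busiest = reduce(lambda x, y: max(x, y, key=lambda z: z[1]), counts)
```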