# Exploratory analysis of GDELT event records with PySpark.
#
# Originally a sequence of IPython notebook cells, flattened into one script.
# It expects a PySpark-enabled IPython session in which `sc` (the
# SparkContext) and `get_ipython` are already defined. Expressions that a
# notebook would only have displayed as cell output are wrapped in print() so
# their results stay visible when the cells are merged or run together.
# Only single-argument print() calls are used, so this runs on Python 2 and 3.
from dateutil.parser import parse as parse_date
import matplotlib.pyplot as plt
import numpy as np

# Same effect as the `%matplotlib inline` magic, written as a call so the code
# is also valid Python syntax.
get_ipython().run_line_magic('matplotlib', 'inline')

# Column indices into a tab-split event record.
# NOTE(review): these look like the GDELT 1.0 event schema (SQLDATE,
# Actor1CountryCode, GoldsteinScale) -- confirm against the data source.
DATE_COL = 1      # date string; parsed as YYYYMMDD-style text below
COUNTRY_COL = 7   # country code; '' when absent
SCORE_COL = 30    # numeric score; histogrammed over [-10, 10] below

print(sc)

raw_events = (sc.textFile('/user/laserson/gdelt/events')
              .map(lambda x: x.split('\t')))
# raw_events.cache()
print(raw_events.first())
print(raw_events.count())

# --- Distribution of the score column -------------------------------------
# raw_events is not cached, so every action re-reads the input. stats()
# yields min and max in a single pass instead of two separate jobs.
scores = raw_events.map(lambda x: float(x[SCORE_COL]))
score_stats = scores.stats()
m, M = score_stats.min(), score_stats.max()
print("Min is %f\nMax is %f" % (m, M))

# RDD.histogram buckets the values on the executors. This replaces building
# a numpy array per record and reducing them. As with np.histogram, each
# bucket is half-open [lo, hi) except the last, which is closed [9, 10].
bins = np.arange(-10, 11)
_, bucket_counts = scores.histogram(bins.tolist())
hist = np.array(bucket_counts)
print(hist)

fig = plt.figure()
ax = fig.add_subplot(111)
# Anchor each bar's left edge at its bin's lower bound and span the full bin.
# Otherwise matplotlib >= 2.0 centres bars on x, shifting them half a bin left.
t = ax.bar(bins[:-1], hist, width=np.diff(bins), align='edge')

# --- Distinct country codes -----------------------------------------------
print(raw_events.map(lambda x: x[COUNTRY_COL])
      .filter(lambda code: code != '')
      .distinct()
      .collect())

# --- Events in 2001 with a country code -----------------------------------
# The date bounds are compared as strings. This matches chronological order
# only if dates are fixed-width YYYYMMDD text (assumed, as the bounds are).
events_2001 = raw_events.filter(
    lambda x: x[COUNTRY_COL] != ''
    and '20010101' <= x[DATE_COL] <= '20011231')
events_2001.cache()
print(events_2001.count())  # this action also materialises the cache

# Number of events per (country, day). events_2001 already excludes blank
# country codes, so no second filter is needed here.
country_day_counts = (events_2001
                      .map(lambda x: ((x[COUNTRY_COL], x[DATE_COL]), 1))
                      .reduceByKey(lambda a, b: a + b))
country_day_counts.cache()
print(country_day_counts.take(5))

# --- Per-country daily time series ----------------------------------------
epoch = parse_date('19700101')


def td2s(td):
    """Return the timedelta *td* as a float number of seconds."""
    return td.total_seconds()


def day2unix(day):
    """Return the number of seconds from 1970-01-01 to the date string *day*.

    Both *day* and the epoch are parsed as naive datetimes, so no timezone
    offset is applied.
    """
    return td2s(parse_date(day) - epoch)


def _to_series(item):
    """Turn (country, iterable of (unix_time, count)) into (country, times, counts).

    Points are sorted by time. times and counts are returned as parallel
    tuples; tuple(zip(...)) works on Python 3, where zip is lazy.
    """
    country, points = item
    times, counts = zip(*sorted(points, key=lambda p: p[0]))
    return country, times, counts


country_series = (country_day_counts
                  .map(lambda x: (x[0][0], (day2unix(x[0][1]), x[1])))
                  .groupByKey()
                  .map(_to_series))
country_series.cache()
print(country_series.first())


def random_color():
    """Return a random '#rrggbb' colour string."""
    return '#%02x%02x%02x' % tuple(np.random.randint(0, 256, 3))


fig = plt.figure()
ax = fig.add_subplot(111)
# Plot the 10 countries with the largest total event count.
for (country, times, events) in country_series.takeOrdered(
        10, lambda x: -sum(x[2])):
    t = ax.plot(times, events, lw=1, c=random_color(), label=country)
# Label the lines; without a legend the countries cannot be told apart.
ax.legend(loc='best', fontsize='small')

# The single busiest (country, day) pair by event count.
print(country_day_counts.max(key=lambda kv: kv[1]))