from IPython.display import HTML
import requests
MASTER_HOSTNAME = requests.get("http://169.254.169.254/latest/meta-data/public-hostname").text
%matplotlib inline
def wiki_table(data):
    """Render rows as an HTML table whose first column links to Wikipedia.

    data is an iterable of rows; a bare string is treated as a one-column
    row.  Column 0 becomes a link to en.wikipedia.org/wiki/<value> (and is
    also shown as the cell text); remaining columns are plain cells.
    Returns an IPython.display.HTML object for notebook rendering.

    NOTE(review): the markup below was reconstructed -- the original string
    literals were corrupted (their HTML tags had been stripped, leaving
    unterminated strings and a two-argument format against a single %s).
    Confirm the tags against the original notebook if available.
    """
    from types import StringTypes  # Python 2 only
    html = ["<table>"]
    for row in data:
        if isinstance(row, StringTypes):
            # Promote a bare string to a single-column row.
            row = [row]
        html.append('<tr>')
        # Two format arguments: one for the link href, one for the cell text.
        html.append('<td><a href="http://en.wikipedia.org/wiki/%s">%s</a></td>'
                    % (row[0], row[0]))
        for col in row[1:]:
            html.append("<td>%s</td>" % str(col))
        html.append('</tr>')
    html.append('</table>')
    return HTML(''.join(html))
# CLUSTER_URL (the spark:// master URL) is expected to be injected into the
# notebook environment by the cluster launch scripts -- it is not defined in
# this file.  TODO(review): confirm where it is set.
print CLUSTER_URL
from pyspark import SparkContext
# Connect to the cluster, registering this notebook as the application name.
sc = SparkContext(CLUSTER_URL, 'ipython-notebook')
# Distribute the integers 0..999 across 10 partitions.
data = sc.parallelize(range(1000), 10)
# Bare expressions below rely on the notebook echoing their repr.
data
data.count()
data.take(10)
help(data)
# map is lazy; take(10) only evaluates enough partitions to get 10 elements.
data.map(lambda x: str(x)).take(10)
# Sum of 0..999 (= 499500).
data.reduce(lambda x, y: x + y)
# Load the wikipedia dump from the cluster filesystem; each element is one
# line (one tab-separated page record, as used by extract_links below).
wiki = sc.textFile("/wikitext")
# Lazy filter: records whose raw text contains "Berkeley" (case-sensitive).
berkeley_pages = wiki.filter(lambda x: "Berkeley" in x)
# %time is an IPython magic; this first count scans the whole uncached file.
%time berkeley_pages.count()
# Links to the cluster monitoring UIs on the master node.  The original
# literals had lost their <a> tags, leaving "'Master Web UI' % MASTER_HOSTNAME",
# which raises TypeError at runtime (no %s in the format string).
# NOTE(review): ports assumed from the standard spark-ec2 deployment
# (standalone master UI on 8080, Ganglia on 5080) -- confirm for this cluster.
HTML('<a href="http://%s:8080" target="_blank">Master Web UI</a>' % MASTER_HOSTNAME)
HTML('<a href="http://%s:5080/ganglia" target="_blank">Ganglia UI</a>' % MASTER_HOSTNAME)
# Mark the RDD for in-memory caching; it is materialized on the next action.
wiki.cache()
# The first timed count populates the cache; the second should be much faster.
%time berkeley_pages.count()
%time berkeley_pages.count()
berkeley_pages.first()
# Records are tab-separated; field 0 is the page title (Python 2 print).
for title in berkeley_pages.map(lambda x: x.split("\t")[0]).take(20):
    print title
import re
def extract_links(raw_text):
    """Yield the wiki-link components found in one page record.

    raw_text is "title<TAB>body"; links in the body look like [[Target]]
    or [[Target|display text]].  Yields the last "|"-separated component
    of each link; links whose first component starts with "Image:" are
    skipped.  Raises IndexError if the record contains no tab.

    NOTE(review): for piped links this yields the display text rather than
    the target page -- preserved as-is; confirm that this is intentional.
    """
    # Raw string: "\[" is an invalid escape sequence in a plain str literal
    # (SyntaxWarning on modern Python); the pattern itself is unchanged.
    for link in re.findall(r"\[\[([^\]]*)\]\]", raw_text.split("\t")[1]):
        splits = link.split("|")
        if not splits[0].startswith("Image:"):
            yield splits[-1]
# One output element per outgoing link across the whole dump (with repeats).
linked_pages = wiki.flatMap(extract_links)
wiki_table(linked_pages.take(20))
# Total number of links, counting repetitions.
linked_pages.count()
# Classic word-count pattern: (link target, 1) pairs reduced to in-link
# counts.  cache() because the result is reused several times below.
number_of_inlinks = linked_pages.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).cache()
# countByValue returns a local dict {in-link count: number of pages with it}.
%time inlink_frequencies = number_of_inlinks.map(lambda x: x[1]).countByValue()
import pylab
# Histogram of the in-link-count distribution, truncated at 50.
pylab.hist(inlink_frequencies.keys(), bins=range(50), weights=inlink_frequencies.values());
# Pages linked to exactly once (Python 2 tuple-unpacking lambda).
linked_only_once = number_of_inlinks.filter(lambda (title, count): count == 1).map(lambda x: x[0])
wiki_table(linked_only_once.take(20))
# Titles of pages that actually exist in the dump; reused below, so cached.
page_titles = wiki.map(lambda x: x.split("\t")[0]).cache()
# Keep only in-link counts whose target exists as a real page: left-outer-join
# against (title, True) markers and drop rows where the right side is None
# (i.e. the link target is missing from the dump).
number_of_inlinks_for_existing_pages = number_of_inlinks. \
    leftOuterJoin(page_titles.map(lambda x: (x, True))). \
    filter(lambda (title, (count, exists)): exists is not None). \
    map(lambda (title, (count, exists)): (title, count))
%time inlink_frequencies_for_existing_pages = number_of_inlinks_for_existing_pages.map(lambda x: x[1]).countByValue()
pylab.hist(inlink_frequencies_for_existing_pages.keys(), bins=range(50), weights=inlink_frequencies_for_existing_pages.values());
num_pages = page_titles.count()
num_linked_pages = number_of_inlinks.count()
# NOTE(review): treats every distinct link target beyond the existing page
# count as "missing"; assumes distinct targets outnumber pages -- confirm.
print "Existing pages: %i\nMissing pages: %i" % (num_pages, num_linked_pages - num_pages)
# (title, number of outgoing links) per page; extract_links does the
# tab-split itself, so the full record is passed through unchanged.
outlink_counts = wiki.map(lambda x: (x.split("\t")[0], len(list(extract_links(x)))))
%time outlink_count_frequencies = outlink_counts.map(lambda x: x[1]).countByValue()
# Out-link distribution up to 400 links, bucketed in steps of 4.
pylab.hist(outlink_count_frequencies.keys(), bins=range(0, 400, 4), weights=outlink_count_frequencies.values());
# Pages with more than 400 outgoing links, pulled to the driver as a dict.
prolific_linkers = outlink_counts.filter(lambda (title, count): count > 400).collectAsMap()
len(prolific_linkers)
import operator
# Top 500 by out-link count, descending (Python 2 dict.iteritems).
top_linkers = sorted(prolific_linkers.iteritems(), key=operator.itemgetter(1), reverse=True)[:500]
wiki_table(top_linkers)