from IPython.display import HTML import requests MASTER_HOSTNAME = requests.get("http://169.254.169.254/latest/meta-data/public-hostname").text %matplotlib inline def wiki_table(data): from types import StringTypes html = [""] for row in data: if isinstance(row, StringTypes): row = [row] html.append('') html.append('' % (row[0], row[0])) for col in row[1:]: html.append("" % str(col)) html.append('') html.append('
%s%s
') return HTML(''.join(html)) print CLUSTER_URL from pyspark import SparkContext sc = SparkContext(CLUSTER_URL, 'ipython-notebook') data = sc.parallelize(range(1000), 10) data data.count() data.take(10) help(data) data.map(lambda x: str(x)).take(10) data.reduce(lambda x, y: x + y) wiki = sc.textFile("/wikitext") berkeley_pages = wiki.filter(lambda x: "Berkeley" in x) %time berkeley_pages.count() HTML(('Master Web UI' % MASTER_HOSTNAME)) HTML(('Ganglia UI' % MASTER_HOSTNAME)) wiki.cache() %time berkeley_pages.count() %time berkeley_pages.count() berkeley_pages.first() for title in berkeley_pages.map(lambda x: x.split("\t")[0]).take(20): print title import re def extract_links(raw_text): for link in re.findall("\[\[([^\]]*)\]\]", raw_text.split("\t")[1]): splits = link.split("|") if not splits[0].startswith("Image:"): yield splits[-1] linked_pages = wiki.flatMap(extract_links) wiki_table(linked_pages.take(20)) linked_pages.count() number_of_inlinks = linked_pages.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).cache() %time inlink_frequencies = number_of_inlinks.map(lambda x: x[1]).countByValue() import pylab pylab.hist(inlink_frequencies.keys(), bins=range(50), weights=inlink_frequencies.values()); linked_only_once = number_of_inlinks.filter(lambda (title, count): count == 1).map(lambda x: x[0]) wiki_table(linked_only_once.take(20)) page_titles = wiki.map(lambda x: x.split("\t")[0]).cache() number_of_inlinks_for_existing_pages = number_of_inlinks. \ leftOuterJoin(page_titles.map(lambda x: (x, True))). \ filter(lambda (title, (count, exists)): exists is not None). \ map(lambda (title, (count, exists)): (title, count)) %time inlink_frequencies_for_existing_pages = number_of_inlinks_for_existing_pages.map(lambda x: x[1]).countByValue() pylab.hist(inlink_frequencies_for_existing_pages.keys(), bins=range(50), weights=inlink_frequencies_for_existing_pages.values()); num_pages = page_titles.count() num_linked_pages = number_of_inlinks.count() print "Existing pages: %i\nMissing pages: %i" % (num_pages, num_linked_pages - num_pages) outlink_counts = wiki.map(lambda x: (x.split("\t")[0], len(list(extract_links(x))))) %time outlink_count_frequencies = outlink_counts.map(lambda x: x[1]).countByValue() pylab.hist(outlink_count_frequencies.keys(), bins=range(0, 400, 4), weights=outlink_count_frequencies.values()); prolific_linkers = outlink_counts.filter(lambda (title, count): count > 400).collectAsMap() len(prolific_linkers) import operator top_linkers = sorted(prolific_linkers.iteritems(), key=operator.itemgetter(1), reverse=True)[:500] wiki_table(top_linkers)