from pyspark import SparkContext sc = SparkContext( CLUSTER_URL, 'pyspark') sc pagecounts = sc.textFile("/wiki/pagecounts") pagecounts pagecounts.take(10) %time pagecounts.count() from IPython.display import HTML, IFrame, display my_url = CLUSTER_URL.split(':')[1] spark_console = 'http:'+my_url+':8080' display(HTML('You can see the Spark Console here:' % spark_console)) IFrame(spark_console, 800, 300) enPages = pagecounts.filter(lambda x: x.split(" ")[1] == "en").cache() %time enPages.count() %time enPages.count() enTuples = enPages.map(lambda x: x.split(" ")) enKeyValuePairs = enTuples.map(lambda x: (x[0][:8], int(x[3]))) enKeyValuePairs.reduceByKey(lambda x, y: x + y, 1).collect() enPages.map( lambda x: x.split(" ") ).map( lambda x: (x[0][:8], int(x[3])) ).reduceByKey( lambda x, y: x + y, 1 ).collect() %%time fields = enPages.map(lambda x: x.split(" ")) title_nhits = fields.map(lambda x: (x[2], int(x[3]))) nhits = title_nhits.reduceByKey(lambda x, y: x + y, 40) high_traffic = nhits.filter(lambda x: x[1] > 200000).map(lambda x: (x[1], x[0])).collect() high_traffic