import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
import networkx as nx
import collections
from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client
%pylab inline
Populating the interactive namespace from numpy and matplotlib
sc.defaultParallelism
48
pings = get_pings(sc, app="Firefox",
                  channel="nightly",
                  submission_date=("20150520", "20150525"),
                  fraction=1,
                  schema="v4")
p = pings.first()
# p["payload"]["info"]
p.keys(), p.get("payload",{}).get("info",{}).get("subsessionId",False)
# {k:p[k] for k in p.keys() if k!="meta"}
([u'clientId', u'id', u'environment', u'application', u'version', 'meta', u'creationDate', u'type', u'payload'], u'e52f169e-35fb-4978-a470-ea3f38ce4fa7')
To distinguish it from the other ids running around, we'll call the top-level 'id' field the "pingId".
Note that we have to filter out the 'meta' entry, because it can contain things like intake timestamps, which should be expected to change if the same ping is sent twice.
pingsByPingId = pings \
    .map(lambda p: (p.get("id","MISSING"),
                    [{k:p[k] for k in p.keys() if k!="meta"}]) ) \
    .reduceByKey(lambda l1,l2: l1+l2)
pingsByPingId.cache()
PythonRDD[8] at RDD at PythonRDD.scala:43
pById = pingsByPingId.take(10)
Let's call the number of pings that share a pingId the "multiplicity" of that pingId. The tuples below are of the form:
(multiplicity of a pingId, number of pingIds with that multiplicity)
pingIdMultiplicities = pingsByPingId \
    .map(lambda id_pList: (len(id_pList[1]), 1) ) \
    .reduceByKey(lambda x1,x2: x1+x2)
pingIdMultiplicities.cache()
pingIdMultiplicities.count()
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-7-b69abf74b688> in <module>()
      3 pingIdMultiplicities.cache()
----> 4 pingIdMultiplicities.count()
    ...
Py4JJavaError: An error occurred while calling o105.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4942 in stage 1.0
  failed 4 times, most recent failure: Lost task 4942.3 in stage 1.0 (TID 5842,
  ip-10-37-6-174.us-west-2.compute.internal): ExecutorLostFailure (executor 5 lost)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    ...
pingIdMultiplicities.collect()
Next, let's set aside all the sets of pings that contain any non-identical members.
def allEqual(l):
    # True iff every element of the list compares equal to the first
    if len(l) <= 1:
        return True
    e1 = l[0]
    for e in l[1:]:
        if e != e1:
            return False
    return True
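For reference, the same check can be written more tersely with the built-in all(), which short-circuits on the first mismatch just like the loop above (this equivalent, with the hypothetical name allEqualTerse, was not part of the original session):
def allEqualTerse(l):
    # vacuously True for empty and single-element lists
    return all(e == l[0] for e in l[1:])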
pingsByPingId_nonexactDupes = pingsByPingId \
    .filter(lambda id_pList: len(id_pList[1])>1 ) \
    .filter(lambda id_pList: not allEqual(id_pList[1]) )
pingsByPingId_nonexactDupes.cache()
pingsByPingId_nonexactDupes.count()
329
# since there are only 329, pull them to the driver for further work
nonexactDupesSample = pingsByPingId_nonexactDupes.collect()
collections.Counter(map(lambda tup: len(tup[1]), nonexactDupesSample))
Counter({2: 320, 3: 9})
So, from the above we see that there are 320 pingIds that show up in sets of two pings that are not all identical, and 9 pingIds that show up in sets of three pings that are not all identical. Note that in the three-ping case this does not mean that all three are necessarily different, just that at least one is not the same as the others.
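If we wanted to know how many truly distinct pings each such set contains, one way is to canonicalize each ping and count unique serializations. A sketch, not run in the original session, assuming ujson's dumps accepts sort_keys like the stdlib json module:
def distinctCount(pList):
    # sort_keys makes equal dicts serialize identically, so set() deduplicates them
    return len(set(json.dumps(p, sort_keys=True) for p in pList))

collections.Counter(distinctCount(pList) for _, pList in nonexactDupesSample)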
So we know that not all ping submissions are atomic. What can change within a ping between submissions? In the 9 cases of 3 pings under the same pingId, we'll simplify by just looking at the first two.
# cribbed from http://stackoverflow.com/questions/5903720/recursive-diff-of-two-python-dictionaries-keys-and-values
def list_to_dict(l):
    # index a list by stringified position so it can be diffed like a dict
    return dict(zip(map(str, range(len(l))), l))

def dictDiff2(d1, d2, path=""):
    # recursively compare two nested dict/list structures, returning a list of
    # ("change type", "/json/path") tuples
    changes = []
    for k in d1:
        if k not in d2:
            changes.append( ("path not present in both", path+"/"+k) )
    for k in d2:
        if k not in d1:
            changes.append( ("path not present in both", path+"/"+k) )
            continue
        if d2[k] != d1[k]:
            if type(d2[k]) not in (dict, list):
                # leaf value differs
                changes.append( ("value changed", path+"/"+k) )
            elif type(d1[k]) != type(d2[k]):
                # container on one side, something else on the other
                changes.append( ("value changed", path+"/"+k) )
            elif type(d2[k]) == dict:
                changes += dictDiff2(d1[k], d2[k], path+"/"+k)
            elif type(d2[k]) == list:
                changes += dictDiff2(list_to_dict(d1[k]), list_to_dict(d2[k]), path+"/"+k)
    return changes
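As a quick sanity check of dictDiff2 on toy inputs (illustrative only; the ordering of the result depends on dict iteration order):
dictDiff2({"a": 1, "b": {"c": 2}}, {"a": 1, "b": {"c": 3}, "d": 4})
# -> [('value changed', '/b/c'), ('path not present in both', '/d')]  (order may vary)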
pingChanges = map(lambda tup: dictDiff2(tup[1][0],tup[1][1]), nonexactDupesSample)
# how many differences are there between the pings for each pingId?
collections.Counter([len(pc) for pc in pingChanges])
Counter({1: 119, 2: 59, 3: 32, 8: 26, 5: 19, 4: 18, 6: 14, 11: 11, 9: 8, 14: 7, 10: 5, 12: 5, 7: 4, 0: 1, 18: 1})
From the above we see that in most instances these mismatched pings differ in only one place, but they can sometimes differ in several places (up to 18 in the data considered). However, we don't see massive changes across, e.g., hundreds or thousands of paths. So the next sub-question is: where exactly do the pings differ?
pingChangesFlat = reduce(lambda l1,l2: l1+l2, pingChanges, [])
changes = {}
for changeType, changePath in pingChangesFlat:
    if changeType not in changes:
        changes[changeType] = {changePath: 1}
    else:
        changes[changeType][changePath] = changes[changeType].get(changePath, 0) + 1
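For what it's worth, the same tally can be written with standard-library counters; the result supports the same .items() access used below. A sketch, not part of the original session, using the hypothetical name changes2:
changes2 = collections.defaultdict(collections.Counter)
for changeType, changePath in pingChangesFlat:
    changes2[changeType][changePath] += 1

# same nested counts as the hand-rolled dict above
assert dict(changes2["value changed"]) == changes["value changed"]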
The following shows json paths that are only present in one of the two pings, as
(number of times the path was missing, path)
Note that because the display is keyed by count, any paths sharing the same count collapse to a single entry.
dict(sorted([(tup[1],tup[0]) for tup in changes["path not present in both"].items()],reverse=True))
{1: u'/payload/addonDetails/XPI/1tinUyqW@cx.com/shutdown_MS', 2: u'/payload/addonDetails/XPI/CanvasBlocker@kkapsner.de/shutdown_MS', 3: u'/payload/addonDetails/XPI/browsec@browsec.com/shutdown_MS', 4: u'/payload/addonDetails/XPI/firebug@software.joehewitt.com/shutdown_MS', 5: u'/payload/addonDetails/XPI/compatibility@addons.mozilla.org/shutdown_MS', 6: u'/payload/addonDetails/XPI/aboutsessionstore@dt/shutdown_MS', 7: u'/payload/addonDetails/XPI/arpit2@techraga.in/shutdown_MS', 8: u'/payload/addonDetails/XPI/CSTBB@NArisT2_Noia4dev/shutdown_MS', 9: u'/payload/addonDetails/XPI/en-US@dictionaries.addons.mozilla.org/shutdown_MS', 10: u'/payload/addonDetails/XPI/firefox@mega.co.nz/shutdown_MS', 11: u'/payload/addonDetails/XPI/the-addon-bar@GeekInTraining-GiT/shutdown_MS', 12: u'/payload/addonDetails/XPI/adbhelper@mozilla.org/shutdown_MS', 13: u'/payload/addonDetails/XPI/{b9db16a4-6edc-47ec-a1f4-b86292ed211d}/shutdown_MS', 14: u'/payload/addonDetails/XPI/jid1-xUfzOsOFlzSOXg@jetpack/shutdown_MS', 16: u'/payload/addonDetails/XPI/uriloader@pdf.js/shutdown_MS', 18: u'/payload/addonDetails/XPI/jid1-cwbvBTE216jjpg@jetpack/shutdown_MS', 20: u'/payload/addonDetails/XPI/{2b10c1c8-a11f-4bad-fe9c-1c11e82cac42}/shutdown_MS', 22: u'/payload/addonDetails/XPI/skip_compatibility_check@sdrocking.com/shutdown_MS', 24: u'/payload/addonDetails/XPI/firefox@ghostery.com/shutdown_MS', 25: u'/payload/addonDetails/XPI/check-compatibility@dactyl.googlecode.com/shutdown_MS', 30: u'/payload/addonDetails/XPI/elemhidehelper@adblockplus.org/shutdown_MS', 58: u'/payload/addonDetails/XPI/mediahint@jetpack/shutdown_MS', 81: u'/payload/addonDetails/XPI/{d10d0bf8-f5b5-c8b4-a8b2-2b9879e08c5d}/shutdown_MS'}
The following shows json paths whose values differed between the two pings, as
(number of times the path's value differed, path)
dict(sorted([(tup[1],tup[0]) for tup in changes['value changed'].items()],reverse=True))
{1: u'/payload/addonDetails/XPI/dta@downthemall.net/shutdown_MS', 2: u'/payload/simpleMeasurements/UITelemetry/contextmenu/__DEFAULT__/["link","image"]/withoutcustom/openlinkintab', 3: u'/payload/addonDetails/XPI/firebug@software.joehewitt.com/creator', 4: u'/payload/addonDetails/XPI/firebug@software.joehewitt.com/startup_MS', 5: u'/payload/simpleMeasurements/UITelemetry/contextmenu/__DEFAULT__/["link"]/withoutcustom/openlinkintab', 6: u'/payload/simpleMeasurements/UITelemetry/toolbars/countableEvents/__DEFAULT__/click-builtin-item/back-button/left', 11: u'/payload/addonDetails/XPI/firebug@software.joehewitt.com/shutdown_MS', 47: u'/payload/info/reason', 64: u'/payload/simpleMeasurements/UITelemetry/toolbars/countableEvents/__DEFAULT__/click-builtin-item/tabbrowser-tabs/left'}
# pingsByPingId = pings \
#     .map(lambda p: (p.get("id","MISSING"),
#                     [{k:p[k] for k in p.keys() if k!="meta"}]) ) \
#     .reduceByKey(lambda l1,l2: l1+l2)

# same pipeline as before, but keyed on subsessionId instead of the top-level ping id
pingsByPingId = pings \
    .map(lambda p: (p.get("payload",{}).get("info",{}).get("subsessionId","missing"),
                    [{k:p[k] for k in p.keys() if k!="meta"}]) ) \
    .reduceByKey(lambda l1,l2: l1+l2)
pingsByPingId.cache()
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-19-4e75f86cf1fa> in <module>()
      7 pingsByPingId = pings .map(lambda p: (p.get("payload",{}).get("info",{}).get("subsessionId","missing"),
      8                 [{k:p[k] for k in p.keys() if k!="meta"}]) ) \
----> 9     .reduceByKey(lambda l1,l2: l1+l2)
    ...
Py4JJavaError: An error occurred while calling o158.values.
: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    ...
Caused by: java.lang.NullPointerException
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 15 more
p = pingsByPingId.first()
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-18-0ce931ded742> in <module>()
----> 1 p = pingsByPingId.first()
    ...
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: java.lang.IllegalStateException: SparkContext has been shutdown
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1316)
    ...
p.keys()
[u'clientId', u'id', u'environment', u'application', u'version', 'meta', u'creationDate', u'type', u'payload']
len(p)
9
pingIdMultiplicities = pingsByPingId \
    .map(lambda id_pList: (len(id_pList[1]), 1) ) \
    .reduceByKey(lambda x1,x2: x1+x2)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-15-2328246389da> in <module>()
----> 1 pingIdMultiplicities = pingsByPingId .map(lambda id_pList: (len(id_pList[1]), 1) ) .reduceByKey(lambda x1,x2: x1+x2)
    ...
Py4JJavaError: An error occurred while calling o129.values.
: org.apache.spark.SparkException: Task not serializable
    ...
Caused by: java.lang.NullPointerException
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 15 more
pingIdMultiplicities.collect()
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-112-96753fd2e20e> in <module>()
----> 1 pingIdMultiplicities.collect()
    ...
Py4JJavaError: An error occurred while calling o384.collect.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
    ...
pingsByPingId_nonexactDupes = pingsByPingId \
    .filter(lambda id_pList: len(id_pList[1])>1 ) \
    .filter(lambda id_pList: not allEqual(id_pList[1]) )
pingsByPingId_nonexactDupes.count()
nonexactDupesSample = pingsByPingId_nonexactDupes.collect()
collections.Counter(map(lambda tup: len(tup[1]), nonexactDupesSample))
pingChanges = map(lambda tup: dictDiff2(tup[1][0],tup[1][1]), nonexactDupesSample)
# how many differences are there between the pings for each pingId?
collections.Counter([len(pc) for pc in pingChanges])
pingChangesFlat = reduce(lambda l1,l2: l1+l2, pingChanges, [])
changes = {}
for changeType, changePath in pingChangesFlat:
    if changeType not in changes:
        changes[changeType] = {changePath: 1}
    else:
        changes[changeType][changePath] = changes[changeType].get(changePath, 0) + 1
dict(sorted([(tup[1],tup[0]) for tup in changes["path not present in both"].items()],reverse=True)
dict(sorted([(tup[1],tup[0]) for tup in changes['value changed'].items()],reverse=True))