import pandas as pd
from __future__ import division
from moztelemetry import get_pings, get_pings_properties
from collections import defaultdict
%pylab inline
Populating the interactive namespace from numpy and matplotlib
sc.defaultParallelism
80
# Compare "main" vs "saved_session" v4 pings for one day of Nightly builds.
# Build-id window: all builds from 2015-07-08 (inclusive string range).
build_id = ("20150708000000", "20150708999999")
main_pings = get_pings(sc, app="Firefox", channel="nightly", build_id=build_id, schema="v4", doc_type="main")
sses_pings = get_pings(sc, app="Firefox", channel="nightly", build_id=build_id, schema="v4", doc_type="saved_session")
# Identifying fields: used below for dedup (documentId) and for grouping
# fragments into sessions (clientId + sessionId).
meta = ["meta/documentId",
"meta/clientId",
"payload/info/sessionId",
"payload/info/subsessionCounter",
"payload/info/reason"]
# Metrics compared between the two ping flavours: one scalar simple
# measurement and two histograms.
metrics = ["payload/simpleMeasurements/firstPaint",
"payload/histograms/GC_MS",
"payload/histograms/UPDATE_PREF_SERVICE_ERRORS_NOTIFY"]
# Project only the fields above out of each ping.
# NOTE(review): with_processes=True appears to add per-process variants of
# the histograms (see the "_parent" names in metrics_parent below) — confirm
# against moztelemetry's get_pings_properties docs.
main_pings_ss = get_pings_properties(main_pings, meta + metrics, with_processes=True)
sses_pings_ss = get_pings_properties(sses_pings, meta + metrics, with_processes=True)
def dedupe(rdd):
    """Remove duplicate submissions of the same document.

    Pings missing a clientId are dropped; the remaining pings are keyed by
    clientId + documentId and an arbitrary single ping is kept per key.
    """
    with_client = rdd.filter(lambda p: p["meta/clientId"] is not None)
    keyed = with_client.map(
        lambda p: (p["meta/clientId"] + p["meta/documentId"], p))
    # Keep the first ping seen for each key, then strip the key again.
    unique = keyed.reduceByKey(lambda kept, _: kept)
    return unique.map(lambda kv: kv[1])
# Drop duplicate submissions from both ping sets.
main_pings_dd = dedupe(main_pings_ss)
sses_pings_dd = dedupe(sses_pings_ss)
def aggregate_pings(state, ping):
    """Fold one ping into a per-session accumulator.

    `state` maps metric name -> list of values, one entry per fragment seen
    so far (a defaultdict(list), per aggregateByKey's zero value below).
    Mutates and returns `state`.
    """
    # .items() instead of the Python-2-only .iteritems(): identical behavior
    # on py2 and keeps the code portable to py3.
    for metric, value in ping.items():
        state[metric] += [value]
    return state
def aggregate_aggregates(agg1, agg2):
    """Merge two per-session accumulators (metric -> list of values).

    Extends agg1's lists with agg2's; metrics only present in agg2 are
    adopted as-is. Mutates and returns agg1.
    """
    # .items() instead of py2-only .iteritems(); the original's
    # `continue` directly before an `else:` branch was redundant.
    for metric, values in agg2.items():
        if metric in agg1:
            agg1[metric] += values
        else:
            agg1[metric] = values
    return agg1
def keyify(ping):
    """Turn a ping dict into a (clientId + sessionId, rest-of-ping) pair.

    The two identifying fields are removed from the ping in place; the
    concatenation of their values becomes the grouping key.
    """
    client_id = ping.pop("meta/clientId", None)
    session_id = ping.pop("payload/info/sessionId", None)
    return (client_id + session_id, ping)
# Collect all main-ping fragments of each session into metric -> [values].
main_grouped = main_pings_dd.map(keyify).aggregateByKey(defaultdict(list), aggregate_pings, aggregate_aggregates)
sses_grouped = sses_pings_dd.map(keyify)
# rightOuterJoin keeps only sessions that produced a saved_session ping;
# the main side of the pair may be None.
joined = main_grouped.rightOuterJoin(sses_grouped) # join on completed sessions only
# See https://bugzilla.mozilla.org/show_bug.cgi?id=1147395#c8
metrics_parent = ["payload/simpleMeasurements/firstPaint",
"payload/histograms/GC_MS_parent",
"payload/histograms/UPDATE_PREF_SERVICE_ERRORS_NOTIFY_parent"]
def compare(joined):
    """Return True when a session's saved_session ping disagrees with the
    aggregate of its main-ping fragments.

    `joined` is a (key, (main, sses)) pair from rightOuterJoin: `main` maps
    metric -> list of per-fragment values (or is None), `sses` is the
    saved_session ping's metric -> value dict (or None).
    """
    main, sses = joined[1]
    # Exactly one side missing counts as a mismatch.
    if (sses is None) ^ (main is None):
        return True
    for metric in metrics_parent:
        sses_metric = sses[metric]
        main_metric = main[metric]
        if isinstance(sses_metric, pd.Series):  # Histogram
            # List comprehension instead of filter(): on py3 a lazy filter
            # object is always truthy and not indexable, which would break
            # the checks below; the comprehension behaves the same on both.
            present = [v for v in main_metric if v is not None]
            if present:
                # Sum the per-fragment histograms element-wise (explicit
                # loop instead of the py2-only builtin reduce).
                total = present[0]
                for v in present[1:]:
                    total = total + v
                if not np.all(sses_metric == total):
                    return True
        else:
            # Scalar simple measurement: must equal the first fragment's value.
            if sses_metric != main_metric[0]:
                return True
    return False
# Fraction of all joined sessions whose saved_session disagrees with the
# aggregated main pings.
print "Overall {:.2f}% of sessions mismatch".format(100*joined.filter(compare).count()/joined.count())
Overall 0.09% of sessions mismatch
def multi_frag_filter(j):
    """Return True when the main side holds more than one session fragment.

    `j` is a (key, (main, sses)) pair; `main` maps metric -> list with one
    entry per fragment, so any metric's list length is the fragment count.
    """
    k, v = j
    main, sses = v
    if not main:
        # Explicit False instead of the original's implicit None; both are
        # falsy, so RDD.filter() behaves identically.
        return False
    # next(iter(...)) instead of main.keys()[0]: dict views are not
    # subscriptable on py3, and this is equivalent on py2.
    first_metric = next(iter(main))
    return len(main[first_metric]) > 1
# Restrict to sessions whose main side has more than one fragment.
multi_frag_joined = joined.filter(multi_frag_filter)
print "Overall {:.2f}% of multi-fragment sessions mismatch".format(100*multi_frag_joined.filter(compare).count()/multi_frag_joined.count())
Overall 0.33% of multi-fragment sessions mismatch
def complete_multi_frag_filter(j):
    """Return True for sessions whose main-ping fragments are all present.

    Single-fragment sessions pass trivially. Multi-fragment sessions pass
    only if the count of received fragments matches the subsessionCounter
    carried by the "shutdown" fragment and the counters cover 1..N exactly.
    """
    k, v = j
    main, sses = v
    if not main:
        return False
    # next(iter(...)) instead of the py2-only main.keys()[0]; every metric
    # list has one entry per fragment, so any of them gives the count.
    num_fragments = len(main[next(iter(main))])
    if num_fragments > 1:
        reason = main["payload/info/reason"]
        counter = main["payload/info/subsessionCounter"]
        # The "shutdown" fragment's counter is the total number of
        # fragments the session produced.
        expected_num_fragments = 0
        for i, r in enumerate(reason):
            if r == "shutdown":
                expected_num_fragments = counter[i]
        # Do we have all fragments?
        if expected_num_fragments != num_fragments:
            return False
        # Do we have the right fragments (counters 1..N, no gaps/dupes)?
        if set(range(1, num_fragments + 1)) != set(counter):
            return False
    return True
# Restrict further to multi-fragment sessions with a complete fragment set.
complete_multi_frag_joined = joined.filter(complete_multi_frag_filter)
print "Overall {:.2f}% of complete multi-fragment sessions mismatch".format(100*complete_multi_frag_joined.filter(compare).count()/complete_multi_frag_joined.count())
Overall 0.06% of complete multi-fragment sessions mismatch