#!/usr/bin/env python
# coding: utf-8

# ### [Bug 1147395](https://bugzilla.mozilla.org/show_bug.cgi?id=1147395) saved_session vs main pings validation

# In[1]:

from __future__ import division

from collections import defaultdict

import numpy as np
import pandas as pd

from moztelemetry import get_pings, get_pings_properties

get_ipython().run_line_magic('pylab', 'inline')


# In[2]:

sc.defaultParallelism


# #### Get main and saved_session pings for recent build-ids:

# In[3]:

build_id = ("20150708000000", "20150708999999")

main_pings = get_pings(sc,
                       app="Firefox",
                       channel="nightly",
                       build_id=build_id,
                       schema="v4",
                       doc_type="main")

sses_pings = get_pings(sc,
                       app="Firefox",
                       channel="nightly",
                       build_id=build_id,
                       schema="v4",
                       doc_type="saved_session")


# #### Extract the set of metrics we want to compare:

# In[4]:

meta = ["meta/documentId",
        "meta/clientId",
        "payload/info/sessionId",
        "payload/info/subsessionCounter",
        "payload/info/reason"]

metrics = ["payload/simpleMeasurements/firstPaint",
           "payload/histograms/GC_MS",
           "payload/histograms/UPDATE_PREF_SERVICE_ERRORS_NOTIFY"]

main_pings_ss = get_pings_properties(main_pings, meta + metrics, with_processes=True)
sses_pings_ss = get_pings_properties(sses_pings, meta + metrics, with_processes=True)


# #### Remove dupes:

# In[5]:

def dedupe(rdd):
    return rdd.filter(lambda p: p["meta/clientId"] is not None)\
              .map(lambda p: (p["meta/clientId"] + p["meta/documentId"], p))\
              .reduceByKey(lambda x, y: x)\
              .map(lambda x: x[1])


# In[6]:

main_pings_dd = dedupe(main_pings_ss)
sses_pings_dd = dedupe(sses_pings_ss)


# #### Group pings by session:

# In[7]:

def aggregate_pings(state, ping):
    # Append each metric of this subsession fragment to the session-level lists.
    for metric, value in ping.iteritems():
        state[metric] += [value]
    return state

def aggregate_aggregates(agg1, agg2):
    # Merge two partial per-session aggregates.
    for metric, values in agg2.iteritems():
        if metric not in agg1:
            agg1[metric] = values
        else:
            agg1[metric] += values
    return agg1

def keyify(ping):
    sessionId = ping.pop("payload/info/sessionId", None)
    clientId = ping.pop("meta/clientId", None)
    return (clientId + sessionId, ping)

main_grouped = main_pings_dd.map(keyify).aggregateByKey(defaultdict(list), aggregate_pings, aggregate_aggregates)
sses_grouped = sses_pings_dd.map(keyify)


# #### Join main and saved_session metrics:

# In[8]:

# rightOuterJoin keeps only sessions that produced a saved_session ping,
# i.e. completed sessions.
joined = main_grouped.rightOuterJoin(sses_grouped)


# #### Compare sessions:

# In[9]:

# See https://bugzilla.mozilla.org/show_bug.cgi?id=1147395#c8
metrics_parent = ["payload/simpleMeasurements/firstPaint",
                  "payload/histograms/GC_MS_parent",
                  "payload/histograms/UPDATE_PREF_SERVICE_ERRORS_NOTIFY_parent"]

def compare(joined):
    main, sses = joined[1]

    # A session with only one of the two ping types is a mismatch.
    if (sses is None) ^ (main is None):
        return True

    for metric in metrics_parent:
        sses_metric = sses[metric]
        main_metric = main[metric]

        if isinstance(sses_metric, pd.Series):  # Histogram
            # The saved_session histogram should equal the bucket-wise sum of
            # the main-ping (subsession) histograms for the same session.
            main_metric = filter(lambda x: x is not None, main_metric)
            if main_metric and not np.all(sses_metric == reduce(lambda x, y: x + y, main_metric)):
                return True
        else:
            # Scalar measurement: compare against the value from the first fragment.
            if sses_metric != main_metric[0]:
                return True

    return False


# In[10]:

print "Overall {:.2f}% of sessions mismatch".format(100*joined.filter(compare).count()/joined.count())
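
# As an aside, the histogram branch of `compare` above expects the saved_session
# histogram to equal the bucket-wise sum of the session's main-ping (subsession)
# histograms. A minimal sketch with made-up bucket counts (illustrative only,
# not part of the validation run):

frag1 = pd.Series([1, 0, 2], index=[0, 1, 2])  # hypothetical subsession 1 histogram
frag2 = pd.Series([0, 3, 1], index=[0, 1, 2])  # hypothetical subsession 2 histogram
whole = pd.Series([1, 3, 3], index=[0, 1, 2])  # hypothetical saved_session histogram

# True, i.e. no mismatch for this session
np.all(whole == reduce(lambda x, y: x + y, [frag1, frag2]))
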
# #### Compare only multi-fragment sessions:

# In[11]:

def multi_frag_filter(j):
    k, v = j
    main, sses = v
    if main and len(main[main.keys()[0]]) > 1:
        return True
    return False

multi_frag_joined = joined.filter(multi_frag_filter)


# In[12]:

print "Overall {:.2f}% of multi-fragment sessions mismatch".format(100*multi_frag_joined.filter(compare).count()/multi_frag_joined.count())


# #### Compare only multi-fragment sessions for which all fragments have been received:

# In[13]:

def complete_multi_frag_filter(j):
    k, v = j
    main, sses = v

    if not main:
        return False

    num_fragments = len(main[main.keys()[0]])
    if num_fragments > 1:
        reason = main["payload/info/reason"]
        counter = main["payload/info/subsessionCounter"]

        # Find the total number of fragments for this session
        expected_num_fragments = 0
        for i, r in enumerate(reason):
            if r == "shutdown":
                expected_num_fragments = counter[i]

        # Do we have all fragments?
        if expected_num_fragments != num_fragments:
            return False

        # Do we have the right fragments?
        if set(range(1, num_fragments + 1)) != set(counter):
            return False

        return True

    return False

complete_multi_frag_joined = joined.filter(complete_multi_frag_filter)


# In[14]:

print "Overall {:.2f}% of complete multi-fragment sessions mismatch".format(100*complete_multi_frag_joined.filter(compare).count()/complete_multi_frag_joined.count())
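
# A quick sanity check of the completeness logic above, on hypothetical fragment
# metadata rather than real pings (illustrative only):

def _is_complete(reason, counter):
    # Mirrors the checks in complete_multi_frag_filter for a single hypothetical session.
    num_fragments = len(counter)
    expected_num_fragments = 0
    for i, r in enumerate(reason):
        if r == "shutdown":
            expected_num_fragments = counter[i]
    return expected_num_fragments == num_fragments and set(range(1, num_fragments + 1)) == set(counter)

print _is_complete(["environment-change", "daily", "shutdown"], [1, 2, 3])  # True: all fragments present
print _is_complete(["environment-change", "shutdown"], [1, 3])              # False: fragment 2 is missing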