#!/usr/bin/env python
# coding: utf-8

# In[1]:


import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
import networkx as nx
import collections
from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client
get_ipython().run_line_magic('pylab', 'inline')


# In[3]:


# parallelism=48 on a 3 node cluster
# NOTE: `sc` (the SparkContext) is not created in this file; it is expected
# to be provided by the notebook environment.
sc.defaultParallelism


# #Get pings, filter them, and do some basic checks

# In[4]:


pings = get_pings(sc,
                  app="Firefox",
                  channel="nightly",
                  submission_date=("20150507", "20150514"),
                  fraction=1,
                  schema="v4")


# In[5]:


p = pings.first()


# In[6]:


# p["payload"]["info"]
p.keys()
# p.get("payload",{}).get("info",{}).get("subsessionId",False)
# {k:p[k] for k in p.keys() if k!="main"}


# In[7]:


info = get_pings_properties(pings, ["clientId",
                                    "type",
                                    "payload/info",
                                    "environment/build/buildId"])


def _keepPing(ping):
    """Return True for "main" pings that are not idle-daily and come from a recent build.

    A ping is kept when all of the following hold:
      * its "payload/info" section has a "reason" other than "idle-daily"
        (a missing reason is treated as "idle-daily", i.e. dropped),
      * its "type" is "main",
      * its build id sorts after "20150507000000" (string comparison).

    NOTE(review): this assumes get_pings_properties() yields None for a
    requested path that is absent from a ping -- confirm against the
    moztelemetry version in use. The `or` fallbacks below keep such pings
    from raising inside the Spark job; they are simply filtered out.
    """
    sessInfo = ping["payload/info"] or {}
    buildId = ping["environment/build/buildId"] or ""
    return (sessInfo.get("reason", "idle-daily") != "idle-daily"
            and ping["type"] == "main"
            and buildId > "20150507000000")


subsess = info.filter(_keepPing)


# In[8]:


subsess.cache()
numPings = subsess.count()


# In[ ]:


numPings


# # Group pings by clientId
# Get the "payload/info" section of recent pings from builds newer than 20150507000000, and which are not idle-daily pings, and group by clientId

# In[7]:


# Each ping becomes (clientId, [info]); the per-client lists are then
# concatenated so every client maps to the list of its "info" sections.
clients = subsess.map(lambda p: (p.get("clientId", "noId"), [p["payload/info"]])) \
                 .reduceByKey(lambda l1, l2: l1 + l2)
clients.cache()


# In[8]:


numClients = clients.count()
numClients


# ##Construct session graphs and compute degree distributions over graphs
#
# Once we've grouped by clientId, we can use the subSessionIds and previousSubsessionId pointer to construct a graph of the session, and then look at the degree of each node in the graph to see whether the graph looks the way we expect.
# Ideally, we should have a graph that has *zero* nodes with degree greater than 1 (no subsession should be pointed at by two or more sessions) and only *one* node with degree zero (the first subsession in the chain won't point to any prior subsession). Additionally, any given value of 'profileSubsessionCounter' should appear only once.
#
# NOTE(review): the code below measures *out*-degree, and edges run from a
# subsession to its previousSubsessionId. Out-degree > 1 therefore means one
# subsessionId was submitted with more than one distinct predecessor; confirm
# that this is the intended check.
#

# In[9]:


def sessGraphFromList(sessInfoList):
    """Build a directed graph linking each subsession to its predecessor.

    Parameters
    ----------
    sessInfoList : iterable of dict
        One "info" section per submitted subsession.

    Returns
    -------
    networkx.DiGraph
        One node per 'subsessionId' (carrying a 'subSessNum' attribute taken
        from 'profileSubsessionCounter', or "NA" when absent) and one edge
        from each subsession to its 'previousSubsessionId'. Predecessors that
        were never submitted themselves appear as nodes without attributes.
        Entries lacking 'subsessionId' are skipped.
    """
    g = nx.DiGraph()
    for s in sessInfoList:
        # each s is a separate subsession submission "info" section
        try:
            subsessId = s['subsessionId']
        except KeyError:
            continue
        N = s.get("profileSubsessionCounter", "NA")
        g.add_node(subsessId, subSessNum=N)
        prevSubSess = s.get('previousSubsessionId', None)
        if prevSubSess:
            g.add_node(prevSubSess)
            g.add_edge(subsessId, prevSubSess)
    return g


def degreeDistrib(g):
    """Return a Counter mapping out-degree -> number of nodes with that out-degree."""
    return collections.Counter(g.out_degree().values())


def _degDistIsBad(degDist):
    """Return True if an out-degree distribution breaks the expected chain shape.

    The distribution is considered bad when some node has out-degree above 1,
    or when any out-degree other than 1 is shared by more than one node.
    """
    return (any(k > 1 for k in degDist)
            or any(v > 1 for k, v in degDist.items() if k != 1))


def subsessSummary(sessInfoList):
    """Summarise one client's subsessions.

    Returns a dict with:
      "info"               -- sessInfoList sorted by 'profileSubsessionCounter'
      "graph"              -- the graph from sessGraphFromList()
      "degDist"            -- the out-degree distribution of that graph
      "subSessCounterMult" -- Counter of 'profileSubsessionCounter' values

    Raises KeyError if any entry lacks 'profileSubsessionCounter' (the key is
    indexed directly here, unlike in sessGraphFromList()).
    """
    g = sessGraphFromList(sessInfoList)
    d = degreeDistrib(g)
    subSessCounterMult = collections.Counter(
        x["profileSubsessionCounter"] for x in sessInfoList)
    return {"info": sorted(sessInfoList, key=lambda x: x["profileSubsessionCounter"]),
            "graph": g,
            "degDist": d,
            "subSessCounterMult": subSessCounterMult}


# In[10]:


subsessSummaries = clients.map(lambda c: (c[0], subsessSummary(c[1])))
subsessSummaries.cache()


# In[7]:


# subsessSummaries.take(1)


# In[11]:


# A client is "bad" if its degree distribution is off or if any
# profileSubsessionCounter value occurs more than once.
badGraphSummaries = subsessSummaries.filter(
    lambda id_summary: _degDistIsBad(id_summary[1]["degDist"])
    or any(v > 1 for v in id_summary[1]["subSessCounterMult"].values()))
badGraphSummaries.cache()
numBadGraphs = badGraphSummaries.count()
numBadGraphs


# #Percentage of clients with bad session graphs

# In[17]:


numBadGraphs/float(numClients)


# ##Grab a few examples of graphs with imperfect session chains and plot them

# In[13]:


bg = badGraphSummaries.take(100)


# In[11]:


def plotSessGraphsAndInfo(sessGraphInfo):
    """Print degree/counter diagnostics for one client and draw its graph.

    sessGraphInfo is a (clientId, summary) pair as produced for
    subsessSummaries. Nodes carrying data are labelled with their
    'subSessNum'; the graph is drawn with a spectral layout.
    """
    summary = sessGraphInfo[1]
    g = summary["graph"]
    dg = summary["degDist"]
    sscc = [(sessCounter, mult)
            for sessCounter, mult in summary["subSessCounterMult"].items()
            if mult > 1]
    # Single-argument print() yields the same text under Python 2 and 3.
    print("node degrees (degree, number of nodes with degree)-- %s"
          % (list(dg.items()),))
    if sscc:
        print("repeated subsession counters (counter, multiplicty)-- %s"
              % (sscc,))
    pos = nx.spectral_layout(g)
    labels = [(n, d['subSessNum']) for n, d in g.nodes(data=True) if d]
    nx.draw_spectral(g, node_size=10, node_color="w")
    for sessId, label in labels:
        x, y = pos[sessId]
        plt.text(x+.02, y+.02, s=label, horizontalalignment='center')
    plt.show()


# In[12]:


for summaryPair in bg:
    plotSessGraphsAndInfo(summaryPair)


# In[137]:


clientOfInterest = bg[3]


def getDictFields(d, fields):
    """Return {field: d.get(field)} for every name in fields (None if absent)."""
    return {field: d.get(field, None) for field in fields}


plotSessGraphsAndInfo(clientOfInterest)
dd = sorted((getDictFields(d, ["previousSubsessionId",
                               "subsessionId",
                               "profileSubsessionCounter"])
             for d in clientOfInterest[1]["info"]),
            key=lambda d: d["profileSubsessionCounter"])
# bg[3][1]["info"][0]
dd
# getDictFields(bg[3][1]["info"][0],["previousSubsessionId",
#                                    "subsessionId",
#                                    "subsessionCounter"])
clientOfInterest[1]["graph"].edges()


# In[14]:


def plotSessGraphsAndInfo2(sessGraphInfo):
    """Like plotSessGraphsAndInfo(), but lay nodes out along a horizontal axis.

    Nodes with data sit at x = 'subSessNum', y = 0. Nodes without data
    (predecessors that were never submitted) are placed three units left of
    the smallest 'subSessNum', with a random y in [0, 1) so they do not
    overlap.
    """
    summary = sessGraphInfo[1]
    g = summary["graph"]
    dg = summary["degDist"]
    sscc = [(sessCounter, mult)
            for sessCounter, mult in summary["subSessCounterMult"].items()
            if mult > 1]
    print("node degrees (degree, number of nodes with degree)-- %s"
          % (list(dg.items()),))
    if sscc:
        print("repeated subsession counters (counter, multiplicty)-- %s"
              % (sscc,))
    nodeNums = [d['subSessNum'] for n, d in g.nodes(data=True) if d]
    minNode = min(nodeNums)
    # np.random.rand() is used explicitly rather than the bare `rand` that
    # only exists when the %pylab magic has populated the namespace.
    pos = {n: ([d['subSessNum'], 0] if d else [minNode-3, 0+np.random.rand()])
           for n, d in g.nodes(data=True)}
    # pos = nx.spectral_layout(g)
    labels = [(n, d['subSessNum']) for n, d in g.nodes(data=True) if d]
    fig, ax = plt.subplots(1, figsize=(18, 3), dpi=100)
    nx.draw(g, pos=pos, ax=ax, node_size=10, node_color="w")
    for sessId, label in labels:
        x, y = pos[sessId]
        plt.text(x+.02, y+.02, s=label, horizontalalignment='center')
    plt.show()


plotSessGraphsAndInfo2(bg[10])


# In[22]:


bg[10][1]["info"]


# In[24]:


knownPos = {I["subsessionId"]: (I["profileSubsessionCounter"], I["subsessionCounter"])
            for I in bg[10][1]["info"]}


# In[33]:


len(knownPos), len(bg[10][1]["graph"]), len(set(I["sessionId"] for I in bg[10][1]["info"]))


# In[32]:


gg = bg[10][1]["graph"]
#look up
gg.nodes(data=True)


# In[184]:


def sessGraphFromList2(sessInfoList):
    """Build the subsession graph with plotting coordinates on every node.

    Same linkage as sessGraphFromList(), but each submitted subsession also
    gets x = 'profileSubsessionCounter' and y = 'subsessionCounter' (both -1
    when absent). Inferred nodes -- predecessors that were referenced but
    never submitted -- are then positioned near one of the subsessions that
    point at them: x is jittered by about +/- 1/6, and y is dropped by an
    amount between 1 and 2.

    Entries lacking 'subsessionId' are skipped.
    """
    g = nx.DiGraph()
    for s in sessInfoList:
        # each s is a separate subsession submission "info" section
        try:
            subsessId = s['subsessionId']
        except KeyError:
            continue
        N = s.get("profileSubsessionCounter", -1)
        subSessCounter = s.get("subsessionCounter", -1)
        g.add_node(subsessId, subSessNum=N, x=N, y=subSessCounter)
        prevSubSess = s.get('previousSubsessionId', None)
        if prevSubSess:
            #NB: adding these nodes allows us to infer the existence
            #of some sessions that have not actually been submitted.
            # Nodes added this way will not have any data attached to them.
            g.add_node(prevSubSess)
            g.add_edge(subsessId, prevSubSess)

    #add placement details for inferred nodes
    for nodeId, nodeData in g.nodes(data=True):
        if nodeData:
            continue
        # An inferred node only exists because some submitted subsession
        # points at it, so it has at least one predecessor with x/y set.
        pred = g.predecessors(nodeId)[0]
        # np.random is spelled out instead of relying on the `random` name
        # injected by the %pylab magic.
        nodeData["x"] = g.node[pred]["x"]-np.random.random()/3 + .1/.6
        nodeData["y"] = g.node[pred]["y"]-1-np.random.random()
    return g


# In[185]:


def plotSessGraph(g):
    """Draw a graph whose nodes all carry 'x' and 'y' attributes.

    Intended for graphs built by sessGraphFromList2(). Each node with data is
    labelled with its 'subSessNum', or "NA" when it has none.
    """
    pos = {n: [d['x'], d["y"]] for n, d in g.nodes(data=True)}
    fig, ax = plt.subplots(1, figsize=(18, 3), dpi=100)
    nx.draw(g, pos=pos, ax=ax, node_size=10, node_color="w")
    for sessId, nodeData in g.nodes(data=True):
        if not nodeData:
            continue
        x, y = nodeData["x"], nodeData["y"]
        subSessNum = nodeData.get("subSessNum", "NA")
        plt.text(x+.02, y+.15, s=subSessNum, horizontalalignment='center')
    plt.show()


# In[186]:


gg = sessGraphFromList2(bg[10][1]["info"])
plotSessGraph(gg)


# In[203]:


def plotSessGraphAndInfo(i_x):
    """Rebuild, diagnose and plot the graph for one enumerated bad client.

    i_x is an (index, (clientId, summary)) pair. The index is printed first,
    followed by a line for a bad degree distribution and a line for repeated
    'profileSubsessionCounter' values when either applies; the graph is then
    drawn with plotSessGraph().
    """
    i, x = i_x
    sessInfoList = x[1]["info"]
    g = sessGraphFromList2(sessInfoList)
    dg = degreeDistrib(g)
    print(i)
    if _degDistIsBad(dg):
        print("bad degree distribution:  %s" % (dg,))
    subSessCounterMult = collections.Counter(
        s["profileSubsessionCounter"] for s in sessInfoList)
    repeated = {c: m for c, m in subSessCounterMult.items() if m > 1}
    if repeated:
        print("repeated profileSubsessionCounter(s):  %s" % (repeated,))
#     print
#     print collections.Counter(map(lambda x:x["profileSubsessionCounter"], sessInfoList))
    plotSessGraph(g)


# In[205]:


plotSessGraphAndInfo((5, bg[0]))
bg[0][1]["info"]


# In[198]:


for i_x in enumerate(bg):
    plotSessGraphAndInfo(i_x)


# In[ ]: