#!/usr/bin/env python
# coding: utf-8

# Notebook export: sanity checks on Firefox telemetry v4 subsession chains.
# Python 2 / Spark-era code: `sc` is the SparkContext supplied by the
# notebook environment; networkx is the 1.x API.

# In[97]:

import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
import networkx as nx
import collections
import datetime

from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client

# %pylab inline


# In[2]:

sc.defaultParallelism


# #Get pings, filter them, and do some basic checks

# In[3]:

pings = get_pings(sc,
                  app="Firefox",
                  channel="nightly",
                  submission_date=("20150520", "20150530"),
                  fraction=1,
                  schema="v4")


# In[4]:

p = pings.first()


# In[5]:

# p["payload"]["info"]
p.keys()
# p.get("payload",{}).get("info",{}).get("subsessionId",False)
# {k:p[k] for k in p.keys() if k!="main"}


# In[11]:

# note that we have to filter out the 'meta' entry, b/c this can contain
# things like intake timestamps, which should be expected to change if the
# same ping is sent twice
pingsByPingId = pings \
    .map(lambda p: (p.get("id", "MISSING"),
                    [{k: p[k] for k in p.keys() if k != "meta"}])) \
    .reduceByKey(lambda l1, l2: l1 + l2)
pingsByPingId.cache()


# # Group pings by clientId
# Get the "payload/info" section of recent pings from builds newer than
# 20150507000000, and which are not idle-daily pings, and group by clientId

# In[6]:

info = get_pings_properties(pings, ["id",
                                    "clientId",
                                    "type",
                                    "payload/info",
                                    "environment/build/buildId"])
# keep only "main" pings from sufficiently new builds; pings with no
# "reason" field are treated as idle-daily and dropped
subsess = info.filter(
    lambda p: ((p["payload/info"].get("reason", "idle-daily") != "idle-daily")
               and (p["type"] == "main")
               and (p["environment/build/buildId"] > "20150507000000")))
# subsess.cache()
# numPings = subsess.count()
# numPings


# In[7]:

# clientId -> list of "payload/info" dicts (one per submitted subsession)
clients = subsess.map(lambda p: (p.get("clientId", "noId"), [p["payload/info"]])) \
                 .reduceByKey(lambda l1, l2: l1 + l2)


# ### Remove duplicate subsessions for each client

# In[42]:

def dropDupeSubsessions(subsessList):
    """Return subsessList minus entries whose 'subsessionId' was already seen.

    Keeps the first occurrence of each subsessionId; input order is
    preserved.  A set is used for the seen-ids test -- the original
    list-membership scan made this O(n^2) per client.
    """
    subsessIdsAdded = set()
    subsessListOut = []
    for s in subsessList:
        if s['subsessionId'] not in subsessIdsAdded:
            subsessIdsAdded.add(s['subsessionId'])
            subsessListOut.append(s)
    return subsessListOut


# In[43]:

cleanedClients = clients \
    .map(lambda id_sesslist: (id_sesslist[0], dropDupeSubsessions(id_sesslist[1])))
# cleanedClients.cache()
# numClients = cleanedClients.count()
# numClients


# In[44]:

# cc = cleanedClients.first()


# ##Construct session graphs and compute degree distributions over graphs
#
# Once we've grouped by clientId, we can use the subSessionIds and
# previousSubsessionId pointer to construct a graph of the session, and then
# look at the degree of each node in the graph to see whether the graph looks
# the way we expect. Ideally, we should have a graph that has *zero* nodes
# with degree greater than 1 (no subsession should be pointed at by two or
# more sessions) and only *one* node with degree zero (the first subsession
# in the chain won't point to any prior subsession). Additionally, any given
# value of 'profileSubsessionCounter' should appear only once.

# In[34]:

def sessGraphFromList(sessInfoList):
    """Build a DiGraph with an edge from each subsession to its
    previousSubsessionId; nodes carry subSessNum (profileSubsessionCounter)."""
    g = nx.DiGraph()
    for s in sessInfoList:
        # each s is a separate subsession submission "info" section
        # have to clear these vars to prevent spillover from previous iter
        subsessId = None
        prevSubSess = None
        try:
            subsessId = s['subsessionId']
        except KeyError:
            continue
        N = s.get("profileSubsessionCounter", "NA")
        g.add_node(subsessId, subSessNum=N)
        prevSubSess = s.get('previousSubsessionId', None)
        if prevSubSess:
            g.add_node(prevSubSess)
            g.add_edge(subsessId, prevSubSess)
    return g


def degreeDistrib(g):
    """Counter mapping out-degree -> number of nodes with that out-degree.
    (networkx 1.x: out_degree() returns a dict of node -> degree)"""
    return collections.Counter(g.out_degree().values())


def subsessSummary(sessInfoList):
    """Summarize one client's subsession chain.

    Returns a dict with the info list sorted by profileSubsessionCounter,
    the session graph, its out-degree distribution, and the multiplicity of
    each profileSubsessionCounter value.  Uses .get(..., "NA") so an info
    entry missing the counter cannot raise KeyError -- consistent with the
    default handling in sessGraphFromList.
    """
    g = sessGraphFromList(sessInfoList)
    d = degreeDistrib(g)
    subSessCounterMult = collections.Counter(
        map(lambda x: x.get("profileSubsessionCounter", "NA"), sessInfoList))
    return {"info": sorted(sessInfoList,
                           key=lambda x: x.get("profileSubsessionCounter", "NA")),
            "graph": g,
            "degDist": d,
            "subSessCounterMult": subSessCounterMult}


# In[45]:

# cc


# In[46]:

subsessSummaries = cleanedClients.map(lambda c: (c[0], subsessSummary(c[1])))
# subsessSummaries.cache()


# In[38]:

# sss =
subsessSummaries.take(1) # In[47]: badGraphSummaries = subsessSummaries.filter( lambda id_summary: any([k>1 for k in id_summary[1]["degDist"].keys()]) \ or any([v>1 for k,v in id_summary[1]["degDist"].iteritems() if k!=1]) \ or any([v>1 for v in id_summary[1]["subSessCounterMult"].values()]) ) badGraphSummaries.cache() numBadGraphs = badGraphSummaries.count() numBadGraphs # #Percentage of clients with bad session graphs # In[48]: numBadGraphs/float(numClients) # ##Grab a few examples of graphs with imperfect session chains and plot them # In[163]: bg = badGraphSummaries.take(100) # In[106]: def sessGraphFromList2(sessInfoList,byDate=True): g = nx.DiGraph() subSessIds = [s['subsessionId'] for s in sessInfoList] numInferred = 0 nodePositions = [] for s in sessInfoList: # each s is a separate subsession submission "info" section # have to clear these vars to prevent spillover from previous iter subsessId = None prevSubSess = None try: subsessId = s['subsessionId'] except KeyError: continue N = s.get("profileSubsessionCounter",-1) subSessCounter = s.get("subsessionCounter",-1) #if a node has already been added with this (x,y) position, bump up the y position if byDate: dateStr = s.get("sessionStartDate","2015-01-01")[0:10] dateNum = datetime.datetime.strptime(dateStr,"%Y-%m-%d").toordinal() thisPosition = [dateNum,subSessCounter] else: thisPosition = [N,subSessCounter] while thisPosition in nodePositions: thisPosition[1]+=1 g.add_node(subsessId,subSessNum=N,x=thisPosition[0], y=thisPosition[1]) nodePositions.append(thisPosition) prevSubSess = s.get('previousSubsessionId',None) # if prevSubSess: # #NB: adding these nodes allows us to infer the existence # #of some sessions that have not actually been submitted. # # Nodes added this way will not have any data attached to them. 
# g.add_node(prevSubSess) # g.add_edge(subsessId,prevSubSess) if prevSubSess in subSessIds: #NB: adding these nodes allows us to infer the existence #of some sessions that have not actually been submitted. # Nodes added this way will not have any data attached to them. g.add_node(prevSubSess) g.add_edge(subsessId,prevSubSess) #add placement details for inferred nodes for nodeId,nodeData in (tup for tup in g.nodes(data=True) if not tup[1]): pred = g.predecessors(nodeId)[0] nodeData["x"] = g.node[pred]["x"]-random.random()/3 +.1/.6 nodeData["y"] = g.node[pred]["y"]-1-random.random() return g # In[118]: def plotSessGraph(g): pos = {n:[d['x'],d["y"]] for n,d in g.nodes(data=True)} fig, ax = plt.subplots(1,figsize=(18,3), dpi=100) G = nx.draw(g,pos=pos,ax=ax,node_size=300, node_color="w") for sessId,nodeData in [(n,d) for n,d in g.nodes(data=True) if d]: x,y = nodeData["x"],nodeData["y"] subSessNum = nodeData.get("subSessNum","NA") plt.text(x,y-.1,s=subSessNum ,horizontalalignment='center') plt.show() # In[152]: def plotSessGraphAndInfo(i_x): i,x = i_x sessInfoList = x[1]["info"] g = sessGraphFromList2(sessInfoList) dg = degreeDistrib(g) print "############################################" print "session graph index:", i if any([k>1 for k in dg.keys()]) \ or any([v>1 for k,v in dg.iteritems() if k!=1]): print "bad degree distribution: ",dg subSessCounterMult = collections.Counter(map(lambda x:x["profileSubsessionCounter"], sessInfoList)) if any([v>1 for v in subSessCounterMult.values()]): print "repeated profileSubsessionCounter(s): ", {c:m for c,m in subSessCounterMult.iteritems() if m>1} # print # print collections.Counter(map(lambda x:x["profileSubsessionCounter"], sessInfoList)) print "subsessions by date" plotSessGraph(g) print "subsessions by profileSubsessionCounter (x-axis)" g = sessGraphFromList2(sessInfoList,byDate=False) plotSessGraph(g) # ##Explanation of the plots used below # # In what follows, two plots are drawn for depicting the subsession histories of 
# a sample of clients. For each client, we draw:
# 1. The subsession graph organized by date on the x-axis. In these plots,
#    all the subsessions that occurred on one day are stacked in a vertical
#    column corresponding to the integer day number. These plots help to
#    show cases in which "profileSubsessionCounter" is reset to 1.
# 2. The subsession graph organized by subsession counter. In these plots,
#    the x-axis position of each subsession is given by its subsession
#    number. These plots help to show cases in which there are big gaps
#    between recorded subsessions.


# In[153]:

# plot the first sampled bad-graph client as a worked example
i=0
plotSessGraphAndInfo((i,bg[i]))
# bg[i]


# ##Clients with profileSubsessionCounter reset to 1
#
# The following clients have had their profileSubsessionCounter reset at
# some point. Some of these also have missing subsessions.

# In[171]:

# Collect indices of clients whose counter was reset: some subsession has
# profileSubsessionCounter == 1 but a sessionStartDate strictly later than
# another of the client's start dates, i.e. a "first" subsession that is
# not the earliest one.
# NOTE(review): a missing sessionStartDate yields None here, and in
# Python 2 None compares less than any string, so such entries never
# trigger the date test -- confirm this is the intended behavior.
resetClients = []
for i,clientData in enumerate(bg):
    clientDates = [ss.get('sessionStartDate',0) for ss in clientData[1]["info"]]
    if any([ (ss.get('profileSubsessionCounter',0)==1
              and any(ss.get('sessionStartDate',None)>d for d in clientDates))
             for ss in clientData[1]["info"]]):
        print i
        plotSessGraphAndInfo((i,clientData))
        resetClients.append(i)


# # Other anomalies
#
# Most of the other anomalies showing up here are simple gaps in the
# subsession history, but there are a couple more interesting examples, so
# please feel free to scrutinize. Number 73 is particularly interesting--
# it may be a copied profile or something like that, or it may be another
# bug.
# In[173]: for i,clientData in enumerate(bg): if i not in resetClients: print i plotSessGraphAndInfo((i,clientData)) # In[111]: i=4 plotSessGraphAndInfo((5,bg[i])) keysToGet = [ "previousSubsessionId", u'profileSubsessionCounter', # u'reason', # u'revision', # u'sessionId', u'sessionStartDate', # u'subsessionCounter', u'subsessionId']#, # u'subsessionLength', # u'subsessionStartDate'] def getKeysFromDict(d,keys): return {k:d.get(k,"MISSING") for k in keys} [getKeysFromDict(d,keysToGet) for d in bg[i][1]["info"]] # bg[i][1]["info"] # In[101]: datetime.datetime.strptime("2015-05-24","%Y-%m-%d").toordinal() # In[ ]: