#!/usr/bin/env python
# coding: utf-8

# #Cleaning and merging a sample of v2+v4 data
# see https://bugzilla.mozilla.org/show_bug.cgi?id=1152107

# In[1]:

import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
import networkx as nx
import collections

from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client

get_ipython().run_line_magic('pylab', 'inline')


# In[2]:

import boto
conn = boto.connect_s3()


# In[3]:

numCores = sc.defaultParallelism
numCores


# #Set date and build filters

# In[11]:

submissionDateBounds = ("20150610", "20150708")
buildIdBounds = ("20150610000000", "99990507000000")


# In[13]:

def checkIfNotIdleDaily(p):
    # Keep only pings whose reason is not "idle-daily"; malformed pings are dropped too.
    try:
        out = p.get("payload", {}).get("info", {}).get("reason", "idle-daily") != "idle-daily"
    except:
        out = False
    return out


# In[16]:

v4Pings = get_pings(sc,
                    app="Firefox",
                    channel="nightly",
                    submission_date=submissionDateBounds,
                    build_id=buildIdBounds,
                    fraction=1,
                    doc_type="main",
                    schema="v4") \
    .filter(checkIfNotIdleDaily)


# In[17]:

v4Pings.count()


# In[18]:

def getDictPaths(d, paths):
    # Walk each "/"-separated path into the nested dict d;
    # a path that is absent (i.e. resolves to an empty dict) is reported as "MISSING".
    result = {}
    for path in paths:
        pathItems = path.split("/")
        out = d.get(pathItems.pop(0), {})
        while pathItems:
            out = out.get(pathItems.pop(0), {})
        if out == {}:
            out = "MISSING"
        result[path] = out
    return result


# In[19]:

v4PathsToV2fields = ['clientId',
                     'meta/appUpdateChannel',
                     'id',
                     'environment',
                     'application',
                     'version',
                     'creationDate',
                     'type',
                     'payload/info',
                     'payload/simpleMeasurements/activeTicks',
                     'payload/simpleMeasurements/totalTime',
                     'payload/simpleMeasurements/main',
                     'payload/simpleMeasurements/firstPaint',
                     'payload/simpleMeasurements/sessionRestored',
                     'payload/histograms/PLACES_PAGES_COUNT',
                     'payload/histograms/PLACES_BOOKMARKS_COUNT',
                     'payload/keyedHistograms/SEARCH_COUNTS']


# In[20]:

# Was having trouble with get_pings_properties(v4Pings, v4PathsToV2fields);
# it was choking on histograms, so extract the paths manually instead.
v4Subsess = v4Pings.map(lambda p: getDictPaths(p, v4PathsToV2fields))
# .coalesce(numCores*2)
# Reducing the number of partitions seems to make the job fail. OOM?

# v4f = v4Subsess.first()
# v4f.keys()


# In[21]:

def dropDupePings(pingListIn):
    # Keep only the first occurrence of each ping id.
    pingsAdded = []
    pingListOut = []
    for s in pingListIn:
        if s['id'] not in pingsAdded:
            pingsAdded.append(s['id'])
            pingListOut.append(s)
    return pingListOut


# In[22]:

# Group the subsession pings into one history list per clientId, dropping duplicate pings.
v4HistoriesById = v4Subsess.map(lambda p: (p.get("clientId", "noId"), [p])) \
    .reduceByKey(lambda l1, l2: l1 + l2) \
    .mapValues(dropDupePings)

# v4h = v4HistoriesById.first()
# v4h
# v4HistoriesById.count()


# In[23]:

v2SampleBucket = conn.get_bucket('mozillametricsfhrsamples')  # Substitute in your bucket name


# In[24]:

bl = v2SampleBucket.list(prefix="nightly")
print "v2 nightly data size (bytes):", sum(key.size for key in bl)
list(bl)[-5:]
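# Quick local sanity check of the two v4 helpers defined above, getDictPaths and
# dropDupePings, before loading the v2 records. This is an illustrative sketch only:
# the toy ping below is made up and is not drawn from the real data.

# In[ ]:

toyPing = {"clientId": "abc-123",
           "id": "ping-1",
           "payload": {"info": {"reason": "shutdown"},
                       "simpleMeasurements": {"activeTicks": 42}}}
print getDictPaths(toyPing, ["clientId",
                             "payload/info/reason",
                             "payload/simpleMeasurements/activeTicks",
                             "payload/histograms/PLACES_PAGES_COUNT"])
# The first three paths resolve to values; the absent histogram comes back as "MISSING".

print dropDupePings([{"id": "ping-1"}, {"id": "ping-1"}, {"id": "ping-2"}])
# The duplicate "ping-1" is dropped, leaving two pings.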
# In[25]:

# Saptarshi stores 100% of the nightly records in 200 part files;
# there are about 500K records, roughly 2 GB of data.
# For testing, we can grab just 10% of the parts (the ones ending in 0).
pathToV2Nightly = "s3n://mozillametricsfhrsamples/nightly/part-r-*0"
v2Nightly = sc.sequenceFile(pathToV2Nightly)


# In[26]:

def getClientIdFromV2Str(v2Str):
    # Pull the 36-character clientID out of a raw v2 JSON string
    # without parsing the whole record.
    try:
        target = '"clientID":"'
        ind = v2Str.index(target)
        clientIdRaw = v2Str[(ind + len(target)):(ind + len(target) + 36)]
        return clientIdRaw
    except:
        return "MISSING"


# In[27]:

# Re-key the v2 records by clientID, dropping records with no clientID.
v2NightlyByClientId = v2Nightly \
    .map(lambda id_json: (getClientIdFromV2Str(id_json[1]), id_json[1])) \
    .filter(lambda id_json: id_json[0] != "MISSING")


# In[28]:

v2NightlyByClientId.count()


# In[29]:

# Join the v4 per-client histories with the v2 records on clientId.
mergedDataPerClient = v4HistoriesById \
    .join(v2NightlyByClientId) \
    .map(lambda id__v4_v2: (id__v4_v2[0],
                            {"clientId": id__v4_v2[0],
                             "v4": id__v4_v2[1][0],
                             "v2": json.loads(id__v4_v2[1][1])})) \
    .cache()


# In[30]:

mergedDataPerClient.count()


# In[31]:

mergedDataPerClient.getNumPartitions()
# Why are there more partitions than rows?

# Everything above looks reasonable; this data is pretty much what it should be, I think...


# In[32]:

# Do we need a coalesce or a repartition to get this saved in a reasonable number of files?
mergedDataPerClient_repart = mergedDataPerClient.repartition(200).cache()


# In[33]:

print v4Pings.getNumPartitions(), v4Subsess.getNumPartitions(), v4HistoriesById.getNumPartitions()
print v2NightlyByClientId.getNumPartitions()
print mergedDataPerClient_repart.getNumPartitions()


# In[35]:

print mergedDataPerClient_repart.count()
# print mergedDataPerClient_repart.getNumPartitions()


# In[ ]:

# md = mergedDataPerClient_repart.first()


# In[ ]:

# print len(md), md[0]
# # print "{" + md[1][49:]
# dat = md[1]
# # print dat
# print dat.keys()
# print dat['v2'].keys()
# print len(dat['v4'])
# print dat['v4'][0].keys()


# In[36]:

outBucketName = "net-mozaws-prod-us-west-2-pipeline-analysis"
pathToOutput = "bcolloran/mergedDataPerClient/nightly/2015-07-09/8937clients/"


# In[37]:

mergedDataPerClient_repart.map(lambda x: (str(x[0]), json.dumps(x[1]))) \
    .saveAsSequenceFile("s3n://" + outBucketName + "/" + pathToOutput)


# #Did it work? List the buckets

# In[38]:

outBucket = conn.get_bucket(outBucketName)  # Substitute in your bucket name
bl = outBucket.list(prefix=pathToOutput)
mergedDataSize = sum(key.size for key in bl)
print "mergedDataPerClient_repart data size:", mergedDataSize / (1.0 * 10**9), "GB"
list(bl)[-5:]


# ###Did it work? Try to load a few files

# In[40]:

pathToMergeTest = "s3n://" + outBucketName + "/" + pathToOutput + "part-0019*"
print pathToMergeTest
mergeTest = sc.sequenceFile(pathToMergeTest)


# In[41]:

mergeTest.count()


# In[42]:

mt = mergeTest.first()


# In[45]:

print len(mt), mt[0]
print json.loads(mt[1]).keys()
print len(json.loads(mt[1])['v4'])
print json.loads(mt[1])['v4'][0].keys()
print json.loads(mt[1])['v2'].keys()
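# ###Sketch: reading the merged output back for analysis
# A minimal illustrative sketch, not part of the original pipeline: one way a
# downstream job might load the per-client records written above and look at the
# distribution of v4 subsession pings per client. The variable names here
# (mergedData, v4CountsPerClient, counts) are assumptions for this example.

# In[ ]:

# Load the merged per-client sequence files written by saveAsSequenceFile above.
mergedData = sc.sequenceFile("s3n://" + outBucketName + "/" + pathToOutput)

# Each value is a JSON dict with "v4" holding the list of subsession pings.
v4CountsPerClient = mergedData.map(lambda kv: len(json.loads(kv[1])["v4"]))
print v4CountsPerClient.stats()  # count / mean / stdev / min / max

counts = v4CountsPerClient.collect()  # only ~9K clients, so collecting locally is fine
plt.hist(counts, bins=50)
plt.xlabel("v4 subsession pings per client")
plt.ylabel("clients")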