#!/usr/bin/env python
# coding: utf-8

# In[1]:

import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
import networkx as nx
import collections
from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client

get_ipython().run_line_magic('pylab', 'inline')


# In[2]:

import boto
conn = boto.connect_s3()


# In[3]:

numCores = sc.defaultParallelism
numCores


# In[4]:

# v4 ("unified telemetry") main pings from Firefox nightly, submitted 2015-05-25
# through 2015-06-09, excluding idle-daily pings.
v4Pings = get_pings(sc,
                    app="Firefox",
                    channel="nightly",
                    submission_date=("20150525", "20150609"),
                    build_id=("20150507000000", "99990507000000"),
                    fraction=1,
                    doc_type="main",
                    schema="v4") \
    .filter(lambda p: p.get("payload", {}).get("info", {}).get("reason", "idle-daily") != "idle-daily")


# In[5]:

v4Pings.count()


# In[6]:

def getDictPaths(d, paths):
    """Pull a set of slash-delimited paths out of a nested dict,
    returning "MISSING" for any path that cannot be resolved."""
    result = {}
    for path in paths:
        pathItems = path.split("/")
        out = d.get(pathItems.pop(0), {})
        while pathItems:
            out = out.get(pathItems.pop(0), {})
        if out == {}:
            out = "MISSING"
        result[path] = out
    return result
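# In[ ]:

# Not part of the original notebook: a quick local sanity check of getDictPaths
# on a hypothetical, minimal ping-shaped dict, to illustrate the slash-delimited
# path syntax and the "MISSING" sentinel for absent fields.
toyPing = {"clientId": "abc-123",
           "payload": {"info": {"reason": "shutdown"},
                       "simpleMeasurements": {"firstPaint": 1234}}}
getDictPaths(toyPing, ["clientId",
                       "payload/info",
                       "payload/simpleMeasurements/firstPaint",
                       "payload/histograms/PLACES_PAGES_COUNT"])
# expected:
# {'clientId': 'abc-123',
#  'payload/info': {'reason': 'shutdown'},
#  'payload/simpleMeasurements/firstPaint': 1234,
#  'payload/histograms/PLACES_PAGES_COUNT': 'MISSING'}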
# In[7]:

v4PathsToV2fields = ['clientId',
                     'meta/appUpdateChannel',
                     'id',
                     'environment',
                     'application',
                     'version',
                     'creationDate',
                     'type',
                     'payload/info',
                     'payload/simpleMeasurements/activeTicks',
                     'payload/simpleMeasurements/totalTime',
                     'payload/simpleMeasurements/main',
                     'payload/simpleMeasurements/firstPaint',
                     'payload/simpleMeasurements/sessionRestored',
                     'payload/histograms/PLACES_PAGES_COUNT',
                     'payload/histograms/PLACES_BOOKMARKS_COUNT',
                     'payload/keyedHistograms/SEARCH_COUNTS',
                     'payload/keyedHistograms/SEARCH_DEFAULT_ENGINE']


# In[8]:

# get_pings_properties(v4Pings, v4PathsToV2fields) was choking on the histogram
# fields, so extract the paths with getDictPaths instead.
v4Subsess = v4Pings.map(lambda p: getDictPaths(p, v4PathsToV2fields))
# .coalesce(numCores*2)  # reducing the number of partitions seems to make the job fail. OOM?


# In[9]:

v4f = v4Subsess.first()


# In[10]:

v4f.keys()


# In[11]:

def dropDupePings(pingListIn):
    """Keep only the first occurrence of each ping id in a list of pings."""
    pingsAdded = []
    pingListOut = []
    for s in pingListIn:
        if s['id'] not in pingsAdded:
            pingsAdded.append(s['id'])
            pingListOut.append(s)
    return pingListOut


# In[12]:

# Group the v4 subsession pings into one history (list of pings) per client.
v4HistoriesById = v4Subsess.map(lambda p: (p.get("clientId", "noId"), [p])) \
    .reduceByKey(lambda l1, l2: l1 + l2) \
    .mapValues(dropDupePings)


# In[13]:

v4h = v4HistoriesById.first()


# In[14]:

v4h
# v4HistoriesById.count()


# In[15]:

v2SampleBucket = conn.get_bucket('mozillametricsfhrsamples')  # Substitute in your bucket name


# In[16]:

bl = v2SampleBucket.list(prefix="nightly")
print "v2 nightly data size:", sum(key.size for key in bl)
list(bl)[-5:]


# In[17]:

# Saptarshi stores 100% of the nightly v2 records in 200 part files.
# There are about 500K records, roughly 2 GB of data.
# For testing, we can grab just 10% of the parts (the ones whose names end in 0).
pathToV2Nightly = "s3n://mozillametricsfhrsamples/nightly/part-r-*0"
v2Nightly = sc.sequenceFile(pathToV2Nightly)


# In[18]:

def getClientIdFromV2Str(v2Str):
    """Pull the 36-character clientID out of a raw v2 JSON string
    without parsing the whole record."""
    try:
        target = '"clientID":"'
        ind = v2Str.index(target)
        clientIdRaw = v2Str[(ind + len(target)):(ind + len(target) + 36)]
        return clientIdRaw
    except:  # e.g. ValueError when "clientID" is not present
        return "MISSING"


# In[19]:

# re-key the v2 records by clientID, dropping records with no clientID
v2NightlyByClientId = v2Nightly \
    .map(lambda id_json: (getClientIdFromV2Str(id_json[1]), id_json[1])) \
    .filter(lambda id_json: id_json[0] != "MISSING")


# In[20]:

v2NightlyByClientId.count()


# In[21]:

# Join the per-client v4 histories with the v2 records on clientId.
mergedDataPerClient = v4HistoriesById \
    .join(v2NightlyByClientId) \
    .map(lambda id__v4_v2: (id__v4_v2[0],
                            {"clientId": id__v4_v2[0],
                             "v4": id__v4_v2[1][0],
                             "v2": json.loads(id__v4_v2[1][1])})) \
    .cache()


# In[22]:

mergedDataPerClient.count()


# In[23]:

mergedDataPerClient.getNumPartitions()
# Why are there more partitions than rows?

# Everything above looks reasonable; this data is pretty much what it should be, I think.

# In[24]:

# Do we need a coalesce or a repartition to get this saved
# in a reasonable number of files?
mergedDataPerClient_repart = mergedDataPerClient.repartition(200).cache()


# In[25]:

print v4Pings.getNumPartitions(), v4Subsess.getNumPartitions(), v4HistoriesById.getNumPartitions()
print v2NightlyByClientId.getNumPartitions()
print mergedDataPerClient_repart.getNumPartitions()


# In[26]:

# print mergedDataPerClient_repart.count()
# print mergedDataPerClient_repart.getNumPartitions()


# In[27]:

# md = mergedDataPerClient_repart.first()


# In[28]:

# print len(md), md[0]
# # print "{"+md[1][49:]
# dat = md[1]
# # print dat
# print dat.keys()
# print dat['v2'].keys()
# print len(dat['v4'])
# print dat['v4'][0].keys()


# In[34]:

outBucketName = "net-mozaws-prod-us-west-2-pipeline-analysis"
pathToOutput = "/bcolloran/mergedDataPerClient/nightly/2015-06-10/8428clients/"


# In[35]:

mergedDataPerClient_repart.map(lambda x: (str(x[0]), json.dumps(x[1]))) \
    .saveAsSequenceFile("s3n://" + outBucketName + pathToOutput)


# # Did it work? List the output keys

# In[36]:

outBucket = conn.get_bucket(outBucketName)  # Substitute in your bucket name
bl = outBucket.list(prefix=pathToOutput[1:])
mergedDataSize = sum(key.size for key in bl)
print "mergedDataPerClient_repart data size:", mergedDataSize / (1.0 * (10 ** 9)), "GB"
list(bl)[-5:]


# # Did it work? Try to load a single part file

# In[64]:

pathToMergeTest = "s3n://" + outBucketName + pathToOutput + "part-0019*"
print pathToMergeTest
mergeTest = sc.sequenceFile(pathToMergeTest)


# In[65]:

mergeTest.count()


# In[66]:

mt = mergeTest.first()


# In[67]:

len(mt)


# In[79]:

print len(mt), mt[0]
# print mt[1].keys()
print json.loads(mt[1]).keys()
print len(json.loads(mt[1])['v4'])
print json.loads(mt[1])['v4'][0].keys()
# 'v2' was stored as an already-parsed dict in In[21], so inspect its keys directly
print json.loads(mt[1])['v2'].keys()


# In[70]:

data = mt[1]


# In[72]:

json.loads(data)
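# In[ ]:

# Not part of the original notebook: a minimal sketch of how a downstream analysis
# might reload the full merged output written above. Each record is a
# (clientId, JSON string) pair, so mapValues(json.loads) recovers the
# {"clientId": ..., "v4": [...], "v2": {...}} dict per client.
# `mergedReloaded` is a hypothetical name; the path assumes the bucket and prefix used above.
mergedReloaded = sc.sequenceFile("s3n://" + outBucketName + pathToOutput) \
    .mapValues(json.loads)
mergedReloaded.first()[1].keys()  # expect 'clientId', 'v4', 'v2' (in some order)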