import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
import networkx as nx
import collections
from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client
%pylab inline
# [notebook output] Populating the interactive namespace from numpy and matplotlib
import boto
conn = boto.connect_s3()
# Number of cores available to the Spark context (sc is presumably a
# SparkContext provided by the notebook environment — defined elsewhere).
numCores = sc.defaultParallelism
numCores
# [notebook output of the expression above]
80
# Inclusive string bounds used to select pings: submission dates and build ids.
# The far-future build-id upper bound ("9999...") effectively means "no upper limit".
submissionDateBounds = ("20150610","20150708")
buildIdBounds = ("20150610000000", "99990507000000")
def checkIfNotIdleDaily(p):
    """Return True if ping p has a payload/info/reason other than 'idle-daily'.

    Pings with no reason field default to 'idle-daily' and are therefore
    rejected (returns False). Malformed pings whose payload or info is not a
    dict are also rejected rather than crashing the Spark job.
    """
    try:
        reason = p.get("payload", {}).get("info", {}).get("reason", "idle-daily")
        return reason != "idle-daily"
    except (AttributeError, TypeError):
        # payload or info was not a dict-like object; treat ping as unusable.
        # (Was a bare `except:`, which also swallowed KeyboardInterrupt/SystemExit.)
        return False
# Pull all v4 "main" pings for Firefox nightly in the date/build window and
# drop idle-daily sessions. fraction=1 means the full (unsampled) population.
# NOTE(review): get_pings comes from moztelemetry (imported above).
v4Pings = get_pings(sc, app= "Firefox",
channel= "nightly",
submission_date= submissionDateBounds,
build_id= buildIdBounds,
fraction= 1,
doc_type= "main",
schema= "v4")\
.filter(checkIfNotIdleDaily)
v4Pings.count()
# [notebook output of the count above]
4255613
def getDictPaths(d, paths):
    """Extract values from a nested dict at each slash-delimited path.

    Returns a dict mapping each path string in `paths` to the value found in
    `d`, or the string "MISSING" when any path component is absent. As in the
    original, a legitimately-empty-dict value is indistinguishable from a
    missing one (both map to "MISSING"). Paths that traverse a non-dict
    intermediate value (e.g. asking for "x/y" when d["x"] is an int), which
    previously raised AttributeError, now also yield "MISSING".
    """
    result = {}
    for path in paths:
        out = d
        for key in path.split("/"):
            if isinstance(out, dict):
                out = out.get(key, {})
            else:
                # Walked into a non-dict before the path was exhausted.
                out = {}
                break
        result[path] = "MISSING" if out == {} else out
    return result
# Slash-delimited paths (consumed by getDictPaths) selecting the subset of
# v4 ping fields needed to line up against the v2 FHR records below.
v4PathsToV2fields = ['clientId',
'meta/appUpdateChannel',
'id',
'environment',
'application',
'version',
'creationDate',
'type',
'payload/info',
'payload/simpleMeasurements/activeTicks',
'payload/simpleMeasurements/totalTime',
'payload/simpleMeasurements/main',
'payload/simpleMeasurements/firstPaint',
'payload/simpleMeasurements/sessionRestored',
'payload/histograms/PLACES_PAGES_COUNT',
'payload/histograms/PLACES_BOOKMARKS_COUNT',
'payload/keyedHistograms/SEARCH_COUNTS']
#was having trouble with get_pings_properties(v4Pings,v4PathsToV2fields)
# it was choking on histograms
# Project each ping down to just the fields above. (The trailing backslash
# continues the statement onto the commented-out .coalesce line, which then
# reads as an inline comment — intentional, per the OOM note.)
v4Subsess = v4Pings.map(lambda p: getDictPaths(p,v4PathsToV2fields))\
# .coalesce(numCores*2) #reducing the number of partitions seems to make the job fail. OOM?
# Sanity check: inspect the keys of one projected record.
v4f = v4Subsess.first()
v4f.keys()
def dropDupePings(pingListIn):
    """Return pingListIn with duplicate 'id' entries removed.

    Keeps the first occurrence of each ping id and preserves input order.
    Uses a set for seen ids: the original tracked them in a list, making the
    membership test O(n) per ping and the whole pass O(n^2).
    """
    seen = set()
    pingListOut = []
    for ping in pingListIn:
        pid = ping['id']
        if pid not in seen:
            seen.add(pid)
            pingListOut.append(ping)
    return pingListOut
# Group the projected subsessions into per-client histories:
# (clientId, [pings...]), concatenating lists with reduceByKey and then
# removing duplicate ping ids within each client's history.
# NOTE(review): list-concatenation reduceByKey can be memory-heavy for
# clients with many pings — presumably acceptable at this data size.
v4HistoriesById = v4Subsess.map(lambda p: (p.get("clientId","noId"),[p]) ) \
.reduceByKey(lambda l1,l2: l1+l2) \
.mapValues(dropDupePings)
# Sanity checks: one example history and the number of distinct clients.
v4h = v4HistoriesById.first()
v4h
v4HistoriesById.count()
# Open the S3 bucket holding the v2 (FHR) nightly sample and list its
# "nightly" prefix to report total data size and the last few part files.
v2SampleBucket = conn.get_bucket('mozillametricsfhrsamples') # Substitute in your bucket name
bl = v2SampleBucket.list(prefix="nightly")
print "v2 nightly data size:", sum(key.size for key in bl)
list(bl)[-5:]
# [notebook output] v2 nightly data size: 1887247209
# [notebook output] [<Key: mozillametricsfhrsamples,nightly/part-r-00195>, <Key: mozillametricsfhrsamples,nightly/part-r-00196>, <Key: mozillametricsfhrsamples,nightly/part-r-00197>, <Key: mozillametricsfhrsamples,nightly/part-r-00198>, <Key: mozillametricsfhrsamples,nightly/part-r-00199>]
# Saptarshi stores 100% of nightly records in 200 part files.
# There are about 500K records, at about 2gb of data size.
# For testing, we can grab just 10% of parts (the ones ending in 0)
pathToV2Nightly = "s3n://mozillametricsfhrsamples/nightly/part-r-*0"
# Load the v2 records as a Hadoop sequence file of (key, json-string) pairs.
v2Nightly = sc.sequenceFile(pathToV2Nightly)
def getClientIdFromV2Str(v2Str):
    """Extract the 36-char clientID following '"clientID":"' in a raw v2 JSON string.

    Scans the raw string instead of parsing the (large) JSON document for
    speed; assumes the clientID is a standard 36-character UUID. Returns
    "MISSING" when the marker is absent or v2Str is not a string.
    """
    target = '"clientID":"'
    try:
        start = v2Str.index(target) + len(target)
    except (ValueError, AttributeError, TypeError):
        # ValueError: marker not found; AttributeError/TypeError: v2Str is
        # not string-like. (Was a bare `except:`, which hid real errors.)
        return "MISSING"
    return v2Str[start: start + 36]
# re-key by clientID
# Each element is (sequence-file key, raw json string); replace the key with
# the clientID scraped from the json, and drop records where it wasn't found.
v2NightlyByClientId = v2Nightly\
.map(lambda id_json: (getClientIdFromV2Str(id_json[1]),id_json[1]) )\
.filter(lambda id_json: id_json[0]!="MISSING")
v2NightlyByClientId.count()
# [notebook output of the count above]
38239
# Inner-join the v4 per-client histories with the v2 records on clientId,
# producing (clientId, {"clientId", "v4": [pings], "v2": parsed-json}).
# Only clients present in BOTH datasets survive the join.
mergedDataPerClient = v4HistoriesById\
.join(v2NightlyByClientId)\
.map(lambda id__v4_v2: (id__v4_v2[0],
{"clientId":id__v4_v2[0], "v4": id__v4_v2[1][0], 'v2': json.loads(id__v4_v2[1][1])} ) )\
.cache()
mergedDataPerClient.count()
# [notebook output of the count above]
8937
# The join inherits a huge partition count from the upstream v4 RDD,
# far exceeding the row count — motivates the repartition below.
mergedDataPerClient.getNumPartitions()
# [notebook output of the partition count above]
59787
# NOTE: Why are there more partitions than rows?
# Everything above looks reasonable; this data is pretty much what it should be, I think...
# do we need to do a coalesce or a repartition or something to get this to be saved
# in a reasonable number of files?
# Repartition to 200 so the saved output matches the v2 sample's file count.
mergedDataPerClient_repart = mergedDataPerClient.repartition(200).cache()
# Compare partition counts along the whole pipeline (Python 2 prints).
print v4Pings.getNumPartitions(), v4Subsess.getNumPartitions(), v4HistoriesById.getNumPartitions()
print v2NightlyByClientId.getNumPartitions()
print mergedDataPerClient_repart.getNumPartitions()
# [notebook output] 59767 59767 59767 20 200
# Confirm the repartition didn't change the row count (still 8937 clients).
print mergedDataPerClient_repart.count()
# print mergedDataPerClient_repart.getNumPartitions()
# [notebook output of the count above]
8937
# Exploratory inspection of one merged record, kept for reference:
# md = mergedDataPerClient_repart.first()
# print len(md),md[0]
# # print "{"+md[1][49:]
# dat = md[1]
# # print dat
# print dat.keys()
# print dat['v2'].keys()
# print len(dat['v4'])
# print dat['v4'][0].keys()
# Destination for the merged per-client dataset on S3.
outBucketName = "net-mozaws-prod-us-west-2-pipeline-analysis"
pathToOutput = "bcolloran/mergedDataPerClient/nightly/2015-07-09/8937clients/"
# Serialize each record as (clientId string, json string) and write as a
# Hadoop sequence file (200 part files thanks to the repartition above).
mergedDataPerClient_repart.map(lambda x: (str(x[0]),json.dumps(x[1])) ) \
.saveAsSequenceFile( "s3n://"+outBucketName+"/"+pathToOutput )
# Verify the write: list the output keys and report the total size.
outBucket = conn.get_bucket(outBucketName) # Substitute in your bucket name
bl = outBucket.list(prefix=pathToOutput)
mergedDataSize = sum(key.size for key in bl)
print "mergedDataPerClient_repart data size:", mergedDataSize/(1.0*(10**9)),"GB"
list(bl)[-5:]
# [notebook output] mergedDataPerClient_repart data size: 3.320365309 GB
# [notebook output] [<Key: net-mozaws-prod-us-west-2-pipeline-analysis,bcolloran/mergedDataPerClient/nightly/2015-07-09/8937clients/part-00195>, <Key: net-mozaws-prod-us-west-2-pipeline-analysis,bcolloran/mergedDataPerClient/nightly/2015-07-09/8937clients/part-00196>, <Key: net-mozaws-prod-us-west-2-pipeline-analysis,bcolloran/mergedDataPerClient/nightly/2015-07-09/8937clients/part-00197>, <Key: net-mozaws-prod-us-west-2-pipeline-analysis,bcolloran/mergedDataPerClient/nightly/2015-07-09/8937clients/part-00198>, <Key: net-mozaws-prod-us-west-2-pipeline-analysis,bcolloran/mergedDataPerClient/nightly/2015-07-09/8937clients/part-00199>]
# Round-trip check: read back a subset of the saved parts (part-0019*)
# to confirm the sequence file is loadable.
pathToMergeTest = "s3n://"+outBucketName+"/"+pathToOutput+"part-0019*"
print pathToMergeTest
mergeTest = sc.sequenceFile(pathToMergeTest)
# [notebook output] s3n://net-mozaws-prod-us-west-2-pipeline-analysis/bcolloran/mergedDataPerClient/nightly/2015-07-09/8937clients/part-0019*
# Count the re-read records and inspect one to confirm structure survived
# the json round-trip: keys, v4 history length, and v2 field names.
mergeTest.count()
# [notebook output of the count above]
456
mt = mergeTest.first()
print len(mt),mt[0]
print json.loads(mt[1]).keys()
print len(json.loads(mt[1])['v4'])
print json.loads(mt[1])['v4'][0].keys()
print json.loads(mt[1])['v2'].keys()
# [notebook output] 2 402cfb52-012a-a24b-b2f5-866eb78e7530 [u'v2', u'v4', u'clientId'] 17 [u'payload/info', u'payload/simpleMeasurements/totalTime', u'payload/simpleMeasurements/sessionRestored', u'type', u'payload/simpleMeasurements/main', u'payload/simpleMeasurements/firstPaint', u'clientId', u'environment', u'application', u'payload/histograms/PLACES_PAGES_COUNT', u'version', u'payload/simpleMeasurements/activeTicks', u'meta/appUpdateChannel', u'creationDate', u'payload/histograms/PLACES_BOOKMARKS_COUNT', u'id', u'payload/keyedHistograms/SEARCH_COUNTS'] [u'thisPingDate', u'geckoAppInfo', u'lastPingDate', u'geoCountry', u'clientID', u'version', u'clientIDVersion', u'data', u'BAGHEERA_TS']