%pylab inline
Populating the interactive namespace from numpy and matplotlib
from pandas import *
from pandas.io.pytables import HDFStore
from pandas.tseries.index import to_offset
import dateutil
from joblib import Parallel, delayed
import time
from datetime import timedelta
def process_period(day, delta, span='1D'):
    """Process a time slice [day, day+delta) and aggregate on time span.

    Parameters
    ----------
    day : datetime-like
        Inclusive start of the slice to load from the HDF5 store.
    delta : DateOffset / timedelta
        Width of the slice; the query selects [day, day + delta).
    span : str, default '1D'
        Resampling frequency for the outer time grouping
        (must be <= the slice width for one call to see a full bucket).

    Returns
    -------
    DataFrame
        Per (time bucket, Square_id, Country_code) sums of the
        SMS/Call in/out columns.
    """
    # load time slice from hdf5 (created by hdf5_dataset.ipynb)
    store = HDFStore('stores/sms-call-internet-mi-table-blosc.h5')
    try:
        intensity_data = store.select(
            'telco_data',
            "index >= Timestamp('%s') & index < Timestamp('%s')" % (day, day + delta))
        # missing activity means zero activity in this dataset
        intensity_data = intensity_data.fillna(0)
        # groupby 'groups' and aggregate data in 'columns'
        groups = ["Square_id", 'Country_code']
        columns = ['SMS_in', 'SMS_out', 'Call_in', 'Call_out']
        # TODO: resample()?
        result = intensity_data.groupby(TimeGrouper(span)).apply(
            lambda x: x.groupby(groups)[columns].sum())
    finally:
        # close even when select/aggregation raises, so parallel workers
        # never leak an open HDF5 file handle
        store.close()
    return result
# Milano telecom dataset covers Nov-Dec 2013; walk it in fixed slices.
start_date = datetime(2013,11,1)
end_date = datetime(2013,12,31)
# width of the slice each worker loads in one call
interval_freq = '1D'
interval = to_offset(interval_freq).delta
# one slice-start timestamp per day, localized to the dataset's timezone
days = date_range(start_date, end_date, freq=interval_freq, tz='Europe/Rome')
print datetime.now()
# NOTE: must be <= interval_freq
time_span = '1D'
# fan the per-day slices out over 8 worker processes; order of results
# matches the order of `days`
days_result = Parallel(n_jobs=8, verbose=5)(delayed(process_period)(day, interval, time_span) for day in days)
print datetime.now()
2015-01-25 17:58:26.919533
[Parallel(n_jobs=8)]: Done 1 out of 61 | elapsed: 10.5s remaining: 10.5min [Parallel(n_jobs=8)]: Done 11 out of 61 | elapsed: 23.3s remaining: 1.8min [Parallel(n_jobs=8)]: Done 24 out of 61 | elapsed: 35.3s remaining: 54.4s [Parallel(n_jobs=8)]: Done 37 out of 61 | elapsed: 57.7s remaining: 37.4s [Parallel(n_jobs=8)]: Done 50 out of 61 | elapsed: 1.3min remaining: 17.0s [Parallel(n_jobs=8)]: Done 61 out of 61 | elapsed: 1.4min finished
2015-01-25 17:59:51.246540
# NOTE: common store for the pipeline
store = HDFStore('./stores/aggregated_dataset.h5') # test
try:
    # guard against clobbering a previously-computed aggregation
    if 'intensity_' + time_span in store:
        raise Exception('refusing to overwrite')
    # append each day's aggregate into a single table keyed by the span
    for result in days_result:
        store.append('intensity_' + time_span, result)
    store.flush()
finally:
    # release the HDF5 handle even when the overwrite guard raises or
    # an append fails mid-loop (previously it stayed open on error)
    store.close()