%pylab inline

from pandas import *
from pandas.io.pytables import HDFStore
from pandas.tseries.index import to_offset
import dateutil
import time
from datetime import timedelta

from joblib import Parallel, delayed


def process_period(day, delta, span='1D'):
    """Process a time slice [day, day + delta) and aggregate over the given time span."""
    # Load the time slice from HDF5 (created by hdf5_dataset.ipynb).
    store = HDFStore('stores/sms-call-internet-mi-table-blosc.h5')
    intensity_data = store.select(
        'telco_data',
        "index >= Timestamp('%s') & index < Timestamp('%s')" % (day, day + delta))
    intensity_data = intensity_data.fillna(0)

    # Group by 'groups' and aggregate the data in 'columns'.
    groups = ['Square_id', 'Country_code']
    columns = ['SMS_in', 'SMS_out', 'Call_in', 'Call_out']
    # TODO: resample()?
    result = intensity_data.groupby(TimeGrouper(span)).apply(
        lambda x: x.groupby(groups)[columns].sum())

    store.close()
    return result


start_date = datetime(2013, 11, 1)
end_date = datetime(2013, 12, 31)
interval_freq = '1D'
interval = to_offset(interval_freq).delta
days = date_range(start_date, end_date, freq=interval_freq, tz='Europe/Rome')

print datetime.now()

# NOTE: must be <= interval_freq
time_span = '1D'
days_result = Parallel(n_jobs=8, verbose=5)(
    delayed(process_period)(day, interval, time_span) for day in days)

print datetime.now()

# NOTE: common store for the pipeline
store = HDFStore('./stores/aggregated_dataset.h5')

# Refuse to overwrite an existing aggregation for this time span.
if 'intensity_' + time_span in store:
    raise Exception('refusing to overwrite')

for result in days_result:
    store.append('intensity_' + time_span, result)
    store.flush()

store.close()
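
# A minimal read-back check (an assumption, not part of the original pipeline):
# reopen the aggregated store written above and pull the 'intensity_<time_span>'
# table to verify that the append loop produced a readable result. The store
# path and key name simply mirror the values used above; the variable name
# `check_store` is hypothetical.
check_store = HDFStore('./stores/aggregated_dataset.h5')
aggregated = check_store['intensity_' + time_span]
print aggregated.shape
print aggregated.head()
check_store.close()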