# !wget https://nyctaxitrips.blob.core.windows.net/data/trip_data_{1,2,3,4,5,6,7,8,9,10,11,12}.csv.zip

from blaze import *

d = Data('trip_data_*.csv')
d

by(d.passenger_count,
   avg_distance=d.trip_distance.mean(),
   count=d.passenger_count.count())

expr = by(d.passenger_count,
          avg_distance=d.trip_distance.mean(),
          count=d.passenger_count.count())

%time _ = compute(expr)

import multiprocessing
pool = multiprocessing.Pool(4)

%time _ = compute(expr, map=pool.map)

import pandas as pd

totals = []
counts = []
for chunk in pd.read_csv('trip_data_1.csv', chunksize=1000000,
                         usecols=['passenger_count']):
    totals.append(chunk.passenger_count.sum())
    counts.append(chunk.passenger_count.count())

1.0 * sum(totals) / sum(counts)

Data('trip_data_1.csv').passenger_count.mean()

%%time
from glob import glob

# Specifying active columns at parse time greatly improves performance
active_columns = ['passenger_count', 'trip_distance']
intermediates = []

# Do a split-apply-combine operation on each chunk of each CSV file
for fn in sorted(glob('trip_data_*.csv')):
    for df in pd.read_csv(fn, usecols=active_columns,
                          chunksize=1000000, skipinitialspace=True):
        chunk = df.groupby('passenger_count').agg({'passenger_count': ['count'],
                                                   'trip_distance': ['sum', 'count']})
        intermediates.append(chunk)

# Bring those results together.  These are much smaller and so likely fit in memory
df = pd.concat(intermediates, axis=0)
# Flatten the multi-index; names follow the key order of the agg dict above
df.columns = ['passenger_count_count', 'trip_distance_sum', 'trip_distance_count']

# Perform a second split-apply-combine operation on those intermediate results
groups = df.groupby(df.index)  # group once for many of the following applies
df2 = pd.concat([groups.trip_distance_sum.sum(),
                 groups.trip_distance_count.sum(),
                 groups.passenger_count_count.sum()],
                axis=1)
df2['avg_distance'] = df2.trip_distance_sum / df2.trip_distance_count
df2['count'] = df2.passenger_count_count

# Select out the columns we want
result = df2[['avg_distance', 'count']]
result
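
# For reference, the same aggregation computed on a single in-memory DataFrame.
# This is a minimal sketch and not part of the original workflow: it assumes one
# month's file ('trip_data_1.csv') fits comfortably in memory, and it counts
# trips via non-null trip_distance values rather than via passenger_count.
import pandas as pd

small = pd.read_csv('trip_data_1.csv',
                    usecols=['passenger_count', 'trip_distance'],
                    skipinitialspace=True)
reference = small.groupby('passenger_count').trip_distance.agg(['mean', 'count'])
reference.columns = ['avg_distance', 'count']
reference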