Large CSV files, or large collections of many CSV files, are a common-yet-cumbersome data format. If the data in these files doesn't fit into memory we're usually forced to
This notebook demonstrates that Blaze operates smoothly on such data. It then shows exactly how Blaze uses Pandas, effectively automating the gymnastics of step 2. We perform an out-of-core split-apply-combine operation on the NYC Taxicab dataset while using a comfortably small amount of memory.
All computations in this notebook were done on a personal laptop with smallish memory using a recent version of Blaze, which you can install with
conda install -c blaze blaze
or
pip install blaze
Note: this is a lot of data to download. It's also a lot of data to serve. You might consider grabbing it from a torrent instead.
# !wget https://nyctaxitrips.blob.core.windows.net/data/trip_data_{1,2,3,4,5,6,7,8,9,10,11,12}.csv.zip
We designed the blaze.Data constructor to be easy to use. Here we give it a globstring matching the files we want to analyze.
It gives us a quick head of the data immediately, even though there are several gigabytes of data. If you're unfamiliar with the data, you may want to quickly peruse the columns and values.
from blaze import *
d = Data('trip_data_*.csv')
d
| | medallion | hack_license | vendor_id | rate_code | store_and_fwd_flag | pickup_datetime | dropoff_datetime | passenger_count | trip_time_in_secs | trip_distance | pickup_longitude | pickup_latitude | dropoff_longitude | dropoff_latitude |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 89D227B655E5C82AECF13C3F540D4CF4 | BA96DE419E711691B9445D6A6307C170 | CMT | 1 | N | 2013-01-01 15:11:48 | 2013-01-01 15:18:10 | 4 | 382 | 1.0 | -73.978165 | 40.757977 | -73.989838 | 40.751171 |
| 1 | 0BD7C8F5BA12B88E0B67BED28BEA73D8 | 9FD8F69F0804BDB5549F40E9DA1BE472 | CMT | 1 | N | 2013-01-06 00:18:35 | 2013-01-06 00:22:54 | 1 | 259 | 1.5 | -74.006683 | 40.731781 | -73.994499 | 40.750660 |
| 2 | 0BD7C8F5BA12B88E0B67BED28BEA73D8 | 9FD8F69F0804BDB5549F40E9DA1BE472 | CMT | 1 | N | 2013-01-05 18:49:41 | 2013-01-05 18:54:23 | 1 | 282 | 1.1 | -74.004707 | 40.737770 | -74.009834 | 40.726002 |
| 3 | DFD2202EE08F7A8DC9A57B02ACB81FE2 | 51EE87E3205C985EF8431D850C786310 | CMT | 1 | N | 2013-01-07 23:54:15 | 2013-01-07 23:58:20 | 2 | 244 | 0.7 | -73.974602 | 40.759945 | -73.984734 | 40.759388 |
| 4 | DFD2202EE08F7A8DC9A57B02ACB81FE2 | 51EE87E3205C985EF8431D850C786310 | CMT | 1 | N | 2013-01-07 23:25:03 | 2013-01-07 23:34:24 | 1 | 560 | 2.1 | -73.976250 | 40.748528 | -74.002586 | 40.747868 |
| 5 | 20D9ECB2CA0767CF7A01564DF2844A3E | 598CCE5B9C1918568DEE71F43CF26CD2 | CMT | 1 | N | 2013-01-07 15:27:48 | 2013-01-07 15:38:37 | 1 | 648 | 1.7 | -73.966743 | 40.764252 | -73.983322 | 40.743763 |
| 6 | 496644932DF3932605C22C7926FF0FE0 | 513189AD756FF14FE670D10B92FAF04C | CMT | 1 | N | 2013-01-08 11:01:15 | 2013-01-08 11:08:14 | 1 | 418 | 0.8 | -73.995804 | 40.743977 | -74.007416 | 40.744343 |
| 7 | 0B57B9633A2FECD3D3B1944AFC7471CF | CCD4367B417ED6634D986F573A552A62 | CMT | 1 | N | 2013-01-07 12:39:18 | 2013-01-07 13:10:56 | 3 | 1898 | 10.7 | -73.989937 | 40.756775 | -73.865250 | 40.770630 |
| 8 | 2C0E91FF20A856C891483ED63589F982 | 1DA2F6543A62B8ED934771661A9D2FA0 | CMT | 1 | N | 2013-01-07 18:15:47 | 2013-01-07 18:20:47 | 1 | 299 | 0.8 | -73.980072 | 40.743137 | -73.982712 | 40.735336 |
| 9 | 2D4B95E2FA7B2E85118EC5CA4570FA58 | CD2F522EEE1FF5F5A8D8B679E23576B3 | CMT | 1 | N | 2013-01-07 15:33:28 | 2013-01-07 15:49:26 | 2 | 957 | 2.5 | -73.977936 | 40.786983 | -73.952919 | 40.806370 |
| 10 | E12F6AF991172EAC3553144A0AF75A19 | 06918214E951FA0003D1CC54955C2AB0 | CMT | 1 | N | 2013-01-08 13:11:52 | 2013-01-08 13:19:50 | 1 | 477 | 1.3 | -73.982452 | 40.773167 | -73.964134 | 40.773815 |
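For comparison, a rough pandas-only way to get a similar quick look is to parse just the first few rows of one matching file, so the several gigabytes behind the glob are never read. This is a minimal sketch: peek is a hypothetical helper, not part of Blaze. It only inspects the first file in sorted order and makes no claim about how Blaze builds its own preview.

```python
from glob import glob

import pandas as pd


def peek(pattern, nrows=10):
    """Hypothetical helper: read only the first `nrows` rows of the first
    file (in sorted order) that matches the glob `pattern`."""
    files = sorted(glob(pattern))
    if not files:
        raise FileNotFoundError('no files match %r' % pattern)
    # nrows stops the parser early, so only the top of one file is read
    return pd.read_csv(files[0], nrows=nrows)


# Usage sketch: peek('trip_data_*.csv')
```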
We now compute the average distance and number of rides grouped by the number of passengers riding in the cab.
It looks like single-passenger trips are the most common, while three-passenger trips are surprisingly long.
by(d.passenger_count, avg_distance=d.trip_distance.mean(),
   count=d.passenger_count.count())
| | passenger_count | avg_distance | count |
|---|---|---|---|
| 0 | 0 | 0.833625 | 5035 |
| 1 | 1 | 9.033823 | 121959711 |
| 2 | 2 | 6.992290 | 23517494 |
| 3 | 3 | 14.089989 | 7315829 |
| 4 | 4 | 5.286269 | 3582103 |
| 5 | 5 | 2.995215 | 10034696 |
| 6 | 6 | 2.956734 | 6764789 |
| 7 | 7 | 2.214286 | 35 |
| 8 | 8 | 3.360400 | 25 |
| 9 | 9 | 2.226538 | 26 |
| 10 | 129 | 0.920000 | 1 |
It's useful to note here all the things we didn't do. For example, we never had to work out how to call pandas.read_csv with the right arguments to make this fast.
Blaze handles these things for us. It drives Pandas intelligently, breaks our computation into pieces it can perform in memory, and then shoves data through Pandas as fast as it can.
How long did it take to process 16 GB and do an out-of-core split-apply-combine operation?
expr = by(d.passenger_count, avg_distance=d.trip_distance.mean(),
          count=d.passenger_count.count())
%time _ = compute(expr)
CPU times: user 2min 13s, sys: 13.6 s, total: 2min 26s
Wall time: 3min 7s
Not great, but not terrible. This is about as fast as Postgres does it after the data has been loaded into Postgres.
How much memory did this use? Not very much: a few hundred megabytes by default.
Anecdotally, I used to switch to a big machine when I ran out of memory. Now I switch to my big machine when I run out of disk space.
Blaze can use many cores to accelerate this work. It still uses Pandas on each core, but now it splits the computation apart intelligently and directs different CSV files to different cores.
This gives a significant speedup even on my laptop: with four worker processes, wall time drops from about 3 minutes to about 1 minute. On a large workstation with more cores the speedup is more pronounced.
import multiprocessing
pool = multiprocessing.Pool(4)
%time _ = compute(expr, map=pool.map)
CPU times: user 133 ms, sys: 28.8 ms, total: 161 ms
Wall time: 1min 1s
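A similar per-file parallel strategy can be sketched by hand with pandas. Each file is reduced to a small table of per-group sums and counts, and those tables are combined at the end. This is a sketch under stated assumptions: file_intermediate and grouped_mean_and_count are hypothetical helpers, the column names are taken from the data shown above, and the map argument only mimics the map=pool.map idea. It does not reproduce Blaze's internals.

```python
from glob import glob

import pandas as pd

# Only these columns are parsed; skipping the others is much cheaper.
ACTIVE_COLUMNS = ['passenger_count', 'trip_distance']


def file_intermediate(fn, chunksize=1000000):
    """Stage one for one CSV file: per-passenger_count sum and count of
    trip_distance, plus the number of rows in each group."""
    parts = []
    for df in pd.read_csv(fn, usecols=ACTIVE_COLUMNS, chunksize=chunksize,
                          skipinitialspace=True):
        g = df.groupby('passenger_count')
        parts.append(pd.DataFrame({
            'trip_distance_sum': g['trip_distance'].sum(),
            'trip_distance_count': g['trip_distance'].count(),
            'passenger_count_count': g.size(),
        }))
    # Collapse this file's chunk results so each worker returns one small frame
    return pd.concat(parts).groupby(level=0).sum()


def grouped_mean_and_count(pattern, map=map):
    """Stage two: combine per-file intermediates. Pass map=pool.map to
    process different files in different processes."""
    intermediates = list(map(file_intermediate, sorted(glob(pattern))))
    totals = pd.concat(intermediates).groupby(level=0).sum()
    return pd.DataFrame({
        'avg_distance': totals.trip_distance_sum / totals.trip_distance_count,
        'count': totals.passenger_count_count,
    })


# Usage sketch (not run here):
# import multiprocessing
# with multiprocessing.Pool(4) as pool:
#     result = grouped_mean_and_count('trip_data_*.csv', map=pool.map)
```

Workers only ever send back a few rows per file, so the combine step stays cheap no matter how large the files are. On platforms that use the spawn start method, the helpers need to live in an importable module for multiprocessing to find them.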
Let's say we didn't want to use Blaze but preferred to just use Pandas and some elbow grease.
How can we use Pandas in-memory to handle an out-of-memory dataset?
As an easier example, let's compute the mean of a single column on a single CSV file. We'll use Pandas' chunked CSV reader.
To compute an out-of-core mean, we'll record the total and the count for each chunk:
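import pandas as pd

totals = []  # per-chunk sums of passenger_count
counts = []  # per-chunk counts of non-missing passenger_count values
for chunk in pd.read_csv('trip_data_1.csv', chunksize=1000000, usecols=['passenger_count']):
    totals.append(chunk.passenger_count.sum())
    counts.append(chunk.passenger_count.count())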
Then we perform a second computation on these intermediate results:
1.0 * sum(totals) / sum(counts)
1.6973720977368634
This is exactly what Blaze does when we type the following.
Data('trip_data_1.csv').passenger_count.mean()
So to perform one computation, mean, on an out-of-core dataset, we end up performing two different sets of computations:

1. sum and count on each in-memory chunk
2. sum and division on the aggregated results from step #1

This breakdown works on a surprisingly large class of operations. Split-apply-combine operations are handled similarly. We perform a different split-apply-combine operation on each chunk and then another on the aggregated results.
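The two-step breakdown of mean can be written as a small generic helper: one function runs on each in-memory chunk, and another combines the small intermediate results. This is a minimal sketch. chunked_reduce, mean_chunk and mean_combine are hypothetical names for illustration, not Blaze's API, and the chunks are assumed to be pandas Series, such as one column of each piece yielded by pd.read_csv(..., chunksize=...).

```python
import pandas as pd  # used by the usage sketch at the bottom


def chunked_reduce(chunks, chunk_step, combine_step):
    """Apply `chunk_step` to every in-memory chunk, then apply
    `combine_step` to the list of (small) intermediate results."""
    return combine_step([chunk_step(chunk) for chunk in chunks])


def mean_chunk(chunk):
    # Step 1: sum and count on each chunk (both skip missing values)
    return chunk.sum(), chunk.count()


def mean_combine(parts):
    # Step 2: add up the per-chunk sums and counts, then divide once
    total = sum(s for s, _ in parts)
    count = sum(c for _, c in parts)
    return total / count


# Usage sketch against the real file (not run here):
# chunks = (df.passenger_count for df in
#           pd.read_csv('trip_data_1.csv', chunksize=1000000,
#                       usecols=['passenger_count']))
# chunked_reduce(chunks, mean_chunk, mean_combine)
```

Reductions that are their own combine step, such as max, fit the same shape: chunked_reduce(chunks, lambda c: c.max(), max).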
For more information on this see Blaze's out-of-core docs.
This doesn't work on cases like sort or join, nor on any computation for which the intermediate results don't fit in memory. Of course, you can still sort or join the results of computations, so long as some data-reducing step comes first.
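A top-n query is one example of a data-reducing step that makes ordering feasible. Keep the n largest rows of each chunk, then take the n largest of those survivors. This is correct because any row in the overall top n must also be in the top n of its own chunk. This is a sketch with a hypothetical helper name, chunked_nlargest. It answers questions like "the n longest trips" rather than producing a full sort of the dataset.

```python
import pandas as pd


def chunked_nlargest(chunks, column, n):
    """Hypothetical helper: the n rows with the largest `column` values
    across all chunks, using only O(n) rows of memory per chunk."""
    # Reduce: each chunk contributes at most n candidate rows
    survivors = [chunk.nlargest(n, column) for chunk in chunks]
    # Sort the (small) union of candidates and keep the top n
    return pd.concat(survivors).nlargest(n, column)


# Usage sketch (not run here):
# chunks = pd.read_csv('trip_data_1.csv', usecols=['trip_distance'],
#                      chunksize=1000000)
# chunked_nlargest(chunks, 'trip_distance', 10)
```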
OK, so let's go through and solve the entire out-of-core split-apply-combine problem on all of the CSV files.
Feel free to ignore this example. It's mostly here to show exactly what Blaze does, for those who care, and to generally impress those who don't.
Hold on to your butts.
%%time
from glob import glob

import pandas as pd

# Specifying active columns at parse time greatly improves performance
active_columns = ['passenger_count', 'trip_distance']
intermediates = []

# Do a split-apply-combine operation on each chunk of each CSV file
for fn in sorted(glob('trip_data_*.csv')):
    for df in pd.read_csv(fn, usecols=active_columns,
                          chunksize=1000000, skipinitialspace=True):
        chunk = df.groupby('passenger_count').agg({'passenger_count': ['count'],
                                                   'trip_distance': ['sum', 'count']})
        intermediates.append(chunk)

# Bring those results together. These are much smaller and so likely fit in memory
df = pd.concat(intermediates, axis=0)
# Flatten the (column, aggregation) multi-index into names like 'trip_distance_sum'.
# Building names from the columns themselves avoids relying on column order.
df.columns = ['_'.join(col) for col in df.columns]

# Perform second split-apply-combine operation on those intermediate results
groups = df.groupby(df.index)  # group once for many of the following applies
df2 = pd.concat([groups.trip_distance_sum.sum(),
                 groups.trip_distance_count.sum(),
                 groups.passenger_count_count.sum()],
                axis=1)
df2['avg_distance'] = df2.trip_distance_sum / df2.trip_distance_count
df2['count'] = df2.passenger_count_count

# Select out the columns we want
result = df2[['avg_distance', 'count']]
result
CPU times: user 2min, sys: 8.74 s, total: 2min 9s
Wall time: 2min 49s
Blaze is a general library to bring expert data analysis into the hands of everyday users.
The example above is a PITA to do by hand. More than that, it relies on a number of tricks not known to many Pandas users.
Fortunately, Blaze automates these tricks, making them routine for a broad class of problems. Moreover, it does this from a relatively naive, user-focused syntax.
d = Data('trip_data_*.csv')
by(d.passenger_count, avg_distance=d.trip_distance.mean(),
   count=d.passenger_count.count())
Hopefully this example helps to explain how Blaze chunks apart computations on large CSV files to operate in memory.
This also highlights the relationship between Blaze and Pandas. Pandas is Blaze's preferred library when it performs in-memory analytics on tabular data. In these cases it's Blaze's job to arrange data well and call Pandas with the right arguments while it's Pandas' job to actually do the computation.
As a reminder, large CSV files are just one application of Blaze. Blaze provides a similar experience and set of tricks for SQL, Spark, and binary storage files.
You can learn more about Blaze at http://blaze.pydata.org/