tl;dr: We discuss efficient techniques for on-disk storage of tabular data: binary formats, columnar storage, categoricals, compression, and partitioning along a sorted index. We use the NYC Taxi dataset for examples and introduce a small project, Castra.
Interaction times can improve significantly (from minutes to seconds) if we store our data on disk efficiently. This is particularly important for large data because we can no longer load the data once while we get a coffee and then iterate rapidly on it once it's comfortably in memory.
Larger-than-memory datasets force interactive workflows to include the hard drive.
CSV is great. It's human readable, accessible by every tool (even Excel!), and pretty simple.
CSV is also slow. The pandas.read_csv parser maxes out at 100MB/s on simple data. This doesn't include any keyword arguments like datetime parsing that might slow it down further. Consider the time to parse a 24GB file:

24GB / (100MB/s) == 4 minutes
A four-minute delay is too long for interactivity. We need to operate in seconds rather than minutes; otherwise people leave to work on something else. This improvement from a few minutes to a few seconds is entirely possible if we choose better formats.
As an example, let's play with the NYC Taxi dataset using dask.dataframe, a library that copies the Pandas API but operates in chunks off of disk.
```python
>>> import dask.dataframe as dd
>>> df = dd.read_csv('csv/trip_data_*.csv',
...                  skipinitialspace=True,
...                  parse_dates=['pickup_datetime', 'dropoff_datetime'])

>>> df.head()
                          medallion                      hack_license vendor_id  rate_code store_and_fwd_flag     pickup_datetime    dropoff_datetime  passenger_count  trip_time_in_secs  trip_distance  pickup_longitude  pickup_latitude  dropoff_longitude  dropoff_latitude
0  89D227B655E5C82AECF13C3F540D4CF4  BA96DE419E711691B9445D6A6307C170       CMT          1                  N 2013-01-01 15:11:48 2013-01-01 15:18:10                4                382            1.0        -73.978165        40.757977         -73.989838         40.751171
1  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472       CMT          1                  N 2013-01-06 00:18:35 2013-01-06 00:22:54                1                259            1.5        -74.006683        40.731781         -73.994499         40.750660
2  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472       CMT          1                  N 2013-01-05 18:49:41 2013-01-05 18:54:23                1                282            1.1        -74.004707        40.737770         -74.009834         40.726002
3  DFD2202EE08F7A8DC9A57B02ACB81FE2  51EE87E3205C985EF8431D850C786310       CMT          1                  N 2013-01-07 23:54:15 2013-01-07 23:58:20                2                244            0.7        -73.974602        40.759945         -73.984734         40.759388
4  DFD2202EE08F7A8DC9A57B02ACB81FE2  51EE87E3205C985EF8431D850C786310       CMT          1                  N 2013-01-07 23:25:03 2013-01-07 23:34:24                1                560            2.1        -73.976250        40.748528         -74.002586         40.747868
```
It takes a second to load the first few lines but 11 to 12 minutes to roll through the entire dataset. We make a zoomable picture below of a random sample of the taxi pickup locations in New York City. This example is taken from a full example notebook here.
```python
df2 = df[(df.pickup_latitude > 40) &
         (df.pickup_latitude < 42) &
         (df.pickup_longitude > -75) &
         (df.pickup_longitude < -72)]

sample = df2.sample(frac=0.0001)
pickup = sample[['pickup_latitude', 'pickup_longitude']]

result = pickup.compute()

from bokeh.plotting import figure, show, output_notebook
p = figure(title="Pickup Locations")
p.scatter(result.pickup_longitude, result.pickup_latitude, size=3, alpha=0.2)
```
This result takes eleven minutes to compute, almost all of which is parsing CSV files. While this may be acceptable for a single computation, we invariably make mistakes and start over or find new avenues in our data to explore. Each step in our thought process now takes eleven minutes. Ouch.
Interactive exploration of larger-than-memory datasets requires us to evolve beyond CSV files.
What efficient techniques exist for tabular data?
A good solution may have the following attributes:

1. Binary representation
2. Columnar storage
3. Categorical support
4. Compression
5. Partitioning along a sorted index

We discuss each of these below.
Consider the text 1.23 as it is stored in a CSV file and how it is stored as a Python/C float in memory:
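Here is a small sketch in Python showing both representations side by side (the hex output is written out just for illustration):

```python
import struct

text = "1.23"                        # four ASCII characters, as stored in a CSV file
binary = struct.pack('>f', 1.23)     # four bytes of an IEEE-754 float32

print(text.encode().hex())           # 312e3233
print(binary.hex())                  # 3f9d70a4
```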
These look very different. When we load 1.23 from a CSV text file we need to translate it to 0x3f9d70a4; this takes time.
A binary format stores our data on disk exactly how it will look in memory; we store the bytes 0x3f9d70a4 directly on disk so that when we load data from disk to memory no extra translation is necessary. Our file is no longer human readable but it's much faster.
This gets more intense when we consider datetimes:
Every time we parse a datetime we need to compute how many microseconds it has been since the epoch. This calculation needs to take into account things like how many days are in each month and all of the intervening leap years. This is slow. A binary representation would record the integer directly on disk (as 0x51e278694a680) so that we can load our datetimes directly into memory.
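As a quick illustration (the timestamp below is arbitrary, not the one behind the hex value above):

```python
import numpy as np

# Parsing text like '2015-08-25 12:13:14' requires calendar arithmetic
# (days per month, leap years) every single time.
parsed = np.datetime64('2015-08-25 12:13:14')

# Under the hood this is just an integer count of time units since the
# epoch; a binary format stores that integer directly, so loading it
# back is a plain memory copy with no parsing.
as_int = parsed.astype('datetime64[us]').astype(np.int64)
print(as_int, hex(as_int))
```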
Many analytic computations only require a few columns at a time, often only one, e.g.
```python
>>> df.passenger_count.value_counts().compute().sort_index()
0           3755
1      119605039
2       23097153
3        7187354
4        3519779
5        9852539
6        6628287
7             30
8             23
9             24
129            1
255            1
Name: passenger_count, dtype: int64
```
Of our 24 GB we may only need 2GB. Columnar storage means storing each column separately from the others so that we can read relevant columns without passing through irrelevant columns.
Our CSV example fails at this. While we only want two columns, pickup_latitude and pickup_longitude, we pass through all of our data to collect the relevant fields. The pickup location data is mixed with all the rest.
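As a toy sketch of the idea (not how any particular library lays out its files), imagine writing each column to its own binary file and reading back only the columns a query touches:

```python
import numpy as np
import pandas as pd

# A tiny frame standing in for one chunk of the taxi data
df = pd.DataFrame({'pickup_longitude': [-73.978165, -74.006683],
                   'pickup_latitude':  [40.757977, 40.731781],
                   'trip_distance':    [1.0, 1.5]})

# A toy column store: each column lives in its own binary file
for col in df.columns:
    np.save(col + '.npy', df[col].values)

# A later query that only needs pickup locations reads just those two
# files; the other columns are never touched or parsed.
lon = np.load('pickup_longitude.npy', mmap_mode='r')
lat = np.load('pickup_latitude.npy', mmap_mode='r')
```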
Categoricals encode repetitive text columns (normally very expensive) as integers (very very cheap) in a way that is invisible to the user.
Consider the following (mostly text) columns of our NYC taxi dataset:
```python
>>> df[['medallion', 'vendor_id', 'rate_code', 'store_and_fwd_flag']].head()
                          medallion vendor_id  rate_code store_and_fwd_flag
0  89D227B655E5C82AECF13C3F540D4CF4       CMT          1                  N
1  0BD7C8F5BA12B88E0B67BED28BEA73D8       CMT          1                  N
2  0BD7C8F5BA12B88E0B67BED28BEA73D8       CMT          1                  N
3  DFD2202EE08F7A8DC9A57B02ACB81FE2       CMT          1                  N
4  DFD2202EE08F7A8DC9A57B02ACB81FE2       CMT          1                  N
```
Each of these columns represents elements of a small set; there are only a handful of vendors, rate codes, and flag values, and a limited number of taxi medallions.
And yet we store these elements in large and cumbersome dtypes:
```python
In : df[['medallion', 'vendor_id', 'rate_code', 'store_and_fwd_flag']].dtypes
Out:
medallion             object
vendor_id             object
rate_code              int64
store_and_fwd_flag    object
dtype: object
```
We use int64 for the rate code, which could easily have fit into an int8, an opportunity for an 8x improvement in memory use. The object dtype used for strings in Pandas and Python takes up a lot of memory and is quite slow:
```python
In : import sys
In : sys.getsizeof('CMT')  # bytes
Out: 40
```
Categoricals replace the original column with a column of integers (of the appropriate size, often int8) along with a small index mapping those integers to the original values. I've written about categoricals before so I won't go into too much depth here. Categoricals increase both storage and computational efficiency by about 10x if you have text data that describes elements in a category.
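A minimal illustration with pandas, using toy data rather than the real column:

```python
import pandas as pd

# A small stand-in for the vendor_id column
s = pd.Series(['CMT', 'VTS', 'CMT', 'CMT', 'VTS'], name='vendor_id')

cat = s.astype('category')
print(cat.cat.codes.dtype)   # int8 -- one byte per row instead of a Python string
print(cat.cat.categories)    # Index(['CMT', 'VTS'], dtype='object')
```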
After we've encoded everything well and separated our columns we find ourselves limited by disk I/O read speeds. Disk read bandwidths range from 100MB/s (laptop spinning-disk hard drive) to 2GB/s (RAID of SSDs). This read speed strongly depends on how large our reads are. The bandwidths given above reflect large sequential reads, such as reading all of a 100MB file in one go. Performance degrades for smaller reads. Fortunately, for analytic queries we're often in the large sequential read case (hooray!).
We reduce disk read times through compression. Consider the datetimes of the NYC taxi dataset. These values are repetitive and slowly changing; a perfect match for modern compression techniques.
```python
>>> ind = df.index.compute()  # this is on presorted index data (see castra section below)
>>> ind
DatetimeIndex(['2013-01-01 00:00:00', '2013-01-01 00:00:00',
               '2013-01-01 00:00:00', '2013-01-01 00:00:00',
               '2013-01-01 00:00:00', '2013-01-01 00:00:00',
               '2013-01-01 00:00:00', '2013-01-01 00:00:00',
               '2013-01-01 00:00:00', '2013-01-01 00:00:00',
               ...
               '2013-12-31 23:59:42', '2013-12-31 23:59:47',
               '2013-12-31 23:59:48', '2013-12-31 23:59:49',
               '2013-12-31 23:59:50', '2013-12-31 23:59:51',
               '2013-12-31 23:59:54', '2013-12-31 23:59:55',
               '2013-12-31 23:59:57', '2013-12-31 23:59:57'],
              dtype='datetime64[ns]', name=u'pickup_datetime',
              length=169893985, freq=None, tz=None)
```
We can use a modern compression library, like fastlz or blosc, to compress this data at high speeds.
```python
In : import blosc

In : %time compressed = blosc.compress_ptr(address=ind.values.ctypes.data,
...:                                        items=len(ind),
...:                                        typesize=ind.values.dtype.alignment,
...:                                        clevel=5)
CPU times: user 3.22 s, sys: 332 ms, total: 3.55 s
Wall time: 512 ms

In : len(compressed) / ind.nbytes  # compression ratio
Out: 0.14296813539337488

In : ind.nbytes / 0.512 / 1e9  # Compression bandwidth (GB/s)
Out: 2.654593515625

In : %time _ = blosc.decompress(compressed)
CPU times: user 1.3 s, sys: 438 ms, total: 1.74 s
Wall time: 406 ms

In : ind.nbytes / 0.406 / 1e9  # Decompression bandwidth (GB/s)
Out: 3.3476647290640393
```
We store 7x fewer bytes on disk (thus septupling our effective disk I/O) at the cost of an extra compression/decompression step that runs at about 3GB/s. If we're on a really nice MacBook Pro hard drive (~600MB/s) then this is a clear and substantial win. The worse the hard drive, the better this is.
Some data is more or less compressible than other data. The following column of floating point data does not compress as nicely.
```python
In : x = df.pickup_latitude.compute().values

In : %time compressed = blosc.compress_ptr(x.ctypes.data, len(x), x.dtype.alignment, clevel=5)
CPU times: user 5.87 s, sys: 0 ns, total: 5.87 s
Wall time: 925 ms

In : len(compressed) / x.nbytes
Out: 0.7518617315969132
```
This compresses more slowly and only provides marginal benefit. Compression may still be worth it on slow disk but this isn't a huge win.

The pickup_latitude column isn't compressible because most of the information isn't repetitive. The numbers to the far right of the decimal point are more or less random.
Other floating point columns may compress well, particularly when they are rounded to small and meaningful decimal values.
Optimal compression requires thought. General rules of thumb include the following:

1. Compress integer and datetime columns; they tend to be repetitive and compress well.
2. Don't expect much from floating point columns with many significant digits unless they have been rounded to meaningful precision.

Finally, avoid gzip and bz2. These are both very common and very slow. If dealing with text data, consider snappy (also available via blosc).
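For example, blosc exposes several codecs through its cname argument. A hedged sketch, assuming your blosc build includes the snappy codec:

```python
import blosc

# Repetitive text data, similar in spirit to CSV fields
text = b'CMT,1,N\n' * 1000000

# snappy through blosc: far faster than gzip/bz2 at a reasonable ratio
compressed = blosc.compress(text, typesize=1, clevel=5, cname='snappy')
print(len(compressed) / len(text))
```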
One column usually dominates our queries. In time-series data this is time. For personal data this is the user ID.
Just as column stores let us avoid irrelevant columns, partitioning our data along a preferred index column lets us avoid irrelevant rows. We may need the data for the last month and don't need several years' worth. We may need the information for Alice and don't need the information for Bob.

Traditional relational databases provide indexes on any number of columns or sets of columns. This is excellent if you are using a traditional relational database. Unfortunately, the data structures needed to provide arbitrary indexes don't mix well with some of the attributes discussed above, so here we're limited to a single index that partitions our data into sorted blocks.
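As a minimal sketch of the idea (not Castra's actual machinery), keeping only the sorted block boundaries in memory is enough to know which blocks a time-range query touches:

```python
import pandas as pd

# Block boundaries for data sorted by pickup_datetime and split by month
divisions = pd.date_range('2013-01-01', '2014-01-01', freq='MS')

def blocks_for(start, stop):
    """Indices of the monthly blocks that overlap [start, stop)."""
    i = divisions.searchsorted(pd.Timestamp(start), side='right') - 1
    j = divisions.searchsorted(pd.Timestamp(stop), side='left')
    return list(range(max(i, 0), j))

# A two-week query in March touches one block out of twelve
print(blocks_for('2013-03-01', '2013-03-15'))   # [2]
```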
Additionally, newer single-machine data stores like Dato's SFrame and BColz follow many of these principles. Finally, many people have been doing this for a long time with custom use of libraries like HDF5.
It turns out that these principles are actually quite easy to implement with the right tools (thank you, #PyData). The rest of this post will talk about a tiny 500-line project, Castra, that implements these principles and gets good speedups on biggish Pandas data.
With these goals in mind we built Castra, a binary, partitioned, compressed column store with built-in support for categoricals and integration with both Pandas and dask.dataframe.
Here we load in our data from CSV files, sort on the pickup datetime column, and store to a castra file. This takes about an hour (as compared to eleven minutes for a single read). Again, you can view the full notebook here.
```python
>>> import dask.dataframe as dd
>>> df = dd.read_csv('csv/trip_data_*.csv',
...                  skipinitialspace=True,
...                  parse_dates=['pickup_datetime', 'dropoff_datetime'])

>>> (df.set_index('pickup_datetime', compute=False)
...    .to_castra('trip.castra', categories=True))
```
Now we can take advantage of column stores, compression, and binary representation to perform analytic queries quickly. Here is code to create a histogram of trip distance. The plot of the results follows below.
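A sketch of that computation, assuming the from_castra constructor that dask shipped alongside Castra at the time (the column filter and bin width here are illustrative):

```python
import dask.dataframe as dd

# Load the column store back as a dask dataframe
df = dd.from_castra('trip.castra')

# Keep sensible distances, round them into 0.25-unit bins, and count
# each bin -- effectively a histogram of trip distance, computed out of core
dist = df.trip_distance[(df.trip_distance > 0) & (df.trip_distance < 50)]
result = (dist // 0.25 * 0.25).value_counts().compute().sort_index()
```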
Note that this is especially fast because Pandas now releases the GIL during these computations (all groupby operations, really). This takes around 20 seconds on my machine on the last release of Pandas vs 5 seconds on the development branch. Moving from CSV files to Castra moved the bottleneck of our computation from disk I/O to processing speed, allowing improvements like multi-core processing to really shine.
We plot the result of the above computation with Bokeh below. Note the spike around 20km. This is around the distance from Midtown Manhattan to LaGuardia airport.
I've shown Castra used above with dask.dataframe, but it works fine with straight Pandas too.
Castra was started by myself and Valentin Haenel (current maintainer of bloscpack and bcolz) during an evening sprint following PyData Berlin. Several bugfixes and refactors have since come from Phil Cloud and Jim Crist.
Castra is roughly 500 lines long. It's a tiny project, which is both good and bad. It's being used experimentally and there are some heavy disclaimers in the README. This post is not intended as a sales pitch for Castra, but rather to provide a vocabulary to talk about efficient tabular storage.
Response to Twitter traffic: again, this blogpost is not saying "use Castra!" Rather it says "don't use CSVs!" and "consider more efficient storage for interactive use." Other, more mature solutions exist or could be built. Castra was an experiment in how fast we could get without doing too much work.