I would probably address this by using dask to load your data in a streaming fashion. For example, you can create a dask dataframe as follows:
import dask.dataframe as ddf
data = ddf.read_csv('test.csv')
This data object hasn't actually done anything at this point; it just contains a "recipe" of sorts to read the dataframe from disk in manageable chunks. If you want to materialize the data, you can call compute():
df = data.compute().reset_index(drop=True)
At this point, you have a standard pandas dataframe (we call reset_index because by default each partition is independently indexed). The result is equivalent to what you get by calling pd.read_csv directly:
df.equals(pd.read_csv('test.csv'))
# True
The benefit of dask is you can add instructions to this "recipe" for constructing your dataframe; for example, you could make each partition of the data sparse as follows:
data = data.map_partitions(lambda part: part.to_sparse(fill_value=0))
At this point, calling compute() will construct a sparse dataframe:
df = data.compute().reset_index(drop=True)
type(df)
# pandas.core.sparse.frame.SparseDataFrame
Profiling
To check how the dask approach compares to the raw pandas approach, let's do some line profiling. I'll use lprun and mprun, as described here (full disclosure: that's a section of my own book).
Assuming you're working in the Jupyter notebook, you can run it this way:
First, create a separate file with the basic tasks we want to do:
%%file dask_load.py
import numpy as np
import pandas as pd
import dask.dataframe as ddf


def compare_loads():
    df = pd.read_csv('test.csv')
    df_sparse = df.to_sparse(fill_value=0)

    df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    df_dask = df_dask.compute().reset_index(drop=True)
Next let's do line-by-line profiling for computation time:
%load_ext line_profiler
from dask_load import compare_loads
%lprun -f compare_loads compare_loads()
I get the following result:
Timer unit: 1e-06 s
Total time: 13.9061 s
File: /Users/jakevdp/dask_load.py
Function: compare_loads at line 6
Line #      Hits         Time    Per Hit   % Time  Line Contents
================================================================
     6                                             def compare_loads():
     7         1      4746788  4746788.0     34.1      df = pd.read_csv('test.csv')
     8         1       769303   769303.0      5.5      df_sparse = df.to_sparse(fill_value=0)
     9
    10         1        33992    33992.0      0.2      df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    11         1         7848     7848.0      0.1      df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    12         1      8348217  8348217.0     60.0      df_dask = df_dask.compute().reset_index(drop=True)
We see that about 60% of the time is spent in the dask call, while about 40% is spent in the pandas call for the example data above. This tells us that dask is roughly 50% slower than pandas for this task, which is to be expected: the chunking and recombining of data partitions leads to some extra overhead.
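If that overhead matters for your workload, one knob to experiment with is the blocksize argument used in the script above: larger blocks mean fewer partitions to chunk and recombine, at the cost of more memory per partition. Here's a minimal sketch of comparing partition counts (the 100E6 value is just an illustration, not a recommendation):

import dask.dataframe as ddf

data_small_blocks = ddf.read_csv('test.csv', blocksize=10E6)
data_large_blocks = ddf.read_csv('test.csv', blocksize=100E6)

# Fewer, larger partitions mean less recombination overhead,
# but more memory used per chunk:
print(data_small_blocks.npartitions)  # typically more partitions
print(data_large_blocks.npartitions)  # typically fewer partitions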
Where dask shines is in memory usage: let's use mprun to do a line-by-line memory profile:
%load_ext memory_profiler
%mprun -f compare_loads compare_loads()
The result on my machine is this:
Filename: /Users/jakevdp/dask_load.py
Line #    Mem usage    Increment   Line Contents
================================================
     6     70.9 MiB     70.9 MiB   def compare_loads():
     7    691.5 MiB    620.6 MiB       df = pd.read_csv('test.csv')
     8    828.8 MiB    137.3 MiB       df_sparse = df.to_sparse(fill_value=0)
     9
    10    806.3 MiB    -22.5 MiB       df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    11    806.4 MiB      0.1 MiB       df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    12    947.9 MiB    141.5 MiB       df_dask = df_dask.compute().reset_index(drop=True)
We see that the final sparse dataframe occupies about 140MB, but pandas uses about 620MB along the way as it reads the data into a temporary dense object.
On the other hand, dask uses only about 140MB in total while loading the data and constructing the final sparse result. If you are reading data whose dense size is comparable to the memory available on your system, dask has a clear advantage, despite being about 50% slower in computation time.
But for working with large data, you should not stop here. Presumably you're doing some operations on your data, and the dask dataframe abstraction allows you to do those operations (i.e. add them to the "recipe") before ever materializing the data. So if what you're doing with the data involves arithmetic, aggregations, grouping, etc., you don't even need to worry about the sparse storage: just do those operations with the dask object, call compute() at the end, and dask will take care of applying them in a memory-efficient way.
So, for example, I could compute the max() of each column using the dask dataframe, without ever having to load the whole thing into memory at once:
>>> data.max().compute()
x 5.38114
y 5.33796
z 5.25661
txt j
dtype: object
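Similarly, grouped aggregations can be pushed into the same recipe. Here's a minimal sketch, assuming the same x/y/z/txt columns seen in the output above (the mean is just an arbitrary example of an aggregation, and I skip the sparse step entirely since the full data is never materialized anyway):

import dask.dataframe as ddf

data_plain = ddf.read_csv('test.csv')

# groupby/mean is only added to the recipe; nothing is read yet:
grouped = data_plain.groupby('txt')[['x', 'y', 'z']].mean()

# compute() streams the partitions through the aggregation, so the
# dense dataframe never needs to fit in memory all at once:
print(grouped.compute())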
Working with dask dataframes directly will allow you to circumvent worries about data representation, because you'll likely never have to load all the data into memory at once.
Best of luck!