
I'm converting a large text file to an HDF store in the hope of faster data access. The conversion itself works fine, but reading from the CSV file is not done in parallel. It is really slow (about 30 minutes for a 1 GB text file on an SSD, so my guess is that it is not IO-bound).

Is there a way to have it read in multiple threads in parallel? Since it might be important: I'm currently forced to run under Windows, just in case that makes any difference.

from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
                  sep=';',
                  parse_dates=["DATETIME"],
                  blocksize=1000000,
                  )

df = df.categorize(['Type',
                    'Condition',
                    ])

df.to_hdf("data/data.hdf", "Measurements", mode='w')
Magellan88
  • We have a similar problem in a non-dask app – it is relatively easy to create multiple chunks from the csv file and read them in parallel. Remember that each line is a valid record. – Christian Sauer Oct 18 '16 at 05:30
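
For illustration, here is a minimal sketch of that manual-chunking idea (the file name, chunk size and the threads-vs-processes choice are assumptions, not the code from that app): split the file at byte offsets, snap each split point to the next newline so every chunk starts on a complete record, and parse the chunks concurrently with pandas.

import io
import os
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat

import pandas as pd

PATH = "data/Measurements1.csv"   # hypothetical single input file
CHUNK_BYTES = 64 * 1024 * 1024    # ~64 MB per chunk

def chunk_offsets(path, chunk_bytes):
    # Byte offsets at which each chunk starts, snapped to record boundaries.
    size = os.path.getsize(path)
    with open(path, "rb") as f:
        f.readline()                      # skip the header row
        offsets = [f.tell()]
        target = offsets[-1] + chunk_bytes
        while target < size:
            f.seek(target)
            f.readline()                  # finish the record we landed in
            if f.tell() >= size:
                break
            offsets.append(f.tell())
            target = f.tell() + chunk_bytes
    return offsets + [size]

def read_chunk(path, start, stop, columns):
    # Every chunk is headerless, so the column names are passed in explicitly.
    with open(path, "rb") as f:
        f.seek(start)
        raw = f.read(stop - start)
    return pd.read_csv(io.BytesIO(raw), sep=";", names=columns,
                       header=None, parse_dates=["DATETIME"])

with open(PATH, "r") as f:
    columns = f.readline().rstrip("\r\n").split(";")

offsets = chunk_offsets(PATH, CHUNK_BYTES)

# Threads only help to the extent read_csv releases the GIL; a
# ProcessPoolExecutor (with a module-level worker) sidesteps it entirely.
with ThreadPoolExecutor() as pool:
    parts = list(pool.map(read_chunk, repeat(PATH),
                          offsets[:-1], offsets[1:], repeat(columns)))

df = pd.concat(parts, ignore_index=True)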

2 Answers


Yes, dask.dataframe can read in parallel. However, you're running into two problems:

Pandas.read_csv only partially releases the GIL

By default, dask.dataframe parallelizes with threads because most of Pandas can run in parallel in multiple threads (it releases the GIL). Pandas.read_csv is an exception, especially if your resulting dataframes use object dtypes for text.

dask.dataframe.to_hdf(filename) forces sequential computation

Writing to a single HDF file will force sequential computation (it's very hard to write to a single file in parallel).
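
If you do want to stay with HDF and can accept multiple output files, one way around that bottleneck (a hedged sketch with placeholder paths) is to let dask write one HDF file per partition by passing a globstring, so the partitions are not funnelled into a single file:

import dask.dataframe as ddf

df = ddf.read_csv("data/Measurements*.csv", sep=';',
                  parse_dates=["DATETIME"], blocksize=1000000)

# A '*' in the path writes one HDF file per partition instead of one shared file
df.to_hdf("data/data-*.hdf", "Measurements", mode='w')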

Edit: New solution

Today I would avoid HDF and use Parquet instead. I would probably use the multiprocessing or dask.distributed schedulers to avoid GIL issues on a single machine. The combination of these two should give you full linear scaling.

import dask.dataframe
from dask.distributed import Client

client = Client()  # by default this starts a local cluster of worker processes

df = dask.dataframe.read_csv(...)
df.to_parquet(...)
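
Once the data is in Parquet, later sessions can read it back selectively by column, which is where the gain over re-parsing the CSV shows up. A small illustration, with an assumed path and column list, and assuming a Parquet engine such as pyarrow or fastparquet is installed:

import dask.dataframe as dd

# Path and column list are illustrative, not taken from the question
df = dd.read_parquet("data/data.parquet",
                     columns=["DATETIME", "Type", "Condition"])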

Solution

Because your dataset likely fits in memory, use dask.dataframe.read_csv to load in parallel with multiple processes, then switch immediately to Pandas.

import dask.dataframe as ddf
import dask.multiprocessing

df = ddf.read_csv("data/Measurements*.csv",  # read in parallel
             sep=';', 
             parse_dates=["DATETIME"], 
             blocksize=1000000,
             )

df = df.compute(get=dask.multiprocessing.get)     # convert to pandas

df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')

df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
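
After this runs, later sessions can load the table straight back from HDF; a minimal example matching the path and key above:

import pandas as pd

# Load the whole table back; because it was written with format='table', a
# where= filter is also possible for columns written as data_columns.
df = pd.read_hdf('data/data.hdf', 'Measurements')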
MRocklin
  • I hope my expectation that reading from the HDF5 files later will be faster than from the text-based CSV files will be met. Thanks very much, I'm really excited about dask. – Magellan88 Oct 18 '16 at 14:37
  • Is it possible for you to extend this answer in two ways? First, my .csv does not fit into memory. Second, and probably more complicated, the .csv file is zipped, which is currently not supported by Dask. There are pointers in this discussion (https://github.com/dask/dask/issues/2554) to `dask.delayed`, but I am not sure how to use it in conjunction with `pd.read_csv` and `chunksize`. Thanks! – tobiasraabe Jun 06 '18 at 10:10
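
For the out-of-memory, zipped case in the comment above, one possible workaround (a sketch of my own, not from either answer) is to bypass dask and stream the archive through pandas in chunks, appending each chunk to the HDF table as it is parsed; file names and chunk size are assumptions:

import io
import zipfile

import pandas as pd

with pd.HDFStore('data/data.hdf', mode='w') as store:
    with zipfile.ZipFile('data/Measurements.zip') as zf:
        # assume the archive contains exactly one CSV file
        name = zf.namelist()[0]
        with zf.open(name) as raw, io.TextIOWrapper(raw, encoding='utf-8') as f:
            for chunk in pd.read_csv(f, sep=';', parse_dates=['DATETIME'],
                                     chunksize=1_000_000):
                # format='table' supports appending; long string columns may
                # need min_itemsize set explicitly
                store.append('Measurements', chunk, format='table')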

Piggybacking off of @MRocklin's answer, in newer versions of dask, you can use df.compute(scheduler='processes') or df.compute(scheduler='threads') to convert to pandas using multiprocessing or multithreading:

from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
             sep=';', 
             parse_dates=["DATETIME"], 
             blocksize=1000000,
             )

df = df.compute(scheduler='processes')     # convert to pandas

df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')

df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
mgoldwasser
  • Hi @mgoldwasser, nice answer. What would be the difference between those 2 options - `processes` vs `threads` (in terms of running on a single machine)? – edesz May 24 '19 at 19:37
  • Hi @edesz - threads share memory and are subject to the GIL (Global Interpreter Lock), while processes run separately and carry additional overhead. Generally, multi-threading does not work well in Python because of the GIL, unless the task is IO-bound (for example, if each task is downloading a file). If you aren't sure, try them both and see which is faster. – mgoldwasser May 28 '19 at 14:33
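
Following that suggestion, a quick way to check on your own data (reusing the read_csv call from the question) is to time the same computation under both schedulers:

import time

from dask import dataframe as ddf

df = ddf.read_csv("data/Measurements*.csv", sep=';',
                  parse_dates=["DATETIME"], blocksize=1000000)

for scheduler in ('threads', 'processes'):
    start = time.time()
    df.compute(scheduler=scheduler)           # pulls the full result into pandas
    print(scheduler, time.time() - start)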