
I have a directory of JSON files that I am trying to convert to a dask DataFrame and save to Castra. There are 200 files containing O(10**7) JSON records between them. The code is very simple, largely following tutorial examples.

import dask.dataframe as dd
import dask.bag as db
import json

# read the files as a bag of lines and parse each line as a JSON record
txt = db.from_filenames('part-*.json')
js = txt.map(json.loads)

# convert the bag to a dataframe and write it out as a Castra file
df = js.to_dataframe()
cs = df.to_castra("data.castra")

I am running it on a 32-core machine, but the code only utilizes one core at 100%. My understanding from the docs is that this code should execute in parallel. Why is it not? Did I misunderstand something?

Daniel Mahler

1 Answer


Your final collection is a dask dataframe, which uses threads by default; you will have to explicitly tell dask to use processes.

You can do this globally

import dask
dask.config.set(scheduler='multiprocessing')

Or do this just on the to_castra call

df.to_castra("data.castra", scheduler='multiprocessing')
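
As a side note (not part of the original answer), `dask.config.set` also works as a context manager, which scopes the scheduler choice to a single block of work. A minimal runnable sketch; the tiny frame and the sum are stand-ins for the question's JSON-derived dataframe, and `'processes'` is the string alias for the multiprocessing scheduler on recent dask versions:

import dask
import dask.dataframe as dd
import pandas as pd

# Stand-in for the dataframe built from the JSON bag in the question.
df = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4)

# dask.config.set doubles as a context manager: the multiprocessing
# scheduler is used only inside this block, threads elsewhere.
with dask.config.set(scheduler='processes'):
    total = df.x.sum().compute()

print(total)  # 4950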

Also, just as a warning: Castra was mostly an experiment. It's decently fast, but also not nearly as mature as something like HDF5 or Parquet.
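
Since the answer points toward Parquet, here is a minimal sketch of that route with dask's own `to_parquet`/`read_parquet` (assumes pyarrow or fastparquet is installed; the small frame is again a stand-in for the question's data):

import dask.dataframe as dd
import pandas as pd

# Stand-in frame; in the question this comes from the JSON bag.
df = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4)

# Write one Parquet file per partition, then read the frame back.
df.to_parquet("data.parquet")
df2 = dd.read_parquet("data.parquet")
print(df2.x.sum().compute())  # 4950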

MRocklin
  • Thanks, but it did not work for me: `df.to_castra` does not take a `get`, and the `dask.set_options` approach does not seem to have any effect. I understand that Castra is experimental, but it seems to fit many of my use cases very nicely. I often have medium-size data that I want to be able to use as a DataFrame with quick save and load times. Spark or HDFS is overkill and Pandas doesn't quite stretch that far. – Daniel Mahler Feb 19 '16 at 23:22
  • Ah, yes indeed, you're correct. It looks like we've purposefully hardcoded the single-core scheduler into to_castra. I think this was because we ran into poor memory performance: intermediate results would pile up as workers produced data faster than the disk could write it. Still, this should be optional. I've pushed a quick fix to master at https://github.com/dask/dask/commit/cb4cc8127028fc736e295c114acfbeed15b71617 – MRocklin Feb 19 '16 at 23:38
  • Thanks! I have tried it. It does get the code running in parallel but it crashes at the end. There are a few other issues. I put the specifics in the notes on the CL. – Daniel Mahler Feb 20 '16 at 02:22
  • The syntax for newer versions of dask is now: `import dask.multiprocessing; dask.config.set(scheduler=dask.multiprocessing.get)` – Lukas Feb 26 '19 at 14:03
  • What if it still uses only 1 cpu? – Soerendip Jan 16 '23 at 17:09