32

The documentation for Dask talks about repartitioning to reduce overhead here.

However, they seem to indicate that you need some knowledge of what your dataframe will look like beforehand (i.e. that there will be 1/100th of the data expected).

Is there a good way to repartition sensibly without making assumptions? At the moment I just repartition with npartitions = ncores * magic_number, and set force to True to expand partitions if need be. This one-size-fits-all approach works but is definitely suboptimal, as my dataset varies in size.
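
For reference, the current approach looks roughly like this (a sketch only; the file pattern, the filter and magic_number are placeholders):

import multiprocessing
import dask.dataframe as dd

ncores = multiprocessing.cpu_count()
magic_number = 4                                   # hand-tuned multiplier

df = dd.read_csv('data/*.csv')                     # placeholder source
df = df[df.name == 'Alice']                        # heavy filtering leaves far fewer rows
df = df.repartition(npartitions=ncores * magic_number, force=True)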

The data is time series data, but unfortunately not at regular intervals. I've used repartition by time frequency in the past, but this would be suboptimal because of how irregular the data is (sometimes nothing for minutes, then thousands of records in seconds).

Samantha Hughes
  • This may be a terrible idea - but would calling `len()` on `df[df.name == 'Alice']` be too expensive? I feel like it shouldn't be - each worker sums up their dataframe length and the scheduler reduces that to a sum. Then, once you have that number, you can create a ratio of the original height to the current, and update the partition count via repartition accordingly. – kuanb Jun 20 '17 at 16:39
  • 1
    I think calling len() performs the entire task graph to that point, which would be extremely expensive. – Samantha Hughes Jun 20 '17 at 16:50
  • What I am trying currently is to compute() the dataframe to pandas after filtering. Then immediately stuff it back into a dask dataframe using the chunksize parameter on .from_pandas set to what I think is appropriate. This only works on dataframes that can fit in memory but does save expensive recomputes you'd get with len() – Samantha Hughes Jun 20 '17 at 16:52
  • Would not using something like `count()` on your subset query get you the length, without needing to load it into memory as a Pandas DataFrame and then resend it back to Dask? It seems like that operation itself is intense? – kuanb Jun 20 '17 at 17:12
  • Would count() not trigger the entire dask task graph before it? – Samantha Hughes Jun 20 '17 at 18:17
  • It would trigger the generation of that value - but would be far less intensive than your `compute()` operation, which reassembles the entire dataframe. From what I've seen, getting sum, min, mean, etc. values from a dataframe of significant size isn't too costly when distributed - it's the reassembly of the dataframe and the returning of it as a single object that is. Also, I just realized that you did not mention that you are using distributed, so I don't know offhand what Dask does on its own without Distributed, but would assume it is roughly similar? – kuanb Jun 20 '17 at 20:04
  • I am using distributed, sorry I did not mention that. – Samantha Hughes Jun 20 '17 at 21:23
  • 5
    There is no automatic way to repartition sensibly, although there probably should be. I might aim to have each dataframe be about 100MB in size. You could call `df.memory_usage().sum().compute()` to help determine a good number of partitions. – MRocklin Jun 20 '17 at 22:28

4 Answers

31

As of Dask 2.0.0 you may call .repartition(partition_size="100MB").

This method performs an object-aware (.memory_usage(deep=True)) breakdown of partition size. It will merge smaller partitions and split partitions that have grown too large.

Dask's Documentation also outlines the usage.
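
A minimal sketch of that call (the file pattern and filter are placeholders; requires Dask 2.0.0 or later):

import dask.dataframe as dd

df = dd.read_csv('data/*.csv')                     # placeholder source
df = df[df.name == 'Alice']                        # filtering can leave many tiny partitions
df = df.repartition(partition_size='100MB')        # merges small partitions, splits oversized ones
print(df.npartitions)                              # partition count after rebalancing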

Trenton McKinney
Wes Roach
10

After discussion with mrocklin, a decent strategy for partitioning is to aim for 100MB partition sizes, guided by df.memory_usage().sum().compute(). With datasets that fit in RAM, the additional work this might involve can be mitigated by placing df.persist() at relevant points.
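
For example, a rough sketch of that strategy (the file pattern and the 100MB target are illustrative):

import dask.dataframe as dd

df = dd.read_csv('data/*.csv')                     # placeholder source
df = df[df.name == 'Alice']                        # expensive filtering step
df = df.persist()                                  # keep the filtered result in memory

target_bytes = 100e6                               # aim for roughly 100MB per partition
total_bytes = df.memory_usage().sum().compute()    # cheap now that df is persisted
df = df.repartition(npartitions=max(1, int(total_bytes // target_bytes)))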

Samantha Hughes
10

Just to add to Samantha Hughes' answer:

memory_usage() by default ignores memory consumption of object dtype columns. For the datasets I have been working with recently, this leads to underestimating memory usage by a factor of about 10.

Unless you are sure there are no object dtype columns, I would suggest specifying deep=True; that is, repartition using:

df.repartition(npartitions=1 + df.memory_usage(deep=True).sum().compute() // n)

Where n is your target partition size in bytes. Adding 1 ensures the number of partitions is always at least 1 (// performs floor division).
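
For example, with a 100MB target (the file pattern is a placeholder; int() just hands repartition a plain integer):

import dask.dataframe as dd

df = dd.read_csv('data/*.csv')    # placeholder source

n = 100_000_000                   # target partition size in bytes (~100MB)
df = df.repartition(npartitions=int(1 + df.memory_usage(deep=True).sum().compute() // n))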

BHC
0

I tried to check what the optimal number is for my case. I have 100 GB of csv files with 250M rows and 25 columns. I work on a laptop with 8 cores. I ran the function "describe" on 1, 5, 30 and 1000 partitions:

# df is assumed to have been loaded from the csv files beforehand, e.g. with dd.read_csv
df = df.repartition(npartitions=1)
a1 = df['age'].describe().compute()
df = df.repartition(npartitions=5)
a2 = df['age'].describe().compute()
df = df.repartition(npartitions=30)
a3 = df['age'].describe().compute()
df = df.repartition(npartitions=100)
a4 = df['age'].describe().compute()

About speed:

5 and 30 partitions → around 3 minutes

1 and 1000 partitions → around 9 minutes

But... I found that "order" functions like median or percentile give the wrong numbers when I used more than one partition.

1 partition gives the right number (I checked it with small data using pandas and dask).
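
A small sanity check along those lines (the data here is made up): with several partitions, Dask computes quantile-based statistics such as the median approximately, so they can drift from the exact pandas value.

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'age': range(1, 1002)})        # exact median is 501

print(pdf['age'].median())                                                  # pandas: exact
print(dd.from_pandas(pdf, npartitions=1)['age'].quantile(0.5).compute())    # one partition: matches pandas
print(dd.from_pandas(pdf, npartitions=30)['age'].quantile(0.5).compute())   # many partitions: approximate, may differ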

rafine