15

I have recently begun looking at Dask for big data. I have a question about applying operations efficiently in parallel.

Say I have some sales data like this:

customerKey    productKey    transactionKey    grossSales  netSales      unitVolume    volume transactionDate
-----------  --------------  ----------------  ----------  --------      ----------    ------ --------------------
    20353           189            219548     0.921058     0.921058              1         1  2017-02-01 00:00:00
  2596618           189            215015     0.709997     0.709997              1         1  2017-02-01 00:00:00
 30339435           189            215184     0.918068     0.918068              1         1  2017-02-01 00:00:00
 32714675           189            216656     0.751007     0.751007              1         1  2017-02-01 00:00:00
 39232537           189            218180     0.752392     0.752392              1         1  2017-02-01 00:00:00
 41722826           189            216806     0.0160143    0.0160143             1         1  2017-02-01 00:00:00
 46525123           189            219875     0.469437     0.469437              1         1  2017-02-01 00:00:00
 51024667           189            215457     0.244886     0.244886              1         1  2017-02-01 00:00:00
 52949803           189            215413     0.837739     0.837739              1         1  2017-02-01 00:00:00
 56526281           189            220261     0.464716     0.464716              1         1  2017-02-01 00:00:00
 56776211           189            220017     0.272027     0.272027              1         1  2017-02-01 00:00:00
 58198475           189            215058     0.805758     0.805758              1         1  2017-02-01 00:00:00
 63523098           189            214821     0.479798     0.479798              1         1  2017-02-01 00:00:00
 65987889           189            217484     0.122769     0.122769              1         1  2017-02-01 00:00:00
 74607556           189            220286     0.564133     0.564133              1         1  2017-02-01 00:00:00
 75533379           189            217880     0.164387     0.164387              1         1  2017-02-01 00:00:00
 85676779           189            215150     0.0180961    0.0180961             1         1  2017-02-01 00:00:00
 88072944           189            219071     0.492753     0.492753              1         1  2017-02-01 00:00:00
 90233554           189            216118     0.439582     0.439582              1         1  2017-02-01 00:00:00
 91949008           189            220178     0.1893       0.1893                1         1  2017-02-01 00:00:00
 91995925           189            215159     0.566552     0.566552              1         1  2017-02-01 00:00:00

I want to do a few different groupbys: first a groupby-apply on customerKey, then another groupby-sum on customerKey and a column which will be the result of the previous groupby-apply.

The most efficient way I can think of doing this would be to split the dataframe into partitions, each containing a chunk of customer keys. For example, I could split the dataframe into 4 chunks with a partition scheme like (pseudocode)

partition by customerKey % 4

Then I could use map_partitions to do these groupby-applies for each partition and finally return the result. However, it seems Dask forces me to do a shuffle for each groupby I want to do.
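To make the intent concrete, in plain pandas terms the scheme I have in mind would look roughly like this (process_chunk is just a placeholder for the real per-customer work):

import pandas as pd

def process_chunk(chunk):
    # placeholder for the real groupby-apply logic
    return chunk.groupby('customerKey')['netSales'].sum()

# every customerKey lands in exactly one chunk, so each chunk could be
# processed independently and in parallel, with no shuffle in between
chunks = [df[df['customerKey'] % 4 == i] for i in range(4)]
result = pd.concat([process_chunk(chunk) for chunk in chunks])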

Is there no way to repartition based on the value of a column?

At the moment this takes ~45s with 4 workers on a dataframe of only ~80,000 rows. I am planning to scale this up to a dataframe of trillions of rows, and already this seems like it is going to scale horribly.

Am I missing something fundamental to Dask?

karel
Roger Thomas

2 Answers

10

You can set your column to be the index

df = df.set_index('customerKey')

This will sort your data by that column and track which ranges of values are in which partition. As you note, this is likely to be an expensive operation, so you'll probably want to save it somewhere

Either in memory

df = df.persist()

or on disk

df.to_parquet('...')
df = dd.read_parquet('...')  # dd is dask.dataframe
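Once the index is set, every customerKey is guaranteed to live in exactly one partition (df.divisions shows the boundary key values), so a plain pandas groupby inside map_partitions is safe. A minimal sketch of that step, with per_customer_work standing in for your own (larger) function and a made-up customerTotal column:

def per_customer_work(pdf):
    # pdf is an ordinary pandas DataFrame, indexed by customerKey and
    # holding complete customers, so a pandas groupby on the index is correct
    out = pdf.copy()
    out['customerTotal'] = out.groupby(level=0)['netSales'].transform('sum')
    return out

result = df.map_partitions(per_customer_work).compute()

At very large scale you would likely keep the result lazy rather than calling .compute() at the end, but the pattern is the same.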
MRocklin
  • Aha, so if you set the index to customerKey you're guaranteed to have isolated chunks of customerKeys in each partition. That's cool, thanks for that. Is setting the index to a column and then doing df = df.map_partitions(f).compute(), where f is quite a large function, standard practice when working with Dask dataframes? It seems to be the most efficient solution for what I'm trying to do but isn't really mentioned much in the docs. – Roger Thomas Mar 28 '18 at 12:12
  • Correct, see http://dask.pydata.org/en/latest/dataframe-design.html#partitions for more information – MRocklin Mar 28 '18 at 12:13
  • Somewhat common. You might also do groupby-apply, but given the way you phrased your problem above I suspect that the set_index/map_partitions solution will be more natural to you. – MRocklin Mar 28 '18 at 13:06
  • I've tried both methods; groupby-apply takes about ten times longer than map_partitions. – Roger Thomas Mar 28 '18 at 15:21
  • @MRocklin if planning to split the dataframe after setting the index to the group column, would the implicit grouping still work? I suspect this would be hard since when splitting you specify the splitting ratio; I guess if grouping is honoured the ratio wouldn't always be correct. Right? – LetsPlayYahtzee Nov 19 '18 at 14:18
  • This would result in a duplicate index, is it not a problem? – Itamar Mushkin Dec 08 '21 at 13:15
-1

Setting the index to the required column and then using map_partitions is much more efficient than a groupby.
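Schematically, assuming df is the Dask DataFrame and using a placeholder aggregation rather than the asker's actual function, the comparison is between:

# dask-level groupby-apply: the data is shuffled so that each customer's
# rows are brought together before the function runs
df.groupby('customerKey')['netSales'].apply(lambda s: s.sum(), meta=('netSales', 'f8'))

# versus paying for one set_index shuffle up front, then doing cheap
# per-partition pandas groupbys with no further data movement
indexed = df.set_index('customerKey').persist()
indexed.map_partitions(lambda part: part.groupby(level=0)['netSales'].sum())

The second form pays the shuffle cost once, which is essentially what the answer above suggests.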

43shahin
  • While this may help OP, it's better to add more details, examples, etc. Please [provide answers that don't require clarification from the asker.](https://meta.stackexchange.com/questions/214173/why-do-i-need-50-reputation-to-comment-what-can-i-do-instead) – Til Mar 14 '19 at 03:56
  • @43shahin, I'd like more details as well. – scottlittle May 08 '19 at 15:31