
How can I accomplish such a groupby-size task on a resource-limited machine?

My code looks like this:

import dask.dataframe as dd

ddf = dd.read_parquet(parquet_path)
# Count rows per (col_1, col_2) combination; nothing is computed until to_csv runs.
sr = ddf.groupby(["col_1", "col_2"]).size()
sr.to_csv(csv_path)

My data:

  • The parquet file is around 7 GB with 300M records in total, and it is expected to grow to about 3 times that size after more data is appended.
  • The parquet file consists of 30 parts, each around 235 MB. The parts are written in batches using .to_parquet(append=True) on the same machine (a sketch of that appending pattern follows this list), so I didn't run into memory issues when generating the data.
  • Both col_1 and col_2 have data type uint64.
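
For context, the parts were appended roughly as in the sketch below; the batch source (iter_batches) and the per-batch DataFrame construction are illustrative assumptions, not the actual ingestion code:

import pandas as pd
import dask.dataframe as dd

for i, batch in enumerate(iter_batches()):  # iter_batches() is a stand-in for the real data source
    pdf = pd.DataFrame(batch, columns=["col_1", "col_2"]).astype("uint64")
    part = dd.from_pandas(pdf, npartitions=1)
    # The first call creates the dataset; subsequent calls append new parts to the same path.
    part.to_parquet(parquet_path, append=(i > 0), write_index=False)

Each call adds one part file, which is why the dataset ends up with many parts of similar size.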

The code worked correctly on a small sample but failed on a large sample. I don't know what options I have for accomplishing such a task on an ordinary Win10 laptop with only 12 GB of memory installed.

Bill Huang

1 Answer


The number of unique combinations of col_1 and col_2 should fit into memory; ideally it should be a small fraction of the available worker memory. If that is true for your data, you could try specifying the split_every option (see docs):

sr = ddf.groupby(["col_1", "col_2"]).size(split_every=2)

On a local machine, check that each worker has enough memory; with 12 GB of memory, I would probably restrict it to 2 workers at most.
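
For illustration, such a restricted local cluster could be set up like this before running the groupby (the worker count and memory limits are just an example for a 12 GB machine, not prescribed values):

from dask.distributed import Client, LocalCluster

# Two single-threaded workers, each capped well below the machine's 12 GB total.
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="4GiB")
client = Client(cluster)

With the distributed scheduler running, the dashboard (by default at port 8787) shows per-worker memory usage, which makes it easier to see whether the limits hold.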

Also, you might find this answer to a related question helpful.

SultanOrazbayev
  • Thanks for the prompt reply. I will give precise feedback after experimenting. So, this answer is basically all about lowering the number of workers and `split_every`. Is it likely to work with a bigger dataset (>20GB and >1G records)? Besides, I couldn't find the official doc for the `split_every` option. Where is it? – Bill Huang Aug 09 '21 at 13:08
  • The issue is not so much the size of the data as the number of unique combinations of col1 and col2. Imagine at the extreme that every row of the dataset is a unique combination of col1 and col2; in that case you would need enough memory to fit the whole dataset... – SultanOrazbayev Aug 09 '21 at 13:12
  • OK, your advice to reduce the number of workers worked. I requested access to a more powerful server and used `cluster = LocalCluster(n_workers=1, threads_per_worker=1, memory_limit='24GiB')`. The memory usage peaked at 17 GB, as shown in the Dask dashboard (port 8787). So I guess I do need that much memory anyway. I don't know whether it could be optimized further. – Bill Huang Aug 11 '21 at 06:26