
I have a parquet file of position data for vehicles that is indexed by vehicle ID and sorted by timestamp. I want to read the parquet file, do some calculations on each partition (not aggregations) and then write the output directly to a new parquet file of similar size.

I organized my data and wrote my code (below) to use Dask's map_partitions, as I understood this would perform the operations one partition at a time, saving each result to disk sequentially and thereby minimizing memory usage. I was surprised to find that this exceeded my available memory. If I instead loop over the partitions, running my code on one at a time and appending the output to the new parquet file (see the second code block below), it easily fits within memory.

Is there something incorrect in the original way I used map_partitions? If not, why does it use so much more memory? What is the proper, most efficient way of achieving what I want?

Thanks in advance for any insight!!

Original (memory-hungry) code:

import dask.dataframe as dd

ddf = dd.read_parquet(input_file)

# Output schema matches the input, so reuse the input dtypes as meta
meta_dict = ddf.dtypes.to_dict()

(
    ddf
    .map_partitions(my_function, meta=meta_dict)
    .to_parquet(
        output_file,
        append=False,
        overwrite=True,
        engine='fastparquet',
    )
)

Awkward looped (but more memory-friendly) code:

ddf = dd.read_parquet(input_file)

# Process and write one partition at a time, appending to the output dataset
for partition in range(ddf.npartitions):
    partition_df = ddf.partitions[partition]
    (
        my_function(partition_df)
        .to_parquet(
            output_file,
            append=True,
            overwrite=False,
            engine='fastparquet',
        )
    )

More hardware and data details: The total input parquet file is around 5GB and is split into 11 partitions of up to 900MB each. It is indexed by vehicle ID with known divisions, so I can do per-vehicle grouped operations without working across partitions. The laptop I'm using has 16GB of RAM and 19GB of swap; the original code uses all of both, while the looped version fits within RAM.
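For concreteness, here is a purely hypothetical sketch of the kind of per-partition function involved (the real my_function and the column names are not shown in this question). Because the index is vehicle ID and the divisions are known, each partition holds complete vehicles, so a groupby on the index never needs to cross partitions:

import pandas as pd

# Illustrative only: 'lat' and 'lon' are assumed column names, and this is not
# the actual my_function. It receives one partition as a pandas DataFrame,
# works per vehicle via the index, and returns the same columns and dtypes.
def my_function(df: pd.DataFrame) -> pd.DataFrame:
    smoothed = (
        df.groupby(level=0)[["lat", "lon"]]
          .transform(lambda s: s.rolling(5, min_periods=1).mean())
    )
    out = df.copy()
    out[["lat", "lon"]] = smoothed
    return out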

  • By default, Dask will spin up a number of workers equal to the number of available cores on your machine, and then use all of those workers for the job. – Michael Delgado Jun 20 '22 at 23:54
  • I wondered about that, so I also tried limiting it to one worker with `cluster = LocalCluster(n_workers = 1); client = Client(cluster)` but that didn't help at all. – apeters Jun 21 '22 at 00:15
  • I think Dask generally assumes partitions fit comfortably into memory, so it could be retaining information. Someone else asked a similar question earlier today - maybe some of these tips would be helpful! https://stackoverflow.com/questions/72690155/dask-dataframe-concatenate-and-repartitions-large-files-for-time-series-and-corr – Michael Delgado Jun 21 '22 at 01:31
  • @MichaelDelgado You were right about the issue being Dask spinning up a bunch of cores. I had to limit the number of threads per worker as well with `threads_per_worker=1`, and then the map_partitions code fit in memory. Thanks! – apeters Jun 21 '22 at 01:47
  • ahh - great glad you figured it out! – Michael Delgado Jun 21 '22 at 02:08
  • Please answer your own question with that solution, and accept the answer. Note that you are contrasting dask's out-of-core and parallelism features here, which are (or can be) somewhat at odds. – mdurant Jun 21 '22 at 13:40

1 Answer


As @MichaelDelgado pointed out, by default Dask spins up multiple workers and threads according to what is available on the machine. With partitions the size of mine, this maxes out the available memory when using the map_partitions approach. To avoid it, I limited the number of workers and the number of threads per worker with the code below, preventing automatic parallelization, and the task fit in memory.

from dask.distributed import Client, LocalCluster

# A single worker with a single thread forces partitions to be processed
# one at a time, so only one partition is held in memory at once
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
)
client = Client(cluster)
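With the cluster limited this way, the map_partitions version from the question processes one partition at a time and stays within memory. A minimal sketch of the full flow under that setup (input_file, output_file and my_function as in the question):

import dask.dataframe as dd

# Assumes the single-worker, single-thread Client above has already been created
ddf = dd.read_parquet(input_file)

(
    ddf
    .map_partitions(my_function, meta=ddf.dtypes.to_dict())
    .to_parquet(
        output_file,
        append=False,
        overwrite=True,
        engine='fastparquet',
    )
)

As mdurant noted in the comments, this trades Dask's parallelism for out-of-core memory headroom; repartitioning the data into smaller partitions would be the usual way to win some of that parallelism back without exceeding RAM.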