
I've got a Pandas DataFrame of around 60 million rows. The first 60 rows correspond to the first group, the next 60 rows to the second group, and so on. Each of these groups needs to be processed, and processing a group produces a NumPy array larger than 4 GB. I've got enough RAM and cores to process around 100 of these groups in parallel. In the end, I have to add up the results of all the groups to get my final result, so the final result is no larger than the result of a single group.

The processing is, roughly: bin the data of a group onto a grid (a NumPy array) of a given size, then compute the tensor outer product of this grid with itself (numpy.multiply.outer). The resulting per-group products are then summed to get my final array.
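For reference, here is a minimal sketch of what one group's processing could look like, assuming two value columns 'x' and 'y' and a simple 2D grid (the real columns, binning and dimensionality differ):

import numpy as np

def process_group(group_df, bins=50):
    # Bin the group's rows onto a grid of the given size...
    grid, _ = np.histogramdd(group_df[['x', 'y']].to_numpy(), bins=bins)
    # ...then take the tensor outer product of the grid with itself.
    return np.multiply.outer(grid, grid)

# The final result is the element-wise sum of these per-group products.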

If I process the groups sequentially, the processing can take days, so I need to process them in parallel.

First, I tried multiprocessing. As the number of groups was large, I split the dataframe into chunks of 10,000 groups each. A function would receive a chunk, process each group in the chunk, and return the sum of those results (a partial sum of my final result). To get my final result, I would only have to sum the result of each process. I could see the processes running in parallel, but when returning the results I got the error OverflowError('cannot serialize a bytes object larger than 4GiB'). I tried this solution: python multiprocessing - OverflowError('cannot serialize a bytes object larger than 4GiB'), which led me to the second error mentioned in the answer. However, I can't redefine my functions as void, because that would require me to store the data in files and I don't have enough disk space to store all the data. Also, the processes would compete to write to disk, which would create a bottleneck; and then reading the files back, reconstructing the arrays and adding the results would also take a long time.
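Schematically, that attempt looked something like this (process_group here is a placeholder for the per-group processing above; the exact pool setup differed):

from multiprocessing import Pool

GROUP_LEN = 60
CHUNK_ROWS = GROUP_LEN * 10_000  # 10,000 groups per chunk

def process_chunk(chunk):
    # Process every group in the chunk and return only the sum of their
    # results, i.e. a partial sum of the final result.
    partial = None
    for start in range(0, len(chunk), GROUP_LEN):
        group_result = process_group(chunk.iloc[start:start + GROUP_LEN])
        partial = group_result if partial is None else partial + group_result
    return partial  # a >4 GiB array: pickling it on return raises the OverflowError

chunks = [df.iloc[i:i + CHUNK_ROWS] for i in range(0, len(df), CHUNK_ROWS)]
with Pool() as pool:
    partial_sums = pool.map(process_chunk, chunks)

final_result = sum(partial_sums)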

Then I tried Dask. First, I partitioned the Dask dataframe into partitions of 60 rows (one group per partition), created a function that would process a single group, and called it using delayed:

from dask import delayed
results = []
for partition in ddf.partitions:       # one 60-row group per partition
    result = delayed(func)(partition)  # lazily process a single group
    results.append(result)

delayed(sum)(results).compute()        # sum all the group results

However, most of the processes would be sleeping most of the time and I couldn't see much parallelism. Apparently, Dask doesn't deal well with large task graphs.

In order to avoid having a large task graph, I replaced the function with one that would take a large dataframe (containing many groups) and process each of the groups of that dataframe inside the function (similar to the multiprocessing approach).

import numpy as np
import pandas as pd

def func(df):
    group_len = 60
    ngroups = len(df) // group_len  # len(df) is always a multiple of group_len
    # The expected output shape can be up to 6-dimensional
    sum_array = np.zeros(output_expected_shape)
    for group in range(ngroups):
        group_df = df.iloc[group * group_len:(group + 1) * group_len]
        # Do the processing on group_df...
        sum_array += group_result
    return sum_array

I partitioned the dataframe so each partition would have 600,000 rows (100 groups) and called it using the delayed approach. However, once more, most of the processes would be sleeping most of the time, and I couldn't observe real parallelism. I also noticed using the GUI that the workers were storing lots of data, even though there was plenty of RAM available. I tried having 6,000,000 rows per partition, but that didn't work either (also leaving all those cores unused doesn't seem like the optimal solution).
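For reference, one way to build partitions with a fixed number of rows like that (a sketch, assuming the data starts as a pandas dataframe; the chunksize argument of dask.dataframe.from_pandas is a row count):

import dask.dataframe as dd

# 600,000 rows (a whole number of 60-row groups) per partition. With a default
# RangeIndex this gives exactly that many rows per partition; with other
# indexes the boundaries follow the index divisions.
ddf = dd.from_pandas(df, chunksize=600_000)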

Then, I tried Dask map_partitions. The issue with it is that it doesn't work well when the function needs to return a single NumPy array per partition. You can see what the problem is in this question: How to return one NumPy array per partition in Dask?. In summary, it returns a single array with the partial results stacked vertically. In order to get the real result, I would have to slice that array, take each of the pieces that corresponds to a partial result, and add them (sketched below). But 1) that defeats the point of using parallelism, and 2) the returned array may be too large, as it contains many different results and may not fit in memory.
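Purely for illustration, recovering the partial results from that stacked output would look roughly like this (result_len, the length of the first axis of each partial result, and npartitions are placeholders):

stacked = ddf.map_partitions(func).compute()  # partial results stacked along axis 0
final_result = sum(
    stacked[i * result_len:(i + 1) * result_len]
    for i in range(npartitions)
)

The slicing and summing happen serially on the client, and stacked holds every partial result at once, which is exactly the problem described above.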

Apparently, the map_partitions approach would work well if a chunk size were defined, but chunk sizes are specified by amount of data ('16MB', '3GB') rather than by number of rows (which is what I need, since each group corresponds to a fixed number of rows).

I would like to be able to process the groups in parallel efficiently. Up to this point, the sequential solution is still my best choice (the only one that eventually gets me to the result, as the Dask workers start raising TCP timeout errors after a while), but it is way too slow and leaves plenty of resources unused.

6659081
1 Answer


The problem with using delayed(sum)(results).compute() is that you are requesting all the results to be passed to sum at once. This is not a problem for a small list of results, but when your list of results exceeds the combined memory capacity of the workers, your pipeline will break.

The simplest way to fix this is to use the existing collections, following for example the advice here. (You mention the problem of row grouping, but that was resolved here.)

Another approach to reducing memory usage is to aggressively aggregate the partitions/chunks/arrays using something like a tree summation (see the docs):

import dask

# zs is the list of delayed partial results; add is a delayed pairwise sum
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        if i + 1 < len(L):
            lazy = add(L[i], L[i + 1])  # add neighbors
        else:
            lazy = L[i]                 # odd element out: carry it forward
        new_L.append(lazy)
    L = new_L                           # swap old list for new

dask.compute(L)
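For concreteness, here is one way zs and add could be defined in this setting (a sketch, assuming func from the question and one block of groups per dataframe partition; to_delayed gives one Delayed object per partition):

from dask import delayed

@delayed
def add(x, y):
    # Pairwise sum of two partial results (NumPy arrays).
    return x + y

# One delayed partial sum per partition of the Dask dataframe.
zs = [delayed(func)(part) for part in ddf.to_delayed()]

With this, the loop above combines the partial sums pairwise, so no single task ever has to hold more than two of the large arrays at a time.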
SultanOrazbayev
  • Hi Sultan. Yes, it was thanks to the help with the grouping that I could use `map_partitions`. The problem with collections is that I get a very nested list of delayed operations, like this `[[[[[[Delayed(('func-df4a93', 0, 0, 0))]]]]], [[[[[Delayed(('func-df4a93', 1, 0, 0))]]]]], ...]]]]]]`. Even though I don't know what's causing it, I tried flattening the list recursively. However, I still get `KeyError: ('func-5887b6', 5, 0, 0)`, but the key is present in my list. I don't know whether it has to do with the double parentheses in the delayed operations (you can see that in the list). – 6659081 Mar 12 '21 at 09:50
  • Thinking again about it, it shouldn't be the double parentheses as the key includes it. I haven't been able to find much info on this error. – 6659081 Mar 12 '21 at 10:07
  • Hmm, it does look like unpacking of lists is needed at some step in your pipeline. Do you use `*list_of_delayed` like mdurant suggested in the first link? – SultanOrazbayev Mar 12 '21 at 10:08
  • Yes, I'm unpacking the list in the call to compute. Also, after flattening the list (which is probably not the best approach, but I wanted to see what happened), I end up with `[Delayed(('func-a0effc', 0, 0, 0)), Delayed(('func-a0effc', 1, 0, 0)), ... ]`. I'll try to see if there's some unpacking that needs to be done somewhere else. But if I run my previous code on a small grid (meaning that the returned arrays are small) and use few groups (meaning that the task graph is small), I get the correct result. – 6659081 Mar 12 '21 at 10:22
  • I just noticed that, interestingly, my output should be a 6-dimensional NumPy array, and my list of delayed operations is also 6-dimensional. I'll try returning each array in a list of one element or something similar. – 6659081 Mar 12 '21 at 10:30
  • That definitely didn't work. It only complicated getting the sum of the resulting arrays. – 6659081 Mar 12 '21 at 10:58
  • Hmmm, so the tuples inside the delayed are not a problem, it just indicates the ordering of partitions/chunks (?). I updated the answer with another possible solution. – SultanOrazbayev Mar 12 '21 at 11:09