I've got a Pandas DataFrame of around 60 million rows. The first 60 rows correspond to the first group, the next 60 to the second, and so on. Each of these groups needs to be processed in parallel, and each of them returns a NumPy array larger than 4 GB. I've got enough RAM and cores to process around 100 of these groups in parallel. In the end, I have to add up the results of all the groups to get my final result, so the final result is no larger than the result of any single group.
The processing is roughly: bin the data of a group onto a grid (a NumPy array) of a given size, compute the tensor outer product of that grid with itself (numpy.multiply.outer), and then sum the resulting products to get the final array.
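The per-group computation is along these lines (a simplified sketch; the column names, the bin edges and the use of np.histogramdd are just illustrative stand-ins for my actual binning):

import numpy as np

def process_group(group_df, bin_edges):
    # Bin the 60 rows of the group onto a grid; with three coordinate
    # columns the grid is 3-D, so the outer product below is 6-D.
    grid, _ = np.histogramdd(group_df[["x", "y", "z"]].to_numpy(), bins=bin_edges)
    # Tensor outer product of the grid with itself.
    return np.multiply.outer(grid, grid)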
If I process the groups sequentially, the processing can take days, so I need to process them in parallel.
First, I tried multiprocessing. As the number of groups was large, I split the dataframe into chunks of 10,000 groups each. A function would receive a chunk, process each group in it, and return the sum of those results (a partial sum of my final result). To get the final result, I would only have to add up the result of each process. I could see the processes running in parallel, but when the results were being returned I got "OverflowError: cannot serialize a bytes object larger than 4 GiB".
I tried this solution: python multiprocessing - OverflowError('cannot serialize a bytes object larger than 4GiB'), which led me to the second error mentioned in the answer. However, I can't redefine my functions to return nothing and write their results to disk instead, because I don't have enough disk space to store all that data. Also, the processes would compete for the disk when writing, which would create a bottleneck, and then reading the files back, reconstructing the arrays, and adding the results would also take a long time.
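For reference, that attempt was structured roughly like this (simplified; process_group is the per-group step sketched above, while bin_edges and chunks are placeholders for my real grid definition and list of chunk dataframes):

import multiprocessing as mp
import numpy as np

GROUP_LEN = 60

def process_chunk(chunk_df):
    # Sum the results of all the groups contained in this chunk.
    partial = None
    for _, group_df in chunk_df.groupby(np.arange(len(chunk_df)) // GROUP_LEN):
        result = process_group(group_df, bin_edges)
        partial = result if partial is None else partial + result
    return partial

if __name__ == "__main__":
    with mp.Pool(processes=100) as pool:
        # The OverflowError is raised here, while the workers' partial
        # sums (each larger than 4 GiB) are pickled back to the parent.
        partials = pool.map(process_chunk, chunks)
    final = sum(partials)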
Then I tried Dask. First, I partitioned the Dask dataframe into groups of 60 rows, created a function that would process each group, and called it using delayed:
from dask import delayed

results = []
for partition in ddf.partitions:
    result = delayed(func)(partition)
    results.append(result)

delayed(sum)(results).compute()
However, most of the processes would be sleeping most of the time and I couldn't see much parallelism. Apparently, Dask doesn't deal well with large task graphs.
To avoid a large task graph, I replaced the function with one that takes a large dataframe (containing many groups) and processes each of the groups inside the function (similar to the multiprocessing approach):
import numpy as np
import pandas as pd

def func(df):
    group_len = 60
    ngroups = len(df) // group_len  # len(df) is always a multiple of group_len
    sum_array = np.zeros(output_expected_shape)  # here the shape can be up to 6-dimensional
    for group in range(ngroups):
        # Do the processing...
        sum_array += group_result
    return sum_array
I partitioned the dataframe so that each partition would have 600,000 rows (100 groups) and called the function using the delayed approach. However, once more, most of the processes were sleeping most of the time and I couldn't observe real parallelism. I also noticed in the GUI that the workers were storing lots of data, even though there was plenty of RAM available. I tried 6,000,000 rows per partition, but that didn't work either (and leaving all those cores unused doesn't seem like the optimal solution anyway).
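For concreteness, partitioning by a fixed number of rows and feeding the partitions to func can be written roughly like this (a minimal sketch, assuming the data starts out as a pandas DataFrame df with a default integer index):

import dask.dataframe as dd
from dask import delayed

# chunksize in from_pandas is a number of rows per partition,
# so 600,000 rows corresponds to 100 groups of 60 rows each.
ddf = dd.from_pandas(df, chunksize=600_000)

partials = [delayed(func)(partition) for partition in ddf.partitions]
final = delayed(sum)(partials).compute()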
Then I tried Dask's map_partitions. The issue with it is that it doesn't work well when each call needs to produce a single NumPy array. You can see what the problem is in this question: How to return one NumPy array per partition in Dask?. In summary, it returns a single array with the partial results stacked vertically. To get the real result, I would have to slice that array, take each of the pieces that correspond to a partial result, and add them up. But 1) that defeats the point of using parallelism, and 2) the returned array may be too large to fit in memory, since it contains many different partial results.
Apparently, the map_partitions approach would work well if a chunksize were defined, but chunk sizes are specified by amount of data ('16MB', '3GB') rather than by number of rows (which is what I need, since each group corresponds to a fixed number of rows).
I would like to be able to process the groups in parallel efficiently. So far, the sequential solution is still my best option (the only one that eventually gets me to the result, since the Dask workers start raising TCP timeout errors after a while), but it is far too slow and leaves plenty of resources unused.