
I have a function which I am applying to different chunks of my data. Since each chunk is independent of the rest, I wish to execute the function for all chunks in parallel.

I have a result dictionary which should hold the output of calculations for each chunk.

Here is how I did it:

from joblib import Parallel, delayed
import multiprocessing

cpu_count = multiprocessing.cpu_count() 

# I have 8 cores, so I divide the data into 8 chunks.
endIndeces = divideIndecesUniformly(myData.shape[0], cpu_count) # e.g., [0, 125, 250, ..., 875, 1000]

# initialize result dictionary with empty lists.
result = dict()
for i in range(cpu_count):
    result[i] = []

# Parallel execution for 8 chunks
Parallel(n_jobs=cpu_count)(delayed(myFunction)(myData, endIndeces[i], endIndeces[i+1]-1, result, i) for i in range(cpu_count))

However, when execution finishes, result still contains only the initial empty lists. I found that if I run the function serially over each chunk of data, it works fine. For example, if I replace the last line with the following, result holds all the calculated values.

# Instead of parallel execution, call the function in a for-loop.
for i in range(cpu_count):
    myFunction(myData, endIndeces[i], endIndeces[i+1]-1, result, i)

In this case, result values are updated.

It seems that when the function is executed in parallel, it cannot write to the given dictionary (result). How can I obtain the output of the function for each chunk of data?

Matin Kh

1 Answer


By default, joblib runs each task in a separate worker process (the loky backend in current releases; older versions used Python's multiprocessing module directly). According to this SO Answer, a forked process gets a copy of the parent's memory space, and in general the arguments passed to a worker process are serialized and rebuilt there. Either way, myFunction works on a copy of result and never modifies the original in the parent process.
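To make the copy behaviour concrete, here is a minimal single-process sketch. It only simulates what a worker receives by round-tripping the dictionary through pickle; the real backend may serialize differently or rely on fork, and my_function here is a hypothetical stand-in for myFunction, not the asker's code.

```python
import pickle


def my_function(result, i):
    # Hypothetical stand-in for myFunction: mutates the dict it is given.
    result[i].append(i * 10)


result = {0: [], 1: []}

# Simulate what a worker process receives: a serialized-then-rebuilt copy.
worker_copy = pickle.loads(pickle.dumps(result))
my_function(worker_copy, 1)

print(worker_copy)  # the copy was modified
print(result)       # the original still holds empty lists
```

The mutation lands on the copy only, which matches the symptom in the question: the parent's result keeps its empty lists.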

My suggestion is to have myFunction return the desired data as a list. The call to Parallel will then return a list of the lists generated by myFunction, in the same order in which the chunks were submitted. From there, it is simple to add them to result. It could look something like this:

from joblib import Parallel, delayed
import multiprocessing

if __name__ == '__main__':
    cpu_count = multiprocessing.cpu_count() 

    endIndeces = divideIndecesUniformly(myData.shape[0], cpu_count)

    # myFunction must return the results for its chunk (e.g., as a list) instead of writing to a shared dict
    r = Parallel(n_jobs=cpu_count)(delayed(myFunction)(myData, start_idx=endIndeces[i], end_idx=endIndeces[i+1]-1) for i in range(cpu_count))

    result = dict()
    for i, data in enumerate(r):  # cycles through each resultant chunk, numbered and in the original order
        result[i] = data
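For reference, here is a self-contained sketch of the same pattern that can be run as-is. The helpers chunk_bounds and my_function are hypothetical stand-ins for divideIndecesUniformly and myFunction (whose real bodies are not shown in the question), and unlike the code above this sketch treats end_idx as exclusive.

```python
from joblib import Parallel, delayed


def chunk_bounds(n_rows, n_chunks):
    """Hypothetical stand-in for divideIndecesUniformly: n_chunks + 1 boundaries."""
    return [n_rows * k // n_chunks for k in range(n_chunks + 1)]


def my_function(data, start_idx, end_idx):
    """Hypothetical stand-in for myFunction: returns its output for one chunk.

    end_idx is exclusive in this sketch.
    """
    return [x * x for x in data[start_idx:end_idx]]


def run_chunks(data, n_chunks, n_jobs):
    bounds = chunk_bounds(len(data), n_chunks)
    outputs = Parallel(n_jobs=n_jobs)(
        delayed(my_function)(data, bounds[i], bounds[i + 1])
        for i in range(n_chunks)
    )
    # Parallel returns the outputs as a list in submission order,
    # so position i corresponds to chunk i.
    return dict(enumerate(outputs))


if __name__ == "__main__":
    result = run_chunks(list(range(10)), n_chunks=2, n_jobs=2)
    print(result)
```

Returning values and collecting them in the parent avoids any shared state between processes, so the same code also works unchanged with n_jobs=1.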
Eric Ed Lohmar