0

In for loops, I are sending jobs using python pool.apply_async() calls.

The python code below was initially written by me and then edited by @Santiago Magariños

import multiprocessing
import numpy as np
from time import time, sleep
from random import random

chrNames=['chr1','chr2','chr3']
sims=[1,2,3]



def accumulate_chrBased_simBased_result(chrBased_simBased_result,accumulatedSignalArray,accumulatedCountArray):    
    signalArray = chrBased_simBased_result[0]
    countArray = chrBased_simBased_result[1]

    accumulatedSignalArray += signalArray
    accumulatedCountArray += countArray


def func(chrName,simNum):

    result=[]
    sleep(random()*5)
    signal_array=np.full((10000,), simNum, dtype=float)
    count_array = np.full((10000,), simNum, dtype=int)
    result.append(signal_array)
    result.append(count_array)
    print('%s %d' %(chrName,simNum))

    return result


if __name__ == '__main__':

    accumulatedSignalArray = np.zeros((10000,), dtype=float)
    accumulatedCountArray = np.zeros((10000,), dtype=int)

    numofProcesses = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(numofProcesses)

    results = []
    for chrName in chrNames:
        for simNum in sims:
            results.append(pool.apply_async(func, (chrName,simNum,)))

    for i in results:
        print(i)

    while results:
        for r in results[:]:
            if r.ready():
                print('{} is ready'.format(r))
                accumulate_chrBased_simBased_result(r.get(),accumulatedSignalArray,accumulatedCountArray)
                results.remove(r)

    pool.close()
    pool.join()

    print(accumulatedSignalArray)
    print(accumulatedCountArray)

Is there a way to accumulate the result of a pool.apply_async() call whenever it is available without collecting them in a list like structure?

burcak
  • 1,009
  • 10
  • 34
  • [In the docs](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.apply_async) you can see that there is a callback that you can specify that will run when the result is ready. – Jose A. García Oct 17 '19 at 17:28
  • The problem is that I can not find good example code for callback. And I need to accumulate the result of apply_async calls in some other data structures such as accumulatedSignalArray and accumulatedCountArray in this case. Callback takes one input only. – burcak Oct 17 '19 at 17:42
  • I'd recommend using `pool.imap_unordered` instead of lots of calls to `apply_async`. it exposes the interface you seem to be reimplementing – Sam Mason Oct 17 '19 at 19:15
  • @SamMason when you use pool.imap_unordered() you have to provide all data together in an iterable, by using pool.apply_sync() you can provide data for each call and together with callback you do not need to collect what it returns. Therefore it seems that pool.apply_sync() better than pool.imap_unordered() – burcak Oct 17 '19 at 19:50
  • I'm not sure that either is "better". in your question you want to run a single function over a gridded set of parameters and collect results. this is a common operation hence the family of `map` methods which try to make this sort of thing easier. – Sam Mason Oct 18 '19 at 19:11
  • I also tried imap_unordered but only one processor is running, other processors are sleeping, what can be reason for it? – burcak Oct 18 '19 at 20:56
  • coordinating multiple processes takes work, this can easily dominate when the invoked function is cheap, see [Amdahl's law](https://en.wikipedia.org/wiki/Amdahl%27s_law). `imap_unordered` includes a `chunksize` parameter which can help – Sam Mason Oct 22 '19 at 15:23
  • Yes, I have already ```set chunksize=max(1,(numberofTasks//numberofProcessors))```. However what helped me in fact is the usage of ```()``` generator instead of ```[]``` in list comprehension during creation of input list to the imap_unordered() – burcak Oct 22 '19 at 17:53
  • Have a look at this link:https://stackoverflow.com/questions/18519420/how-to-parallelize-big-for-loops-in-python – burcak Oct 22 '19 at 17:58

1 Answers1

2

Something like this could work. Pretty much copying your code and adding a callback, note that this works because the pool is joined before we access the accumulator values. If not we'd need some other type of synchronization mechanism

class Accumulator:
    def __init__(self):
        self.signal = np.zeros((10000,), dtype=float)
        self.count = np.zeros((10000,), dtype=int)

    def on_result(self, result):
        self.signal += result[0]
        self.count += result[1]

if __name__ == '__main__':

    num_proc = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(num_proc)

    accumulator = Accumulator()
    for chrName in chrNames:
        for simNum in sims:
            pool.apply_async(func, (chrName,simNum,), callback=accumulator.on_result)

    pool.close()
    pool.join()

    print(accumulator.signal)
    print(accumulator.count)
Jose A. García
  • 888
  • 5
  • 13
  • Don't we need something like result.get()[0]? – burcak Oct 17 '19 at 18:45
  • @burcak No, it passes the result value directly to the callback. The only reason you needed to call `get` previously is because at calling time the result is not ready yet and so the library gives you a handle to the "future" result. You call `get` in that future to access the actual result. – Jose A. García Oct 17 '19 at 18:57
  • Do you mean that when callback sends the result it is always ready, therefore no need to call get()? Thanks. – burcak Oct 17 '19 at 19:12
  • @burcak Yes, that's the purpose of the callback. To be called when the result is ready. – Jose A. García Oct 17 '19 at 19:40
  • I'm running code for pool.apply_async(), calling pool.apply_async() in for loops, but it seems that it is using only one processor and other all processors are sleeping. What can be the reason for that? – burcak Oct 18 '19 at 16:38