
I have a nested for loop, and I have used multiprocessing.Pool to parallelize the inner loop. Here is some example code:

for i in range(N):
    for k in range(L):
        x[k] = Do_Something(x[k])       

So as you can see, every iteration of i depends on the previous iteration, while the k for loop is "embarrassingly" parallel, with no dependence on which k finishes first. This naturally pointed me towards using apply_async.

The parallelized inner-loop code looks something like this:

pool = mp.Pool(Nworkers)
for i in range(N):
    for k in range(L):
         pool.apply_async(Do_Something, args=(k), callback=getresults)
pool.join()
pool.close()

Writing the code this way screws up the order of the i loop, since async does not wait for the k jobs to finish before moving on to the next i iteration. The question is: is there a way to pause the async until all the jobs from the k loop finish before moving on to the next iteration of i? Using apply_async is beneficial here since the callback allows me to store the results in a given order (a sketch of my callback is below). I saw some other answers, here, and here, but they use alternative solutions like map, which seems like a valid alternative, but I'd like to stick with apply_async here and wait for the jobs to finish...
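
For reference, getresults stores each result back into x by index. Roughly (just a sketch; it assumes the worker returns its index k along with the new value so the callback knows where to store it):

def getresults(result):
    # result is assumed to be a (k, new_value) pair returned by the worker
    k, value = result
    x[k] = value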

I've also tried stopping and reinitializing the workers on every iteration of i, but the overhead from mp.Pool() at every i is not very efficient... Here's what I tried:

for i in range(N):
    pool = mp.Pool(Nworkers)
    for k in range(L):
         pool.apply_async(Do_Something, args=(k), callback=getresults)
    pool.join()
    pool.close()

patrick7

1 Answer

There are a few issues with your current code:

  1. The args parameter of the apply_async method takes an iterable, e.g. a list or tuple. You have specified args=(k), but (k) is not a tuple; it is just a parenthesized expression. You need args=(k,) (see the short demonstration after this list).
  2. Your calls to pool.join() and pool.close() are in the wrong order: close() must be called before join(), because join() waits for the worker processes to exit, and that can only happen after close() signals that no more tasks will be submitted.
  3. Your non-parallelized version specifies x[k] = Do_Something(x[k]), i.e. you are passing x[k] as the function argument. In your parallelized version you are passing just k. Which is correct? I will assume the former.
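
A short demonstration of point 1 (plain Python, nothing multiprocessing-specific):

k = 5
print((k))         # 5 -- just a parenthesized int
print((k,))        # (5,) -- a one-element tuple
print(type((k)))   # <class 'int'>
print(type((k,)))  # <class 'tuple'>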

As you have already determined, x[k] starts out with some value, and when you invoke x[k] = Do_Something(x[k]), x[k] ends up with a new value that is then passed to Do_Something on the next iteration of variable i. Therefore, you do need to submit tasks and process results in a loop for each value of i. But you do not want to use a callback, which gets results in completion order instead of submission order. The call to apply_async returns a multiprocessing.AsyncResult instance. If you call method get on this instance, it blocks until the submitted task ends and then fetches the result:

pool = mp.Pool(Nworkers)

# Don't use a callback with apply_async, which
# gets results in completion order instead of
# submission order:
for i in range(N):
    # We must wait until all the return values have been assigned
    # to x[k] before submitting tasks for the next i value:
    async_results = [
        pool.apply_async(Do_Something, args=(x[k],))
            for k in range(L)
    ]
    # Set the new values of x[k] for the next iteration
    # of the outer loop:
    for k in range(L):
        x[k] = async_results[k].get()
pool.close()
pool.join()
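
A side benefit of using get is that if a task raised an exception in the worker, get re-raises it in the main process, so errors in Do_Something will not silently disappear.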

But it would be simpler to use the imap method:

pool = mp.Pool(Nworkers)

for i in range(N):
    # We must wait until all the return values have been assigned
    # to x[k] before submitting tasks for the next i value:
    for k, result in enumerate(pool.imap(Do_Something, x)):
        x[k] = result
pool.close()
pool.join()
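
Note that imap yields results lazily but still in submission order, and it accepts an optional chunksize argument that can reduce interprocess-communication overhead when L is large.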

Or even the map method, which will create a new list and is less memory efficient (not an issue unless you are dealing with a very large x):

pool = mp.Pool(Nworkers)

for i in range(N):
    # New x list:
    x = pool.map(Do_Something, x)
pool.close()
pool.join()
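
Since map does not return until every submitted task has completed, each iteration of i automatically waits for all L results before the next batch is submitted, which is exactly the synchronization you were asking for.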

Here is a minimal, reproducible example:

# Successively square values. For N = 3, we are
# essentially raising a value to the 2 ** 3 = 8th power:

def Do_Something(value):
    """
    Square a value.
    """
    return value ** 2

# Required for Windows:
if __name__ == '__main__':
    import multiprocessing as mp


    x = [2, 3]
    L = len(x)
    N = 3

    Nworkers = min(L, mp.cpu_count())

    pool = mp.Pool(Nworkers)

    for i in range(N):
        # New x list:
        x = pool.map(Do_Something, x)
    print(x)
    pool.close()
    pool.join()

Prints:

[256, 6561]

But Better Yet Is ...

If you can modify Do_Something, then move the looping on i into that function. In this way you are submitting fewer but more CPU-intensive tasks, which is what you want:

# Successively square values. For N = 3, we are
# essentially raising a value to the 2 ** 3 = 8th power:

N = 3

def Do_Something(value):
    """
    Square a value N times:
    """
    for _ in range(N):
        value = value ** 2
    return value

# Required for Windows:
if __name__ == '__main__':
    import multiprocessing as mp


    x = [2, 3]
    L = len(x)

    Nworkers = min(L, mp.cpu_count())

    pool = mp.Pool(Nworkers)

    # New x list:
    x = pool.map(Do_Something, x)
    print(x)
    pool.close()
    pool.join()

If you cannot modify Do_Something, then create a new function, Do_Something_N:

# Successively square values. For N = 3, we are
# essentially raising a value to the 2 ** 3 = 8th power:

N = 3

def Do_Something(value):
    """
    Square a value.
    """
    return value ** 2

def Do_Something_N(value):
    for _ in range(N):
        value = Do_Something(value)
    return value

# Required for Windows:
if __name__ == '__main__':
    import multiprocessing as mp


    x = [2, 3]
    L = len(x)

    Nworkers = min(L, mp.cpu_count())

    pool = mp.Pool(Nworkers)

    # New x list:
    x = pool.map(Do_Something_N, x)
    print(x)
    pool.close()
    pool.join()
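
Either way, you now submit only L tasks in total instead of N * L, so the fixed per-task cost of pickling arguments and shuttling them between processes is amortized over much more computation.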
Booboo