There are a few issues with your current code:
- The args parameter to the `apply_async` method takes an iterable, e.g. a list or tuple. You have specified `args=(k)`, but `(k)` is not a tuple; it is just a parenthesized expression. You need `args=(k,)` (see the sketch after this list).
- Your calls to `pool.join()` and `pool.close()` are in the wrong order: `pool.close()` must come first, so that `pool.join()` knows no more tasks will be submitted.
- Your non-parallelized version specifies `x[k] = Do_Something(x[k])`, i.e. you are passing `x[k]` as the function argument, but in your parallelized version you are just passing `k`. Which is correct? I will assume the former.
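To see the first point in isolation, here is a minimal sketch (independent of your code) showing that parentheses alone do not make a tuple; the trailing comma does:

```python
k = 5
print((k), type((k)))    # 5 <class 'int'> -- just a parenthesized int
print((k,), type((k,)))  # (5,) <class 'tuple'> -- a one-element tuple
```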
As you have already determined, `x[k]` starts out with some value, and when you invoke `x[k] = Do_Something(x[k])`, `x[k]` ends up with a new value, which is then passed to `Do_Something` on the next iteration of variable `i`. Therefore, you do need to submit tasks and process results in a loop for each value of `i`. But you do not want to use a callback, which receives results in completion order instead of submission order. The call to `apply_async` returns a `multiprocessing.pool.AsyncResult` instance. If you call method `get` on this instance, it blocks until the submitted task ends and then fetches the result:
```python
pool = mp.Pool(Nworkers)
# Don't use a callback with apply_async, which
# gets results in completion order instead of
# submission order:
for i in range(N):
    # We must wait until all the return values are assigned
    # to x[k] before submitting tasks for the next i value:
    async_results = [
        pool.apply_async(Do_Something, args=(x[k],))
        for k in range(L)
    ]
    # Set the new values of x[k] for the next iteration
    # of the outer loop:
    for k in range(L):
        x[k] = async_results[k].get()
pool.close()
pool.join()
```
But simpler would be to use the `imap` method:
```python
pool = mp.Pool(Nworkers)
for i in range(N):
    # We must wait until all the return values are assigned
    # to x[k] before submitting tasks for the next i value:
    for k, result in enumerate(pool.imap(Do_Something, x)):
        x[k] = result
pool.close()
pool.join()
```
Or even the `map` method, which blocks until all the results are ready and returns them as a new list. Creating a new list on each iteration is less memory efficient, but that is not an issue unless you are dealing with a very large `x`:
```python
pool = mp.Pool(Nworkers)
for i in range(N):
    # New x list:
    x = pool.map(Do_Something, x)
pool.close()
pool.join()
```
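As an aside (an assumption about your workload, since your post does not say how large `x` really is): both `imap` and `map` accept an optional `chunksize` argument that batches the tasks sent to each worker and can reduce inter-process overhead when `x` is long. For example, the `imap` loop above would become:

```python
# Same setup as above (pool, x, N, L already defined).
for i in range(N):
    # chunksize=10 is an arbitrary illustrative value; results
    # still come back in submission order:
    for k, result in enumerate(pool.imap(Do_Something, x, chunksize=10)):
        x[k] = result
```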
Here is a minimal, reproducible example:
```python
# Successively square values. For N = 3, we are
# essentially raising a value to the 2 ** 3 = 8th power:
def Do_Something(value):
    """
    Square a value.
    """
    return value ** 2

# Required for Windows:
if __name__ == '__main__':
    import multiprocessing as mp

    x = [2, 3]
    L = len(x)
    N = 3
    Nworkers = min(L, mp.cpu_count())
    pool = mp.Pool(Nworkers)
    for i in range(N):
        # New x list:
        x = pool.map(Do_Something, x)
    print(x)
    pool.close()
    pool.join()
```
Prints:

```
[256, 6561]
```
But Better Yet Is ...
If you can modify `Do_Something`, then move the looping on `i` into this function. In this way you are submitting fewer but more CPU-intensive tasks, which is what you would like:
```python
# Successively square values. For N = 3, we are
# essentially raising a value to the 2 ** 3 = 8th power:
N = 3

def Do_Something(value):
    """
    Square a value N times.
    """
    for _ in range(N):
        value = value ** 2
    return value

# Required for Windows:
if __name__ == '__main__':
    import multiprocessing as mp

    x = [2, 3]
    L = len(x)
    Nworkers = min(L, mp.cpu_count())
    pool = mp.Pool(Nworkers)
    # New x list:
    x = pool.map(Do_Something, x)
    print(x)
    pool.close()
    pool.join()
```
If you cannot modify `Do_Something`, then create a new function, `Do_Something_N`:
```python
# Successively square values. For N = 3, we are
# essentially raising a value to the 2 ** 3 = 8th power:
N = 3

def Do_Something(value):
    """
    Square a value.
    """
    return value ** 2

def Do_Something_N(value):
    # Apply Do_Something N times in succession:
    for _ in range(N):
        value = Do_Something(value)
    return value

# Required for Windows:
if __name__ == '__main__':
    import multiprocessing as mp

    x = [2, 3]
    L = len(x)
    Nworkers = min(L, mp.cpu_count())
    pool = mp.Pool(Nworkers)
    # New x list:
    x = pool.map(Do_Something_N, x)
    print(x)
    pool.close()
    pool.join()
```
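One more option, purely my suggestion rather than anything in your post: if you would rather not have `Do_Something_N` depend on a module-level `N`, you can pass it explicitly with `functools.partial`. A `partial` of a module-level function pickles fine, so it works with `pool.map`:

```python
from functools import partial
import multiprocessing as mp

def Do_Something(value):
    return value ** 2

def Do_Something_N(value, n):
    # Apply Do_Something n times in succession:
    for _ in range(n):
        value = Do_Something(value)
    return value

if __name__ == '__main__':
    x = [2, 3]
    pool = mp.Pool(min(len(x), mp.cpu_count()))
    # partial binds n=3, so the pool still sees a one-argument function:
    x = pool.map(partial(Do_Something_N, n=3), x)
    print(x)  # [256, 6561]
    pool.close()
    pool.join()
```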