First, `var1` and `var2` are not defined, so I have modified this function to do something (somewhat) useful.
Second, if you read the documentation on `multiprocessing.Queue`, especially the warnings, you will see that you can get into a deadlock if you attempt `get` calls against a queue after you join a process that has been doing `put` calls to that queue: the writing process must still be running for you to successfully retrieve all the items it has placed on the queue. You will also see that calls to `empty` on a multiprocessing queue are not reliable.
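To make the ordering issue concrete, here is a minimal sketch (the `writer` function and the oversized payload are my own illustration, not from your code). The payload is large enough to fill the underlying pipe buffer, so the child process cannot finish until the parent drains the queue; joining before getting would therefore hang:

```
import multiprocessing as mp

def writer(q):
    # A payload large enough to fill the pipe buffer, so the child's
    # feeder thread blocks until the parent drains the queue:
    q.put('x' * 10_000_000)

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=writer, args=(q,))
    p.start()
    data = q.get()   # drain the queue first ...
    p.join()         # ... then join; swapping these two lines can deadlock
    print(len(data))
```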
So the simplest change for you to make, since you know how many items are being put to each queue, would be:
```
import multiprocessing as mp

def foo(q1, q2, data):
    # do some stuff
    q1.put(data ** 2)
    q2.put(data ** 3)

def main_function(list_of_stuff):
    q1 = mp.Queue()
    q2 = mp.Queue()
    jobs = []
    for data in list_of_stuff:
        p = mp.Process(target=foo, args=(q1, q2, data))
        p.start()
        jobs.append(p)
    list1 = []
    list2 = []
    for _ in range(len(list_of_stuff)):
        new = q1.get()
        list1.append(new)
        new = q2.get()
        list2.append(new)
    # wait for all processes to end before continuing
    for job in jobs:
        job.join()
    return list1, list2

if __name__ == '__main__':
    list1, list2 = main_function([1, 2, 3])
    print(list1, list2)
```
Prints:

```
[1, 4, 9] [1, 8, 27]
```
If your main process did not know how many items the child process would be placing on the output queue (or queues), then you would need to organize your code differently and use sentinel items placed on the queues to signify that there are no more items to be retrieved:
```
import multiprocessing as mp

def square(q, data):
    # do some stuff
    for x in data:
        q.put(x ** 2)
    # Signal that no more items will be put on the queue:
    q.put(None)

def cube(q, data):
    # do some stuff
    for x in data:
        q.put(x ** 3)
    # Signal that no more items will be put on the queue:
    q.put(None)

def main_function(list_of_stuff):
    q1 = mp.Queue()
    q2 = mp.Queue()
    p1 = mp.Process(target=square, args=(q1, list_of_stuff))
    p2 = mp.Process(target=cube, args=(q2, list_of_stuff))
    p1.start()
    p2.start()
    list1 = []
    while True:
        new = q1.get()
        if new is None:
            # Sentinel seen; no more items coming:
            break
        list1.append(new)
    list2 = []
    while True:
        new = q2.get()
        if new is None:
            # Sentinel seen; no more items coming:
            break
        list2.append(new)
    # wait for all processes to end before continuing
    p1.join()
    p2.join()
    return list1, list2

if __name__ == '__main__':
    list1, list2 = main_function([1, 2, 3])
    print(list1, list2)
```
Prints:

```
[1, 4, 9] [1, 8, 27]
```
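As an aside, the same sentinel technique works when several producers share a single queue: have each producer put its own sentinel and keep reading until one has arrived per producer. A minimal sketch (the `worker` function and the chunked input are my own illustration):

```
import multiprocessing as mp

def worker(q, data):
    for x in data:
        q.put(x ** 2)
    # One sentinel per producer:
    q.put(None)

if __name__ == '__main__':
    q = mp.Queue()
    chunks = [[1, 2], [3, 4]]
    processes = [mp.Process(target=worker, args=(q, chunk)) for chunk in chunks]
    for p in processes:
        p.start()
    results = []
    sentinels_seen = 0
    # Keep reading until a sentinel has arrived from every producer:
    while sentinels_seen < len(processes):
        item = q.get()
        if item is None:
            sentinels_seen += 1
        else:
            results.append(item)
    for p in processes:
        p.join()
    print(sorted(results))  # arrival order is not guaranteed
```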
If you want to keep your original structure, where `foo` calculates a square and a cube for each data item and you want a separate process for each item in the passed `list_of_stuff`, then use a multiprocessing pool:
```
import multiprocessing as mp

def foo(x):
    # do some stuff
    return x ** 2, x ** 3

def main_function(list_of_stuff):
    pool_size = min(mp.cpu_count(), len(list_of_stuff))
    with mp.Pool(pool_size) as pool:
        results = pool.map(foo, list_of_stuff)
    list1 = []
    list2 = []
    for square, cube in results:
        list1.append(square)
        list2.append(cube)
    return list1, list2

if __name__ == '__main__':
    list1, list2 = main_function([1, 2, 3])
    print(list1, list2)
```
Note that since the worker function `foo` is purely CPU-bound, there is no point in making the pool size greater than the number of tasks that will be submitted, nor greater than the number of CPU cores you have. By using the `map` function, you are guaranteed that the results returned are in the same order as the tasks submitted. In your original code there was no guarantee on the order in which results would be written to the queues by the child processes, so it would be possible to end up with `list1`, for example, being `[4, 1, 9]`.
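For completeness, if result order does not matter and you want each result as soon as it completes, `Pool.imap_unordered` gives you the same any-order behavior your queue-based version had. A minimal sketch (the pool size of 3 is arbitrary):

```
import multiprocessing as mp

def foo(x):
    return x ** 2, x ** 3

if __name__ == '__main__':
    with mp.Pool(3) as pool:
        # Results are yielded as they complete, so the order may
        # differ from the submission order:
        for square, cube in pool.imap_unordered(foo, [1, 2, 3]):
            print(square, cube)
```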