I would like to use pipes to communicate between worker processes created by a multiprocessing Pool. I am trying to pass the pipes to the pool as part of an iterable, but the code consistently hangs.
Here is the code that hangs. It is very simple and, in fact, never uses the pipes (although they are passed to the worker function).
import os
import multiprocessing as mp
from multiprocessing import Pool
def worker(d):
    j, p = d    # notice that p (a pipe) is never used!
    pid = os.getpid()
    msg = "Greetings from job {} ({})".format(j, pid)
    print(msg)
    return (j, pid)
# Main program
np = 4
pipes_0,pipes_1 = zip(*[mp.Pipe() for i in range(np)])
data = zip(range(np),pipes_1) # Doesn't work (even though pipes not used)
# data = zip(range(np),range(np)) # Works
pool = Pool(processes=np)
results_async = pool.map_async(func=worker, iterable=data)
results = results_async.get()
print(results)
When the pipes are passed as part of the zipped iterable, the run usually hangs after output like this:
Greetings from job 0 (35824)
Greetings from job 1 (35825)
(the code usually hangs here...)
What is strange is that the pipes are never used in the above code, so it seems that something inside the pool is expecting something from the pipes.
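As a small diagnostic (not a fix), the bare get() can be swapped for a timed one, so the hang at least surfaces as an exception; AsyncResult.get(timeout=...) raises multiprocessing.TimeoutError if the results never arrive (the 10 seconds here is an arbitrary choice):

try:
    results = results_async.get(timeout=10)  # instead of results_async.get()
except mp.TimeoutError:
    print("No results after 10 s; the pool appears to be stuck")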
If I don't include the pipes in the data passed to the worker (using the commented-out definition of data), the code works as expected and produces
Greetings from job 0 (35865)
Greetings from job 1 (35866)
Greetings from job 2 (35867)
Greetings from job 3 (35868)
[(0, 35865), (1, 35866), (2, 35867), (3, 35868)]
As a point of comparison, similar code in which processes are forked explicitly (using mp.Process instead of a pool) works as expected in either case.
In fact, the following code uses the pipes inside the worker function and works perfectly.
import os
import multiprocessing as mp
def worker(d):
    j, p = d
    pid = os.getpid()
    p.send("Greetings from job {} ({})".format(j, pid))
# Main program
np = 4
pipes_0,pipes_1 = zip(*[mp.Pipe() for i in range(np)])
data = zip(range(np),pipes_1)
jobs = []
for d in data:
    p = mp.Process(target=worker, args=[d])
    p.start()
    jobs.append(p)
for p0 in pipes_0:
    print("{:s}".format(p0.recv()))
for j in jobs:
    j.join()
print("Done")
It produces the expected output:
Greetings from job 0 (35834)
Greetings from job 1 (35835)
Greetings from job 2 (35836)
Greetings from job 3 (35837)
Done
Originally, I thought that by launching the processes explicitly, I was just getting lucky in avoiding deadlock, and that the more complicated scheduling done by the Pool introduced enough lag in launching jobs to cause one. But that doesn't explain why the pool code fails even when the pipes are never referenced at all.
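One workaround I am considering (sketched below; the init and worker_pipes names are just illustrative) is to let the workers inherit the pipes through a Pool initializer instead of shipping them as task arguments, which should avoid the task queue entirely, assuming the default fork start method on this platform. But I would still like to understand why passing them directly hangs.

import os
import multiprocessing as mp
from multiprocessing import Pool

worker_pipes = None  # set once in each worker by the initializer

def init(pipes):
    # Runs once per worker process; under fork, the Connection
    # objects arrive by inheritance, not through the task queue.
    global worker_pipes
    worker_pipes = pipes

def worker(j):
    pid = os.getpid()
    worker_pipes[j].send("Greetings from job {} ({})".format(j, pid))
    return (j, pid)

np = 4
pipes_0, pipes_1 = zip(*[mp.Pipe() for i in range(np)])
pool = Pool(processes=np, initializer=init, initargs=(pipes_1,))
results_async = pool.map_async(func=worker, iterable=range(np))
for p0 in pipes_0:
    print(p0.recv())
print(results_async.get())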
I am running macOS 10.13.2 with Python 3.6.3 (Anaconda custom, 64-bit).
Any insight would be really helpful!