
I would like to use pipes to communicate between worker processes generated from a multiprocessing Pool. I am trying to pass the pipes to the pool as an iterable, but the code consistently hangs.

Here is the code that hangs. It is very simple, and in fact, doesn't even use the pipes (although they are passed to the worker function).

import os
import multiprocessing as mp
from multiprocessing import Pool

def worker(d):
    j, p = d     # Notice that p (a pipe) is never used!
    pid = os.getpid()
    msg = "Greetings from job {} ({})".format(j,pid)
    print(msg)
    return (j,pid)

# Main program
np = 4
pipes_0,pipes_1 = zip(*[mp.Pipe() for i in range(np)])
data = zip(range(np),pipes_1)  # Doesn't work (even though pipes not used)
# data = zip(range(np),range(np))  # Works

pool = Pool(processes=np)
results_async = pool.map_async(func=worker, iterable=data)
results = results_async.get()
print(results)

When the pipes are passed as part of the zipped iterable, the program usually hangs after producing the following output:

Greetings from job 0 (35824)
Greetings from job 1 (35825)
(code usually hangs here....)

What is strange is that I don't use the pipes in the above code, so it seems that something inside the pool is expecting something from the pipes.

If I don't include the pipe as part of the data passed to the worker (using the commented definition of data), the code works as expected and produces

Greetings from job 0 (35865)
Greetings from job 1 (35866)
Greetings from job 2 (35867)
Greetings from job 3 (35868)
[(0, 35865), (1, 35866), (2, 35867), (3, 35868)]

As a point of comparison, similar code in which processes are forked explicitly (using mp.Process instead of a Pool) works as expected in either case.

In fact, this code uses the pipes inside the function, and works perfectly.

import os
import multiprocessing as mp

def worker(d):
    j,p = d
    pid = os.getpid()
    p.send("Greetings from job {} ({})".format(j,pid))

# Main program
np = 4
pipes_0,pipes_1 = zip(*[mp.Pipe() for i in range(np)])
data = zip(range(np),pipes_1)

jobs = []
for d in data:
    p = mp.Process(target=worker,args=[d])
    p.start()
    jobs.append(p)

for p0 in pipes_0:
    print("{:s}".format(p0.recv()))

for j in jobs:
    j.join()

print("Done")

produces the expected output.

Greetings from job 0 (35834)
Greetings from job 1 (35835)
Greetings from job 2 (35836)
Greetings from job 3 (35837)
Done

Originally, I thought that by explicitly launching processes, I was just lucky in avoiding any deadlock, and that the more complicated execution schedule used by the Pool introduced enough lag in launching jobs to lead to deadlock.

But that doesn't explain why the pool code doesn't work, even when the pipes are not referenced at all.

I am running on OSX 10.13.2, Python 3.6.3 |Anaconda custom (64-bit)|

Any insight would be really helpful!

Donna

2 Answers


Isn't it the same issue as here: Passing a Pipe/Connection as context arg to multiprocessing Pool.apply_async()?

I guess you are not seeing the error message mentioned there because you are on macOS.

The answer at that link says it is a bug in Python 2. I tried your code with Python 3 and it worked.
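For reference, the restriction the linked answer talks about is easy to demonstrate with a queue: pickling a multiprocessing.Queue directly (outside of process creation) raises right away. A quick illustrative check (not specific to any OS or Python build):

```python
import pickle
import multiprocessing as mp

# Trying to pickle a Queue outside of process spawning raises a RuntimeError.
q = mp.Queue()
try:
    pickle.dumps(q)
    print("queue pickled fine")
except RuntimeError as err:
    print(err)  # Queue objects should only be shared between processes through inheritance
```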

sbond
  • The posted solution addresses some of the issues (although it is a bit hard to parse all the jargon). One insight is that, evidently, pipes are used by the Pool to communicate results (possibly the return arguments?). So maybe there is some confusion going on here. But then all of this works with a Process. Shouldn't pools work as just a fancier execution schedule for processes? BTW, I am running Python 3.6.3 (from Anaconda). – Donna Feb 01 '18 at 01:01
  • The post [here](https://stackoverflow.com/questions/45178447/multiprocessing-queue-as-arg-to-pool-worker-aborts-execution-of-worker?rq=1) shows a much simpler illustration of the above problem. The second code sample in that post also doesn't work for me (it just hangs). Hm.... – Donna Feb 01 '18 at 01:15
  • Yeah, I ran Anaconda Python 3 on macOS and it does hang. On Linux I didn't use Anaconda, just the distribution Python 3, so I can't say whether macOS or Anaconda is the reason for the hang. – sbond Feb 01 '18 at 01:31
  • Just tried my code on a Linux machine (Python 3.5.x) and it works. But the simpler code referenced in my previous comment hangs on Linux as well ... – Donna Feb 01 '18 at 01:31
  • Too bad this doesn't work, just when I was starting to see the real benefits of the Pool! – Donna Feb 01 '18 at 01:35
  • I was also using Anaconda Python on Linux (I only have 2.7.x in the distribution version) – Donna Feb 01 '18 at 02:05

This was a bug in earlier 2.x versions of Python, and there have been several posts on it. Supposedly, though, the bug was fixed as of Python 3.3. However, I am running Python 3.6 on OSX, and my code hangs.

As a point of comparison, I ran the code posted here and the results are similar. In the second code sample in that post, a queue is passed as an argument to a pool worker. That code hangs for me on both Linux (Anaconda 3.5) and OSX (Anaconda 3.6).

Oddly, my code runs on the Linux version of Anaconda. Pipes good, Queues bad?
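That said, a Manager-backed queue can be passed to pool workers, since what actually gets pickled is a proxy object. A sketch (qworker and the explicit 'fork' context are my own choices here, not from the linked post; 'fork' is Unix-only):

```python
import multiprocessing as mp

def qworker(args):
    j, q = args
    q.put(j)   # the manager proxy is picklable, unlike mp.Queue itself
    return j

ctx = mp.get_context("fork")  # Unix-only; keeps this sketch free of __main__ re-import issues
with ctx.Manager() as m, ctx.Pool(processes=4) as pool:
    q = m.Queue()
    res = pool.map(qworker, [(j, q) for j in range(4)])
    got = sorted(q.get() for _ in range(4))

print(res, got)  # [0, 1, 2, 3] [0, 1, 2, 3]
```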

And I was starting to like pools.
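For the pipes themselves, one workaround that seems to avoid the problem entirely is to hand them to the workers through the Pool's initializer, so they are inherited rather than sent along with each task. A sketch, assuming Unix (it pins the 'fork' start method; init, worker, and the module-global pipes are names I made up):

```python
import os
import multiprocessing as mp

pipes = None  # set in each worker process by init()

def init(pipe_ends):
    # Runs once in each worker; the pipe ends arrive by inheritance, not pickling.
    global pipes
    pipes = pipe_ends

def worker(j):
    pid = os.getpid()
    pipes[j].send("Greetings from job {} ({})".format(j, pid))
    return (j, pid)

ctx = mp.get_context("fork")  # Unix-only; forked children inherit the pipes directly
n = 4
pipes_0, pipes_1 = zip(*[ctx.Pipe() for _ in range(n)])

with ctx.Pool(processes=n, initializer=init, initargs=(pipes_1,)) as pool:
    res = pool.map_async(worker, range(n))   # only plain ints cross the task queue
    for p0 in pipes_0:
        print(p0.recv())
    results = res.get()

print(results)
```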

Donna