4

I understand that a multiprocessing.Queue has to be passed to a subprocess through inheritance. However, when I try to pass a Pipe to a subprocess through message passing, as in the code below, the error I get does not say that "Pipe can only be shared between processes through inheritance". Instead, it fails at q.get() with TypeError: Required argument 'handle' (pos 1) not found. Is it possible to do this at all? Assuming that the pipes are implemented using Linux named pipes, all that matters is the name of the pipe, so that name could be the state that is serialized and passed between processes, right?

from multiprocessing import Process, Pipe, Queue

def reader(q):
    output_p = q.get()
    msg = output_p.recv()
    while msg is not None:
        msg = output_p.recv()

if __name__ == '__main__':
    q = Queue()
    reader_p = Process(target=reader, args=(q,))
    reader_p.start()     # Launch the reader process

    output_p, input_p = Pipe(True)
    q.put(output_p)

    input_p.send('MyMessage')
    input_p.send(None)
    reader_p.join()
shaoyl85

2 Answers

1

This is a bug in Python 2 that has been fixed in Python 3, where connection objects can be pickled and sent through a Queue.

Your code works unchanged in Python 3.
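As a minimal sketch of the same idea without a Queue: in Python 3.3 and later, a Connection can itself be sent over another Connection with send() and recv(). The names reader, run_demo, and the control/data pipes below are made up for illustration, and the 'fork' start method is requested explicitly, which assumes a Unix-like platform.

```python
import multiprocessing


def reader(control_conn):
    # Receive a Connection object over the already-inherited control pipe.
    data_conn = control_conn.recv()
    messages = []
    msg = data_conn.recv()
    while msg is not None:
        messages.append(msg)
        msg = data_conn.recv()
    # Report back what was read so the parent can check it.
    control_conn.send(messages)


def run_demo():
    # Assumption: 'fork' is available (Unix-like systems only).
    ctx = multiprocessing.get_context('fork')
    control_parent, control_child = ctx.Pipe()
    proc = ctx.Process(target=reader, args=(control_child,))
    proc.start()

    # This pipe is created after the child started, so it cannot be inherited.
    data_recv, data_send = ctx.Pipe(duplex=False)
    control_parent.send(data_recv)

    data_send.send('MyMessage')
    data_send.send(None)

    result = control_parent.recv()
    proc.join()
    return result


if __name__ == '__main__':
    print(run_demo())
```

The data pipe is created only after the child process is running, so the child can obtain it only through message passing, which is the case the question asks about.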

noxdafox
  • Cool! Glad to know that. By the way, does the underlying implementation in Python 3 use something similar to Manager? Some of the implementation details of Manager don't work in my case. – shaoyl85 Feb 22 '16 at 05:56
  • As far as I know, the underlying implementation remains an OS pipe. They just fixed the way it's serialised so that it can be safely transferred. – noxdafox Feb 22 '16 at 12:48
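To illustrate the "OS pipe" point on a Unix-like system: the two ends returned by Pipe(duplex=False) are anonymous file descriptors, not named FIFOs on the filesystem, so there is no name to serialize and the descriptor itself has to be transferred. The helper name describe_pipe_ends is hypothetical, and the sketch assumes Unix (on Windows, fileno() returns a handle rather than a descriptor).

```python
import os
import stat
from multiprocessing import Pipe


def describe_pipe_ends():
    # A one-way pipe: the first end only receives, the second only sends.
    recv_end, send_end = Pipe(duplex=False)
    try:
        fds = (recv_end.fileno(), send_end.fileno())
        # Ask the OS what kind of object each descriptor refers to.
        is_fifo = tuple(stat.S_ISFIFO(os.fstat(fd).st_mode) for fd in fds)
        return fds, is_fifo
    finally:
        recv_end.close()
        send_end.close()


if __name__ == '__main__':
    fds, is_fifo = describe_pipe_ends()
    print('file descriptors:', fds)
    print('each end is a pipe:', is_fifo)
```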
0

noxdafox gave the correct answer here. I'm adding an example I devised to validate that pipes do not require inheritance. In this example, I create a second pipe after the executor has started its two processes and pass it to the existing processes as a parameter.

""" Multiprocessing pipe and queue test """
import multiprocessing
import concurrent.futures
import time


class Example:
    def __init__(self):
        manager = multiprocessing.Manager()
        q = manager.Queue()
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)

        pipe_out_1, pipe_in_1 = multiprocessing.Pipe(duplex=True)
        executor.submit(self.requester, q, pipe_in_1)
        executor.submit(self.worker, q, pipe_out_1)
        print(executor._processes)

        pipe_out_2, pipe_in_2 = multiprocessing.Pipe(duplex=True)
        executor.submit(self.requester, q, pipe_in_2)
        executor.submit(self.worker, q, pipe_out_2)
        print(executor._processes)

    @staticmethod
    def worker(q, pipe_out):
        task = q.get()
        print('worker got task {}'.format(task))
        pipe_out.send(task + '-RESPONSE')
        print('worker sent response')

    @staticmethod
    def requester(q, pipe_in):
        q.put('TASK')
        response = pipe_in.recv()
        print('requester got response {}'.format(response))
        time.sleep(2)


if __name__ == '__main__':
    Example()
    time.sleep(30)
David Parks