
I want to use pipes to talk to the process instances in my pool, but I'm getting an error:

Here, self.__p is an instance of Pool():

    (master_pipe, worker_pipe) = Pipe()

    self.__p.apply_async(_worker_task, 
                         (handler_info, 
                          context_info,
                          worker_pipe))

When I execute this, every worker process raises the same error:

  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get
    task = get()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get
TypeError: Required argument 'handle' (pos 1) not found
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 114, in run
    return recv()
    return recv()
    self._target(*self._args, **self._kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 102, in worker
TypeError: Required argument 'handle' (pos 1) not found
TypeError: Required argument 'handle' (pos 1) not found
    task = get()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
TypeError: Required argument 'handle' (pos 1) not found

The error specifically refers to the Connection instance that I'm trying to pass. If I pass None instead, the workers fork without error.

I don't understand this, since, as the documentation shows by example, I can easily pass the same argument to a Process() and have it work perfectly:

from multiprocessing import Pipe, Process

def call_me(p):
    print("Here: %s" % (p,))

(master, worker) = Pipe()
p = Process(target=call_me, args=(worker,))
p.start()
p.join()

This prints:

Here: <read-write Connection, handle 6>
Dustin Oprea

3 Answers


It looks like this bug (http://bugs.python.org/issue4892) noted in this discussion: Python 2.6 send connection object over Queue / Pipe / etc

The pool forks its child processes up front, with pipes already in place for communicating tasks and results to and from them. It's in communicating your Pipe object over one of those existing pipes that it blows up, not in the forking itself: the failure occurs when the child process tries a get() on the queue abstraction.

It looks like the problem arises because of how the Pipe object is pickled/unpickled for communication.
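The failure can be reproduced without a pool at all. Here is a minimal sketch (my own illustration based on the traceback, not code from the question) showing that under Python 2.7 a Connection pickles without complaint but cannot be reconstructed:

import pickle
from multiprocessing import Pipe

master, worker = Pipe()

# Pickling appears to succeed...
data = pickle.dumps(worker, protocol=2)

# ...but unpickling tries to rebuild the Connection without its
# underlying OS handle, which is exactly what happens inside the
# pool's child process:
conn = pickle.loads(data)
# TypeError: Required argument 'handle' (pos 1) not found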

In the second case that you noted, the pipe is passed to a Process instance and is then inherited across the fork, with no pickling involved - thus the difference in behavior.

I can't imagine that actively communicating with pool processes, outside of pure task distribution, was an intended use case for the multiprocessing Pool, though. State- and protocol-wise, it implies that you want more control over the processes than the general-purpose Pool object could ever have.

Jeremy Brown
    If it's an allowed and exhibited use-case for Process(), the same should be said for Pools. – Dustin Oprea Nov 19 '13 at 21:20
  • As I noted above, the pathway is different when instantiating and starting (forking) a Process object. That does not require communicating the Pipe object over an existing pipe. It's available in the child process' context because of the fork. – Jeremy Brown Nov 19 '13 at 21:33
  • The use case is most certainly different as well - It implies that there is a level of persistence to the child process that requires a protocol for active communication with the parent for any arbitrary purpose. The pool is purpose-built for tackling independent, context free, atomic tasks - the whole reason why "apply_async" is named as it is. So my point really is that it seems like you're going to make things more difficult for yourself than needed. On the surface at least - but you would know better than me. – Jeremy Brown Nov 19 '13 at 21:34
  • Multiprocessing has an undocumented "reduction" module that's briefly mentioned in the examples in the official documentation (for a use case very similar to yours :) Look for "An example of how a pool of worker processes..."). Also check out http://stackoverflow.com/questions/8545307/multiprocessing-and-sockets-in-python. The official fix for the bug ends up leveraging this module, but for now it looks like you can use it manually for communicating the Pipe object (reduce in the main process context when using apply_async() and rebuild in the task handed off to the child process) – Jeremy Brown Nov 19 '13 at 21:57
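To make the last comment concrete, here is a hedged sketch of the manual reduction approach (Python 2.7; multiprocessing.reduction is undocumented, and the _worker_task body here is illustrative, not taken from the question):

import pickle
from multiprocessing import Pipe, Pool
from multiprocessing import reduction

def _worker_task(pickled_pipe):
    # Rebuild the Connection inside the child from the reduced form.
    rebuild, args = pickle.loads(pickled_pipe)
    worker_pipe = rebuild(*args)
    worker_pipe.send("hello from the worker")

if __name__ == '__main__':
    master_pipe, worker_pipe = Pipe()
    pool = Pool(processes=1)

    # Reduce the Connection in the parent so that it survives the
    # trip over the pool's internal task pipe.
    pickled_pipe = pickle.dumps(reduction.reduce_connection(worker_pipe))

    pool.apply_async(_worker_task, (pickled_pipe,))
    print(master_pipe.recv())  # -> "hello from the worker"

    pool.close()
    pool.join()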

This is possible to solve by using the initializer and initargs arguments when you create the pool and its processes. Admittedly a global variable has to be involved as well, but if you put the worker code in a separate module it doesn't look all that bad - and it is only global to that process. :-)

A typical case is that you want your worker processes to add stuff to a multiprocessing queue. Since a queue is tied to a particular spot in memory, pickling it will not work. Even if it did, it would only copy the information that some process has a queue - which is the opposite of what we want here; we want to share the same queue.

So here is a meta code example:

The module containing the worker code; we call it "worker_module":

def worker_init(_the_queue):
    global the_queue
    the_queue = _the_queue

def do_work(_a_string):
    # Add something to the queue
    the_queue.put("the string " + _a_string)

And the creation of the pool, followed by having it do something:

# Import our functions
from worker_module import worker_init, do_work

# Good idea: Call it MPQueue to not confuse it with the other Queue
from multiprocessing import Queue as MPQueue
from multiprocessing import Pool

the_queue = MPQueue()
# Initialize workers; it is only during initialization that we can pass the_queue
the_pool = Pool(processes=3, initializer=worker_init, initargs=[the_queue])
# Do the work
the_pool.apply(do_work, ["my string"])
# The string is now on the queue
my_string = the_queue.get(True)

This is a bug which has been fixed in Python 3.

The easiest solution is to pass the queue through the Pool's initializer, as suggested in the other answer.
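For reference, here is a minimal sketch (my own, assuming Python 3.3+ on a Unix fork-based start method; not code from the answer) of the original pattern, which works there because Connection objects now pickle transparently when sent to pool workers:

from multiprocessing import Pipe, Pool

def call_me(conn):
    conn.send("hello")

if __name__ == '__main__':
    master, worker = Pipe()
    with Pool(processes=1) as pool:
        # Passing the Connection through apply() no longer fails.
        pool.apply(call_me, (worker,))
        print(master.recv())  # -> "hello"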

noxdafox