I'm writing a program in which a variable number of Agent objects concurrently run a series of methods and store their return values in a queue attribute. Each Agent owns a single Worker (a subclass of Process) and feeds it jobs to run serially through a cmd_queue; the Agent collects the results from its Worker via a res_queue. These are currently Manager().Queue() instances, and they cause:

TypeError: Pickling an AuthenticationString object is disallowed for security reasons

However, if I use a regular Queue.Queue instead, the Workers get a copy of the Agent's cmd_queue and cannot see what the Agent adds to it (it's always empty).
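For context on where that TypeError originates (a diagnostic sketch, not part of the program): a Process carries its authkey as an AuthenticationString, which refuses to be pickled outside multiprocessing's own process-spawning machinery, so any pickled object graph that reaches a Process object trips this error:

```python
import pickle
from multiprocessing import Process

p = Process()
try:
    # authkey is an AuthenticationString; pickling it is deliberately blocked
    pickle.dumps(p.authkey)
except TypeError as e:
    print(e)  # Pickling an AuthenticationString object is disallowed for security reasons
```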

I'm able to pickle instance methods with the solution referenced in this question: Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()

from multiprocessing import Manager, Process
from time import sleep
import types
import copy_reg

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Worker(Process):
    def __init__(self, cmd_queue, res_queue):
        self.cmd_queue = cmd_queue
        self.res_queue = res_queue
        Process.__init__(self)

    def run(self):
        while True:
            f, args, kwargs = self.cmd_queue.get()
            self.res_queue.put( f(*args, **kwargs) )  

class Agent:
    def __init__(self):
        self.cmd_queue = Manager().Queue()
        self.res_queue = Manager().Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def produce(self, f, *args, **kwargs):
        self.cmd_queue.put((f, args, kwargs))

    def do_some_work(self):
        self.produce(self.foo, waka='waka')

    def do_some_other_work(self):
        self.produce(self.bar, humana='humana')

    def foo(self, **kwargs):
        sleep(5)
        return('this is a foo')

    def bar(self, **kwargs):
        sleep(10)
        return('this is a bar')

    def get_results(self):  #blocking call
        res = []
        while not self.cmd_queue.empty():#wait for Worker to finish
            sleep(.5)
        while not self.res_queue.empty():
            res.append(self.res_queue.get())
        return res  

#This is the interface I'm looking for.
if __name__=='__main__':
    agents = [Agent() for i in range(50)]
    #this should flow quickly as the calls are added to cmd_queues
    for agent in agents:        
        agent.do_some_work()
        agent.do_some_other_work()  
    for agent in agents:
        print(agent.get_results())

My question is, how I would be able to get this code to work using multiprocessing or is there a better, more accepted method of getting this pattern to work? This is a smaller part of a larger framework so I'd like it to be as OO friendly as possible.

Edit: This is in Python 2.7.

PrckPgn

2 Answers


You can do this using an ordinary multiprocessing.Queue. You just need to tweak the Agent class so that it doesn't try to pickle the Queue instances when an Agent instance itself is pickled. This is necessary because the Agent instance gets pickled along with the bound methods you're sending to the Worker. It's easy enough to do, though:

class Agent(object): # Agent is now a new-style class
    def __init__(self):
        self.cmd_queue = Queue()
        self.res_queue = Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def __getstate__(self):
        """ This is called to pickle the instance """
        self_dict = self.__dict__.copy()
        del self_dict['cmd_queue']
        del self_dict['res_queue']
        del self_dict['worker']
        return self_dict

    def __setstate__(self, self_dict):
        """ This is called to unpickle the instance. """
        self.__dict__ = self_dict

    ... # The rest is the same.
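To sanity-check the __getstate__/__setstate__ approach in isolation, here is a minimal sketch with a made-up Holder class (not part of the answer's code), showing that the excluded attribute simply doesn't exist on the unpickled copy:

```python
import pickle

class Holder(object):
    """Illustrative stand-in for Agent: one picklable field, one we exclude."""
    def __init__(self):
        self.data = 42
        self.worker = object()  # pretend this is unpicklable, like a Process

    def __getstate__(self):
        # Drop the problematic attribute before pickling.
        state = self.__dict__.copy()
        del state['worker']
        return state

    def __setstate__(self, state):
        self.__dict__ = state

clone = pickle.loads(pickle.dumps(Holder()))
print(clone.data)                # 42
print(hasattr(clone, 'worker'))  # False: the copy simply lacks the attribute
```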

Note that there are some other logic issues in this code that keep it from running properly; get_results doesn't really do what you expect, because it's susceptible to race conditions:

    while not self.cmd_queue.empty():#wait for Worker to finish
        sleep(.5)
    while not self.res_queue.empty():
        res.append(self.res_queue.get())

cmd_queue could (and does, with your example code) end up empty before the functions you've passed to it have finished running inside Worker, which means some of your results will be missing when you pull everything out of res_queue. You can fix that by using a JoinableQueue, which lets the worker signal when each task is actually done.
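The JoinableQueue contract, stripped down to a minimal sketch (worker and run_demo are illustrative names, and this assumes a Unix-style fork start method): join() blocks until task_done() has been called once for every put(), so the parent knows all submitted jobs have finished, not merely been dequeued:

```python
from multiprocessing import JoinableQueue, Queue, Process

def worker(cmd_queue, res_queue):
    # Pull jobs until the None sentinel arrives; mark each item as done.
    while True:
        item = cmd_queue.get()
        if item is None:
            cmd_queue.task_done()
            break
        res_queue.put(item * 2)
        cmd_queue.task_done()

def run_demo():
    cmd_queue = JoinableQueue()
    res_queue = Queue()
    p = Process(target=worker, args=(cmd_queue, res_queue))
    p.start()
    for i in range(3):
        cmd_queue.put(i)
    cmd_queue.put(None)  # sentinel: no more work
    cmd_queue.join()     # blocks until every put() has a matching task_done()
    p.join()
    return [res_queue.get() for _ in range(3)]

if __name__ == '__main__':
    print(run_demo())  # [0, 2, 4]
```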

You should also send a sentinel to the worker processes so that they shut down properly, and so that all their results get flushed from res_queue and sent back to the parent properly. I also found I needed to add a sentinel to res_queue, otherwise sometimes res_queue would show up as empty in the parent before the last result written to it from a child actually got flushed across the pipe, meaning the last result would get lost.

Here's a complete working example:

from multiprocessing import Process, Queue, JoinableQueue
import types
from time import sleep
import copy_reg  

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Worker(Process):
    def __init__(self, cmd_queue, res_queue):
        self.cmd_queue = cmd_queue
        self.res_queue = res_queue
        Process.__init__(self)

    def run(self):
        for f, args, kwargs in iter(self.cmd_queue.get, 
                                    (None, (), {})): # None is our sentinel
            self.res_queue.put( f(*args, **kwargs) )  
            self.cmd_queue.task_done() # Mark the task as done.
        self.res_queue.put(None) # Send this to indicate no more results are coming
        self.cmd_queue.task_done() # Mark the task as done

class Agent(object):
    def __init__(self):
        self.cmd_queue = JoinableQueue()
        self.res_queue = Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['cmd_queue']
        del self_dict['res_queue']
        del self_dict['worker']
        return self_dict

    def __setstate__(self, self_dict):
        self.__dict__ = self_dict

    def produce(self, f, *args, **kwargs):
        self.cmd_queue.put((f, args, kwargs))

    def do_some_work(self):
        self.produce(self.foo, waka='waka')

    def do_some_other_work(self):
        self.produce(self.bar, humana='humana')

    def send_sentinel(self):
        self.produce(None)

    def foo(self, **kwargs):
        sleep(2)
        return('this is a foo')

    def bar(self, **kwargs):
        sleep(4)
        return('this is a bar')

    def get_results(self):  #blocking call
        res = []
        self.cmd_queue.join() # This will block until task_done has been called for every put pushed into the queue.
        for out in iter(self.res_queue.get, None):  # None is our sentinel
            res.append(out)
        return res  

#This is the interface I'm looking for.
if __name__=='__main__':
    agents = [Agent() for i in range(50)]
    #this should flow quickly as the calls are added to cmd_queues
    for agent in agents:        
        agent.do_some_work()
        agent.do_some_other_work()  
        agent.send_sentinel()
    for agent in agents:
        print(agent.get_results())

Output:

['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
dano
  • This is also a nice solution. It does involve some of the requisite head-standing to get `multiprocessing` to work... I might suggest, to make life a little simpler, replacing 16 lines (from `import copy_reg` to `copy_reg.pickle(…)`) with one line: `import dill`. That one line registers `types.MethodType`, and is the same as your 16 lines. It adds a dependency outside of the standard library, but unless you are an absolute purist, that shouldn't matter… – Mike McKerns Apr 15 '15 at 16:16
  • Thank you, this is huge. I was able to get it running earlier by using some global dictionaries whose keys are uuids for each Agent/Worker pair and whose values were Manager.Queues (along with one with an Event for the race condition you mentioned) but it was a little wonky. This is **the** answer for all relevant SO questions imo. – PrckPgn Apr 15 '15 at 17:11
  • Why did you set `self.daemon = True` in Worker? Is that a robustness thing to allow it create processes in the future or was it necessary for your solution? – PrckPgn Apr 15 '15 at 17:30
  • @PrckPgn Actually, that shouldn't be there at all. At first I was using daemon processes instead of sentinels, but sentinels are cleaner, so I switched to that. I just forgot to move `self.daemon = True` line. – dano Apr 15 '15 at 18:09

Would you be happy with a very mild fork of multiprocessing that can make this pattern work? If so, you only have to look down a little further in the link you referred to in your question: Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map().

Since pathos.multiprocessing has a Pool that can pickle instance methods in a very clean way, you can work just as you would in serial Python… and it just works… even directly from the interpreter.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> from Queue import Queue
>>> from time import sleep
>>> 
>>> class Agent:
...   def __init__(self):
...     self.pool = Pool()
...     self.queue = Queue()
...   def produce(self, f, *args, **kwds):
...     self.queue.put(self.pool.apipe(f, *args, **kwds))
...   def do_some_work(self):
...     self.produce(self.foo, waka='waka')
...   def do_some_other_work(self):
...     self.produce(self.bar, humana='humana')
...   def foo(self, **kwds):
...     sleep(5)
...     return 'this is a foo'
...   def bar(self, **kwds):
...     sleep(10) 
...     return 'this is a bar'
...   def get_results(self):
...     res = []
...     while not self.queue.empty():
...       res.append(self.queue.get().get())
...     return res
... 
>>> agents = [Agent() for i in range(50)]
>>> for agent in agents:
...   agent.do_some_work()
...   agent.do_some_other_work()
... 
>>> for agent in agents:
...   print(agent.get_results())
... 
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
>>> 

Get pathos here: https://github.com/uqfoundation

Mike McKerns