I'm writing a program where a variable number of Agent objects concurrently run a number of serial methods and store their return values in a queue attribute. Each Agent has as an attribute a single Worker (subclass of Process) and feeds it jobs to run serially through a cmd_queue. The Agent gets the results from its Worker in a res_queue. These are currently Manager().Queue() instances and cause:
TypeError: Pickling an AuthenticationString object is disallowed for security reasons
However, if I use a regular Queue.Queue, each Worker gets a copy of the Agent's cmd_queue and cannot see what the Agent adds to it (it is always empty).
I'm able to pickle instance methods with the solution referenced in this question: Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()
import copy_reg
import types

from multiprocessing import Manager, Process
from time import sleep
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
# Register the method (un)pickler with copy_reg so multiprocessing can
# ship bound methods across process boundaries.
# Fix: the original line was missing its closing parenthesis, and `types`
# was never imported (added to the import block).
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
class Worker(Process):
    """A process that serially executes jobs pulled from a command queue.

    Each job is a ``(callable, args, kwargs)`` tuple taken from *cmd_queue*;
    the callable's return value is put on *res_queue*.
    """

    def __init__(self, cmd_queue, res_queue):
        # Queues are shared with the owning Agent; the Worker only ever
        # reads cmd_queue and writes res_queue.
        self.cmd_queue = cmd_queue
        self.res_queue = res_queue
        Process.__init__(self)

    def run(self):
        """Consume jobs forever, until a ``None`` sentinel is received.

        The sentinel is a backward-compatible addition: existing callers
        never enqueue ``None``, but it gives owners a clean shutdown path
        (the original loop could only be killed by terminating the process).
        """
        while True:
            job = self.cmd_queue.get()
            if job is None:  # shutdown sentinel
                break
            f, args, kwargs = job
            self.res_queue.put(f(*args, **kwargs))
class Agent(object):
    """Owns one Worker process and feeds it jobs to run serially.

    Jobs are enqueued via produce() and results collected via
    get_results(). New-style class (inherits object) so the copy_reg
    instance-method pickling recipe behaves consistently on Python 2.
    """

    def __init__(self):
        # One Manager serves both queues: the original created two
        # Manager() instances, i.e. two server processes per Agent,
        # where a single one suffices.
        manager = Manager()
        self.cmd_queue = manager.Queue()
        self.res_queue = manager.Queue()
        # Count of jobs enqueued but not yet collected; lets
        # get_results() know exactly how many results to wait for.
        self._pending = 0
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def produce(self, f, *args, **kwargs):
        """Enqueue one job for the worker to execute."""
        self.cmd_queue.put((f, args, kwargs))
        self._pending += 1

    def do_some_work(self):
        self.produce(self.foo, waka='waka')

    def do_some_other_work(self):
        self.produce(self.bar, humana='humana')

    def foo(self, **kwargs):
        sleep(5)
        return('this is a foo')

    def bar(self, **kwargs):
        sleep(10)
        return('this is a bar')

    def get_results(self):  # blocking call
        """Block until every produced job has finished; return the results.

        Fix: the original polled ``cmd_queue.empty()``, which races — the
        queue is empty as soon as the worker *takes* the last job, not
        when it *finishes* it, so the final result could be missed.
        Blocking on exactly ``_pending`` gets closes that window.
        """
        results = [self.res_queue.get() for _ in range(self._pending)]
        self._pending = 0
        return results
#This is the interface I'm looking for.
if __name__ == '__main__':
    # Spin up the agents; each constructor starts its own worker process.
    agents = [Agent() for _ in range(50)]

    # Queueing jobs is quick: each call just lands on an agent's cmd_queue
    # and control returns immediately.
    for agent in agents:
        agent.do_some_work()
        agent.do_some_other_work()

    # Collect (blocking) every agent's results, in order.
    for agent in agents:
        print(agent.get_results())
My question is: how would I get this code to work using multiprocessing, or is there a better, more accepted way of making this pattern work? This is a smaller part of a larger framework, so I'd like it to be as OO-friendly as possible.
Edit: This is in python 2.7.