As best as I can tell, to make this pattern work on Windows, you need to create a picklable `queue.Queue`. You can do that by creating a child class of `Queue` that defines `__setstate__` and `__getstate__`, and have it pickle only the pieces of state that we actually need to send between processes, leaving the other stuff (unpicklable internal locks) out.
The other changes we need to make are to move the custom `Manager` class definitions to the top level, and to not use `lambda` functions as the argument to `callable`. Instead, we use a `partial` and a top-level function, because that combination can be pickled. Here's the final code:
import sys
from multiprocessing.managers import SyncManager
from functools import partial
import multiprocessing
from Queue import Queue as _Queue
class Queue(_Queue):
    """Queue subclass that can be pickled.

    Only the maximum size, the stored items and the unfinished-task counter
    are serialized. Everything else the base class keeps on the instance
    (its unpicklable internal locks) is left out and recreated on unpickling
    by running the constructor again.
    """

    def __getstate__(self):
        """Return the ``(maxsize, queue, unfinished_tasks)`` tuple to pickle."""
        # Deliberately a hand-picked subset of the instance attributes.
        return (self.maxsize, self.queue, self.unfinished_tasks)

    def __setstate__(self, state):
        """Rebuild the queue from a tuple produced by ``__getstate__``."""
        # Run the normal constructor first so the instance gets fresh
        # defaults for everything that was not pickled ...
        Queue.__init__(self)
        # ... then overwrite those defaults with the values that were.
        self.maxsize, self.queue, self.unfinished_tasks = (
            state[0],
            state[1],
            state[2],
        )
def get_q(q):
    """Return ``q`` unchanged.

    This is a top-level function so that it can be combined with
    ``functools.partial`` into a picklable zero-argument callable, which a
    ``lambda`` would not be.
    """
    return q
class JobQueueManager(SyncManager):
    """SyncManager subclass on which the job/result queue getters are registered.

    It is defined at module top level rather than inside a function.
    """
def make_server_manager(port, authkey):
    """Create the server-side manager that exposes a job and a result queue.

    Two picklable queues are created; the job queue is seeded with the
    string "hey". Each queue is registered on ``JobQueueManager`` under
    ``get_job_q`` / ``get_result_q`` through ``partial(get_q, <queue>)``.

    Args:
        port: Port used in the manager's ``('', port)`` address.
        authkey: Authentication key passed through to the manager.

    Returns:
        The constructed ``JobQueueManager``. It is NOT started here; the
        caller is expected to run it (the script's entry point uses
        ``get_server().serve_forever()``).
    """
    job_q = Queue()
    job_q.put("hey")
    result_q = Queue()

    # partial + top-level function instead of a lambda, so the registered
    # callable can be pickled.
    for typeid, shared_q in (('get_job_q', job_q), ('get_result_q', result_q)):
        JobQueueManager.register(typeid, callable=partial(get_q, shared_q))

    manager = JobQueueManager(address=('', port), authkey=authkey)
    # NOTE: manager.start() is intentionally not called in this function.
    print('Server started at port %s' % port)
    return manager
def make_client_manager(port, authkey):
    """Connect to the manager on localhost and print one job-queue item.

    Registers the ``get_job_q`` / ``get_result_q`` type ids (without
    callables) on ``JobQueueManager``, connects to ``('localhost', port)``,
    fetches the job queue, prints it, and then prints the result of a
    non-blocking ``get_nowait()`` on it.

    Args:
        port: Port the server manager is listening on.
        authkey: Authentication key; presumably must match the server's.
    """
    for typeid in ('get_job_q', 'get_result_q'):
        JobQueueManager.register(typeid)

    client = JobQueueManager(address=('localhost', port), authkey=authkey)
    client.connect()

    job_q = client.get_job_q()
    print("got queue {}".format(job_q))
    item = job_q.get_nowait()
    print(item)
if __name__ == "__main__":
    # One script plays both roles: "--client" as the first command-line
    # argument selects the client, anything else runs the server.
    if sys.argv[1:2] != ["--client"]:
        manager = make_server_manager(50000, "abcdefg")
        # Serve from this process rather than starting the manager itself.
        server = manager.get_server()
        server.serve_forever()
    else:
        make_client_manager(50000, 'abcdefg')