There are a few subtleties. First, when you register with a manager a function that returns an object, by default the manager will attempt to build a proxy for that object. But if the object you return is a managed queue, it is already a proxied object; you should therefore return an ordinary queue.Queue instance, as in the second example in Using a Remote Manager.
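As a minimal sketch of the distinction (the manager and queue names here are illustrative only):

from multiprocessing.managers import BaseManager
from queue import Queue

plain_queue = Queue()  # an ordinary, unproxied queue.Queue

def get_plain_queue():
    # Correct: return the plain queue and let the manager construct
    # the (single) proxy that remote clients will receive.
    return plain_queue

class DemoManager(BaseManager):
    pass

DemoManager.register('demo_queue', callable=get_plain_queue)

# By contrast, a callable that returned multiprocessing.Manager().Queue()
# would hand back an object that is already a proxy, and the manager
# would then attempt to build a proxy for the proxy.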
The following code is split across three scripts: server.py starts the remote manager; workers.py starts a pool of worker processes (one per CPU), each of which reads an integer from the work_tasks_queue queue and writes to the done_task_queue queue a tuple consisting of the integer and its square; and client.py writes the integers 1 through 9 to the work_tasks_queue queue and then reads the 9 results from the done_task_queue queue, which may arrive in arbitrary order.
There seems to be a bug with authentication: when a queue proxy is unpickled in a worker process, it apparently authenticates using current_process().authkey, which by default does not match the manager's key. It therefore becomes necessary for each process in the pool to set its own key as follows, or the manager will reject its requests:
current_process().authkey = password.encode('utf-8')
Needless to say, the server, workers, and client would typically (or at least possibly) be run on three different machines, with an adjustment to the address specification.
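For example (a sketch; the host name is hypothetical), the server's copy of QueueManager.py could bind to all interfaces while the workers' and the client's copies point at the server's host:

# In QueueManager.py on the server machine (listen on all interfaces):
address = ''  # or '0.0.0.0'
port = 50000

# In QueueManager.py on the worker and client machines:
address = 'server.example.com'  # hypothetical host name or IP of the server
port = 50000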
Common QueueManager.py Module
from multiprocessing.managers import BaseManager

address = "127.0.0.1"
port = 50000
password = "secret"

class QueueManager(BaseManager):
    pass

def connect_to_manager():
    QueueManager.register('work_tasks_queue')
    QueueManager.register('done_task_queue')
    manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    manager.connect()
    return manager.work_tasks_queue(), manager.done_task_queue()
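Note that connect_to_manager registers the two queue names without supplying a callable: on the connecting side only the names need to be known, and calling manager.work_tasks_queue() then returns a proxy for the queue living in the server process.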
server.py
from QueueManager import *
from queue import Queue

work_tasks_queue = Queue()
done_task_queue = Queue()

def get_work_tasks_queue():
    return work_tasks_queue

def get_done_task_queue():
    return done_task_queue

def server():
    # Don't seem to be able to use a lambda or nested function when using net_manager.start():
    QueueManager.register('work_tasks_queue', callable=get_work_tasks_queue)
    QueueManager.register('done_task_queue', callable=get_done_task_queue)
    net_manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    net_manager.start()
    input('Server starting. Hit Enter to terminate....')
    net_manager.shutdown()

if __name__ == '__main__':
    server()
workers.py
from QueueManager import *
from multiprocessing import Process, current_process, cpu_count
from threading import Thread

def worker(in_q, out_q):
    # Work around the authentication issue described above:
    current_process().authkey = password.encode('utf-8')
    while True:
        x = in_q.get()
        if x is None:  # signal to terminate
            in_q.task_done()
            break
        out_q.put((x, x ** 2))
        in_q.task_done()

def create_workers(in_q, out_q, n_workers):
    processes = [Process(target=worker, args=(in_q, out_q)) for _ in range(n_workers)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

def start_workers():
    N_WORKERS = cpu_count()
    in_q, out_q = connect_to_manager()
    t = Thread(target=create_workers, args=(in_q, out_q, N_WORKERS))
    t.start()
    input('Starting workers. Hit enter to terminate...')
    for _ in range(N_WORKERS):
        in_q.put(None)  # tell a worker to quit
    #in_q.join()  # not strictly necessary; assumes the client's work has been completed too
    t.join()

if __name__ == '__main__':
    start_workers()
client.py
from QueueManager import *

def client():
    in_q, out_q = connect_to_manager()
    for x in range(1, 10):
        in_q.put(x)
    # Get the results as they become available; they may arrive out of order:
    for _ in range(1, 10):
        x, result = out_q.get()
        print(x, result)

if __name__ == '__main__':
    client()
Prints:
1 1
4 16
3 9
2 4
5 25
6 36
8 64
7 49
9 81
Update
Here is code to run everything together in a single script.
from QueueManager import *
from workers import create_workers
from client import client
from queue import Queue
from threading import Thread, Event

# So that queues are not unnecessarily created by worker processes under Windows:
work_tasks_queue = None
done_task_queue = None

def get_work_tasks_queue():
    global work_tasks_queue
    # singleton:
    if work_tasks_queue is None:
        work_tasks_queue = Queue()
    return work_tasks_queue

def get_done_task_queue():
    global done_task_queue
    # singleton:
    if done_task_queue is None:
        done_task_queue = Queue()
    return done_task_queue

def server(started_event, shutdown_event):
    # Don't seem to be able to use a lambda or nested function when using net_manager.start():
    QueueManager.register('work_tasks_queue', callable=get_work_tasks_queue)
    QueueManager.register('done_task_queue', callable=get_done_task_queue)
    net_manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    net_manager.start()
    started_event.set()    # tell the main thread that we have started
    shutdown_event.wait()  # wait to be told to shut down
    net_manager.shutdown()

if __name__ == '__main__':
    started_event = Event()
    shutdown_event = Event()
    server_thread = Thread(target=server, args=(started_event, shutdown_event))
    server_thread.start()
    # Wait for the manager to start:
    started_event.wait()
    in_q, out_q = connect_to_manager()
    N_WORKERS = 3
    workers_thread = Thread(target=create_workers, args=(in_q, out_q, N_WORKERS))
    workers_thread.start()
    client()
    # Tell the workers we are through:
    for _ in range(N_WORKERS):
        in_q.put(None)
    #in_q.join()  # not strictly necessary; assumes the client's work has been completed too
    workers_thread.join()
    # Tell the manager we are through:
    shutdown_event.set()
    server_thread.join()
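Note that this combined script still imports QueueManager.py, workers.py, and client.py, so all four files must be present in the same directory; run with no arguments, it should print the same nine (integer, square) pairs as before, again in arbitrary order.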