I'm interested in instantiating a pool of workers, `the_pool`, using `multiprocessing.Pool`, that uses a `Queue` for communication. However, each worker has an argument, `role`, that is unique to that worker and needs to be provided during worker initialization. This constraint is imposed by an API I'm interfacing with, and so cannot be worked around. If I didn't need a `Queue`, I could just iterate over a list of `role` values and invoke `apply_async`, like so:
```python
[the_pool.apply_async(worker_main, (role,)) for role in roles]
```
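A self-contained version of that queue-free approach might look like the sketch below. `run_role` is a hypothetical stand-in for the real worker function. Note that `apply_async` expects its `args` as a tuple, hence `(role,)`, and that each role becomes a *task* handed to whichever worker is free, so this does not by itself guarantee that each worker process receives exactly one role.

```python
import multiprocessing
import os


def run_role(role):
    # Hypothetical task body: report which process handled which role.
    return os.getpid(), role


if __name__ == '__main__':
    roles = [1, 2, 3]
    with multiprocessing.Pool(3) as the_pool:
        # args must be a tuple, hence (role,) rather than role.
        results = [the_pool.apply_async(run_role, (role,)) for role in roles]
        for pid, role in (r.get() for r in results):
            print('process', pid, 'ran role', role)
```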
Unfortunately, a `Queue` object can only be passed to a pool during pool instantiation, as in:
```python
the_pool = multiprocessing.Pool(3, worker_main, (the_queue,))
```
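The restriction appears to come from pickling: the pool pickles each task's arguments before sending them to a worker, while a `multiprocessing.Queue` only permits itself to be pickled while a child process is being spawned (or is simply inherited under fork), which is the situation `initargs` is in. A minimal sketch that reproduces the error with plain `pickle`, outside any pool; `try_pickle_queue` is a hypothetical helper name:

```python
import multiprocessing
import pickle


def try_pickle_queue():
    """Pickle a Queue outside of process spawning and return the error text."""
    q = multiprocessing.Queue()
    try:
        pickle.dumps(q)  # Queue.__getstate__ refuses unless a child is being spawned
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == '__main__':
    print(try_pickle_queue())
```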
Attempting to pass a `Queue` via the arguments to `apply_async` causes a `RuntimeError`. In the following example, adapted from this question, we attempt to instantiate a pool of three workers. But the example fails: `initargs` is a single tuple passed unchanged to every worker's initializer, so there is no way to hand each worker a different `role` element from `roles`.
```python
import os
import time
import multiprocessing

# A dummy function representing some fixed functionality.
def do_something(x):
    print('I got a thing:', x)

# A main function, run by our workers. (Remove role arg for working example)
def worker_main(queue, role):
    print('The worker at', os.getpid(), 'has role', role, 'and is initialized.')
    # Use role in some way. (Comment out for working example)
    do_something(role)
    while True:
        # Block until something is in the queue.
        item = queue.get(True)
        print(item)
        time.sleep(0.5)

if __name__ == '__main__':
    # Define some roles for our workers.
    roles = [1, 2, 3]
    # Instantiate a Queue for communication.
    the_queue = multiprocessing.Queue()
    # Build a Pool of workers, each running worker_main.
    # PROBLEM: initargs is the same for every worker, so worker_main never
    # receives a role. How do I pass one element of roles to each worker?
    the_pool = multiprocessing.Pool(3, worker_main, (the_queue,))
    # Iterate, sending data via the Queue.
    for _ in range(5):
        the_queue.put('Insert useful message here')
    the_pool.close()
    the_pool.join()
    time.sleep(10)
```
One trivial work-around is to include a second `Queue` in `initargs` that serves only to communicate each worker's role, and to block each worker's execution until it receives a role via that queue. This, however, introduces an additional queue that should not be necessary. Relevant documentation is here. Any guidance or advice is greatly appreciated.