
I'm interested in instantiating a pool of workers, the_pool, using multiprocessing.Pool, where the workers communicate through a Queue. However, each worker has an argument, role, that is unique to that worker and needs to be provided during worker initialization. This constraint is imposed by an API I'm interfacing with, so it cannot be worked around. If I didn't need a Queue, I could just iterate over a list of role values and invoke apply_async, like so:

[the_pool.apply_async(worker_main, (role,)) for role in roles]

Unfortunately, a Queue object can only be passed to a pool's workers at pool instantiation, via the initargs parameter, as in:

the_pool = multiprocessing.Pool(3, worker_main, (the_queue,))

Attempting to pass a Queue via the arguments to apply_async instead causes a runtime error. The following example, adapted from this question, attempts to instantiate a pool of three workers. It fails because there is no way to get one element of roles into the initargs for each worker.

import os
import time
import multiprocessing

# A dummy function representing some fixed functionality.
def do_something(x):
    print('I got a thing:', x)

# A main function, run by our workers. (Remove role arg for working example)
def worker_main(queue, role):

    print('The worker at', os.getpid(), 'has role', role, 'and is initialized.')

    # Use role in some way. (Comment out for working example)
    do_something(role)

    while True:
        # Block until something is in the queue.
        item = queue.get(True)
        print(item)
        time.sleep(0.5)

if __name__ == '__main__':

    # Define some roles for our workers.
    roles = [1, 2, 3]

    # Instantiate a Queue for communication.
    the_queue = multiprocessing.Queue()

    # Build a Pool of workers, each running worker_main.
    # PROBLEM: Next line breaks - how do I pass one element of roles to each worker?
    the_pool = multiprocessing.Pool(3, worker_main, (the_queue,))

    # Iterate, sending data via the Queue.
    [the_queue.put('Insert useful message here') for _ in range(5)]

    the_pool.close()
    the_pool.join()
    time.sleep(10)
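
For reference, here is roughly what the runtime error mentioned above looks like. Suppose worker_main took the queue as a task argument and we tried to pass it through apply_async; the pool attempts to pickle the Queue, which multiprocessing forbids (exact traceback wording may vary across Python versions):

# Hypothetical: pass the Queue as a task argument instead of via initargs.
result = the_pool.apply_async(worker_main, (the_queue, roles[0]))
result.get()
# RuntimeError: Queue objects should only be shared between processes
# through inheritance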

One trivial work-around is to include a second Queue in initargs, which serves only to communicate the role of each worker: each worker blocks until it receives a role via that queue. This, however, introduces an additional queue that should not be necessary. Relevant documentation is here. Guidance and advice are greatly appreciated.

Justin Fletcher
    After considering your question and Tim Peters's answer, I feel as though there's a conflict between starting a worker pool of 3 workers, and wanting to delegate roles to the workers when they begin. It seems like you'd either want a pool per role, or would want the role to be passed as part of the queue item so that each worker could take any role. – erik258 Jan 02 '17 at 23:48
  • Justin -- In case you're not aware, note that getting an answer to a Python question from Tim Peters is on a par with getting it from Guido. Meditate on it. – Peter Rowell Jan 03 '17 at 00:23
  • @DanFarrell, you're right - I do want the worker processes to be decoupled from any particular choice of `role`. Using one pool per role won't do the trick, because I need the pool to be shared across roles for architectural reasons. I'm currently passing the role value via a separate `Queue` object, as you suggested, as a work-around. I block the execution of each worker's init method until a `role` value is received, and then fall into `main` immediately thereafter. I'm hoping someone has developed a pythonic idiom to avoid that superfluous queue/blocking arrangement. – Justin Fletcher Jan 03 '17 at 00:52

2 Answers


Why not use two worker functions, one just for initialization? Like:

def worker_init(q):
    # Runs once in each worker process.
    global queue
    queue = q

def worker_main(role):
    # Use the global `queue` freely here.
    ...

Initialization is much the same as what you showed, except it calls worker_init:

the_pool = multiprocessing.Pool(3, worker_init, (the_queue,))

Initialization is done exactly once per worker process, and each process persists until the Pool terminates. To get work done, do exactly what you wanted to do:

[the_pool.apply_async(worker_main, (role,)) for role in roles]

There's no need to pass the_queue too - each worker process already learned about it during initialization.
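
Putting it together, a minimal self-contained sketch of this approach (worker bodies adapted from the question; one message is dispatched per task so every worker_main call returns):

import os
import multiprocessing

def worker_init(q):
    # Runs once in each worker process; the Queue is inherited here.
    global queue
    queue = q

def worker_main(role):
    # The role is per-task; the queue is the shared global set by worker_init.
    print('The worker at', os.getpid(), 'has role', role)
    item = queue.get(True)
    print('Role', role, 'got:', item)

if __name__ == '__main__':
    roles = [1, 2, 3]
    the_queue = multiprocessing.Queue()

    the_pool = multiprocessing.Pool(3, worker_init, (the_queue,))
    results = [the_pool.apply_async(worker_main, (role,)) for role in roles]

    for _ in roles:
        the_queue.put('Insert useful message here')

    for r in results:
        r.get()  # surface any worker exceptions
    the_pool.close()
    the_pool.join()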

Tim Peters
  • In this case you only get one worker per role, correct? Granted, in the question there are as many roles as workers. But if there were, say, 10 workers desired and 3 roles, this would only run the queue runner (presumably within `worker_main`) once for each of the 3 roles, right? – erik258 Jan 02 '17 at 22:56
  • If `roles` remains `[1, 2, 3]`, yes - the code does exactly what you tell it to do ;-) Lacking telepathy, I can't guess what you _want_ 10 workers to do. If, e.g., you want roles 1 and 2 to be taken by 3 workers each, and role 3 to be taken by 4 workers, use some permutation of `[1, 1, 1, 2, 2, 2, 3, 3, 3, 3]` for `roles`. – Tim Peters Jan 02 '17 at 23:07
  • Yes, there will only be one worker per role and each role will be a unique integer in practice. In hindsight, I should have made my question more general. I hadn't considered that I was imposing implementation-specific constraints. In general, I was looking for an idiom that will allow me to spawn `n` workers using a list of arguments, `my_list`, where `len(my_list)` yields `n`. In my actual implementation, I'm encapsulating the worker functionality in objects, so it is advantageous to avoid calling two different functions, if possible. – Justin Fletcher Jan 03 '17 at 00:46
  • I don't grasp why adding objects to the mix would make any difference. At base, you're trying to teach _all_ workers about a single thing, which just happens to be a `Queue` in this case. That's why there _is_ an initialization facility for pools: to set up one-time shared arguments. In any case, since I can't grasp why adding objects to the mix would matter, there's no more I can say. Except that maybe `Pool` is a wrong thing to be using to begin with. Why do you think it's desirable - as opposed to using `multiprocessing.Process`? The latter would let you pass any mix of arguments. – Tim Peters Jan 03 '17 at 01:04
  • As I've constructed it, the object-based approach uses the object's `__init__` method in a way analogous to the `worker_main` above. So, it would be conceptually simplest if the object's instantiation could be done in a single call, in which a worker process is launched. What I've learned from this conversation is that the `Pool` instantiation is intended for universal communication to the worker processes, so the idiom I was looking for might not exist. – Justin Fletcher Jan 03 '17 at 02:11
  • More fundamentally, it's unnatural for worker processes in a `Pool` to be distinguishable at all - all the `Pool` methods are geared toward problems where it makes no difference at all which processes get which pieces of work. It's also unnatural for a `Pool` worker to run an "infinite loop". Those things (infinite loops, and different processes doing different kinds of work) are what `Process` is for. `for role in roles: procs.append(Process(target=MyObject, args=(the_queue, role)))` – Tim Peters Jan 03 '17 at 02:25
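
For completeness, here is a minimal sketch of that Process-based alternative, with a plain worker function standing in for the MyObject mentioned in the comment:

import multiprocessing
import time

def worker(queue, role):
    # Each Process receives its own role directly; no Pool initializer needed.
    while True:
        item = queue.get(True)
        print('Role', role, 'processing:', item)

if __name__ == '__main__':
    the_queue = multiprocessing.Queue()
    procs = []
    for role in [1, 2, 3]:
        p = multiprocessing.Process(target=worker, args=(the_queue, role))
        p.daemon = True  # let the program exit despite the workers' infinite loops
        p.start()
        procs.append(p)

    for _ in range(5):
        the_queue.put('Insert useful message here')

    time.sleep(2)  # crude: give the daemonized workers time to drain the queue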

You can just create a second queue holding the roles:

import os
import time
import multiprocessing

# A dummy function representing some fixed functionality.
def do_something(x):
    print('I got a thing:', x)

# A main function, run by our workers. Each worker pulls its own role
# from the roles queue at startup.
def worker_main(queue, roles):
    role = roles.get()
    print('The worker at', os.getpid(), 'has role', role, 'and is initialized.')

    # Use role in some way.
    do_something(role)

    while True:
        # Block until something is in the queue.
        item = queue.get(True)
        print(item)
        time.sleep(0.5)

if __name__ == '__main__':

    # Define some roles for our workers.
    roles = [1, 2, 3]

    # Instantiate a Queue for communication.
    the_queue = multiprocessing.Queue()
    roles_queue = multiprocessing.Queue()
    for role in roles:
        roles_queue.put(role)

    # Build a Pool of workers, each running worker_main.
    # Each worker takes one element of roles from roles_queue at startup.
    the_pool = multiprocessing.Pool(3, worker_main, (the_queue, roles_queue))

    # Iterate, sending data via the Queue.
    [the_queue.put('Insert useful message here') for _ in range(5)]

    the_pool.close()
    the_pool.join()
    time.sleep(10)
ADR
  • Yeah, I have done this as a work-around, as mentioned in the question. Specifically, I'm looking for a python idiom that avoids using an extra queue, which it turns out may not be possible. See the comments to Tim's answer. – Justin Fletcher Jan 03 '17 at 02:16