
I am aware of multiprocessing.Manager() and how it can be used to create shared objects, in particular queues that can be shared between workers. There are several related questions here, including one of my own.

However, I need to define a great many queues, each of which links a specific pair of processes. Say that each pair of processes and its linking queue is identified by the variable key.

I want to use a dictionary to access my queues when I need to put and get data. I cannot make this work. I've tried a number of things. With multiprocessing imported as mp:

Defining a dict like for key in all_keys: DICT[key] = mp.Queue() in a config file which is imported by the multiprocessing module (call it multi.py) does not return errors, but the queue DICT[key] is not shared between the processes; each process seems to get its own copy of the queue, so no communication happens.
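
For concreteness, this is roughly what that first attempt looked like (the file name config.py and the contents of all_keys are placeholders):

# config.py -- imported by multi.py and the other modules
import multiprocessing as mp

all_keys = ["a", "b", "c"]  # placeholder keys

# One queue per key, created at import time. As described above, each
# process seemed to end up with its own copy of these queues, so data
# put() in one process never showed up in another.
DICT = {key: mp.Queue() for key in all_keys}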

If I try to define the DICT at the beginning of the main multiprocessing function that defines the processes and starts them, like

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Queue()

I get the error

RuntimeError: Queue objects should only be shared between processes through
 inheritance

Changing to

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Manager().Queue()

only makes everything worse. Trying similar definitions at the head of multi.py rather than inside the main function returns similar errors.

There must be a way to share many queues between processes without explicitly naming each one in the code. Any ideas?

Edit

Here is a basic schema of the program:

1- load the first module, which defines some variables, imports multi, launches multi.main(), and loads another module which starts a cascade of module loads and code execution. Meanwhile...

2- multi.main looks like this:

def main():
    manager = mp.Manager()
    pool = mp.Pool()
    DICT2 = manager.dict()

    for key in all_keys:
        DICT2[key] = manager.Queue()
        proc_1 = pool.apply_async(targ1, (DICT1[key],))  # DICT1 is defined in the config file
        proc_2 = pool.apply_async(targ2, (DICT2[key], otherargs,))

Rather than use pool and manager, I was also launching processes with the following:

mp.Process(target=targ1, args=(DICT[key],))

3 - The function targ1 takes input data that is coming in (sorted by key) from the main process. It is meant to pass the result to DICT[key] so targ2 can do its work. This is the part that is not working. There are an arbitrary number of targ1s, targ2s, etc. and therefore an arbitrary number of queues.

4 - The results of some of these processes will be sent to a bunch of different arrays / pandas dataframes which are also indexed by key, and which I would like to be accessible from arbitrary processes, even ones launched in a different module. I have yet to write this part and it might be a different question. (I mention it here because the answer to 3 above might also solve 4 nicely.)

  • How are you launching your child processes? Can you instantiate the `Queue` before launching the processes? Are the pairs of processes you're talking about potentially two child processes, or is it always a parent-child relationship? – dano Apr 30 '15 at 15:23
  • Hey man, I've added an edit that describes my program in brief outline. I am instantiating `Queue` before launching the processes. I am not sure how to distinguish a child from parent process so can't answer your last question... – Wapiti Apr 30 '15 at 16:54
  • Why is `DICT2` a `manager.dict()` in your code above? It doesn't look like you actually try to pass the `DICT2` object to any children. Couldn't it just be a regular dict containing `mp.Manager().Queue()` instances? – dano Apr 30 '15 at 18:54
  • I tried `DICT2={}` earlier (not only in `multi.main()` but everywhere I could think might work), and I get the same `RuntimeError`. What do you mean 'pass the `DICT2` object to children'? Isn't that what I'm doing by making it an argument of `targ2`? – Wapiti Apr 30 '15 at 19:50
  • Hey, so I think I made some progress, inspired by your second comment. `DICT2[key]` is meant to be read by `targ2`, and written into by a function that `targ1` was calling. But when I put `DICT2[key]` into `targ1` as an argument (beside `DICT1[key]`), the procedure worked and `targ2` was able to read data from the queue `DICT2[key]`. Does this accord with your experience and make sense? Is it what you meant by parent/children processes (`proc_1` is the parent of `proc_2`)? – Wapiti May 01 '15 at 04:51
  • I guess I still don't understand what you're trying to do. What is stored in `DICT1`, exactly? Also, you're not passing either dict to the child processes. When you do `pool.apply_async(targ1, (DICT1[key],))`, you're not passing `DICT1` to the child, you're passing whatever object is stored at `DICT1[key]`. Also, `proc_1` and `proc_2` are siblings; they're both children of your main script. I'm also not sure why `proc_1` needs to pass information to `proc_2`; why not have a single process handle *all* the work that needs to be done, and have a bunch of those identical processes running in parallel? – dano May 01 '15 at 14:40
  • Sorry for the delay, I've been rather slammed. I got the code to work by passing `DICT[key]` as arguments to the processes, as I said in my previous comment. My example code is rather obscure because I've tried (with limited success) to extract the essence of my difficulty. The other aspects of the code are not important. I understand the difference between child and parent processes now, very intuitive. Thanks. – Wapiti May 07 '15 at 11:38

1 Answer


It sounds like your issues started when you tried to share a multiprocessing.Queue() by passing it as an argument. You can get around this by creating a managed queue instead:

import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()

When you use a manager to create it, you are storing and passing around a proxy to the queue, rather than the queue itself, so even when the object you pass to your worker processes is a copy, it still points to the same underlying data structure: your queue. It's very similar (in concept) to pointers in C/C++. If you create your queues this way, you will be able to pass them when you launch a worker process.

Since you can pass queues around now, you no longer need your dictionary to be managed. Keep a normal dictionary in main that will store all the mappings, and only give your worker processes the queues they need, so they won't need access to any mappings.

I've written an example of this here. It looks like you are passing objects between your workers, so that's what's done here. Imagine we have two stages of processing, and the data both starts and ends in the control of main. Look at how we can create the queues that connect the workers like a pipeline, but by giving them only the queues they need, there's no need for them to know about any mappings:

import multiprocessing as mp

def stage1(q_in, q_out):

    q_out.put(q_in.get()+"Stage 1 did some work.\n")
    return

def stage2(q_in, q_out):

    q_out.put(q_in.get()+"Stage 2 did some work.\n")
    return

def main():

    pool = mp.Pool()
    manager = mp.Manager()

    # create managed queues
    q_main_to_s1 = manager.Queue()
    q_s1_to_s2 = manager.Queue()
    q_s2_to_main = manager.Queue()

    # launch workers, passing them the queues they need
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

    # Send a message into the pipeline
    q_main_to_s1.put("Main started the job.\n")

    # Wait for work to complete
    print(q_s2_to_main.get()+"Main finished the job.")

    pool.close()
    pool.join()

    return

if __name__ == "__main__":
    main()

The code produces this output:

Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.

I didn't include an example of storing the queues or AsyncResult objects in dictionaries, because I still don't quite understand how your program is supposed to work. But now that you can pass your queues freely, you can build your dictionary to store the queue/process mappings as needed.
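
For instance, a rough, self-contained sketch of that kind of mapping might look like the following; the names all_keys, targ1 and targ2 are placeholders borrowed from your question, and the worker bodies are trivial stand-ins:

import multiprocessing as mp

def targ1(q_in, q_out):
    # placeholder first-stage worker: read one item, pass a result along
    q_out.put(q_in.get() + " -> processed by targ1")

def targ2(q_in, q_out):
    # placeholder second-stage worker: read one item, report back to main
    q_out.put(q_in.get() + " -> finished by targ2")

def main():
    all_keys = ["a", "b", "c"]  # placeholder keys
    pool = mp.Pool()
    manager = mp.Manager()

    # ordinary dicts, kept only in main; the workers never see them
    q_main_to_1 = {}   # main -> targ1, one queue per key
    q_1_to_2 = {}      # targ1 -> targ2, one queue per key
    q_2_to_main = {}   # targ2 -> main, one queue per key

    for key in all_keys:
        q_main_to_1[key] = manager.Queue()
        q_1_to_2[key] = manager.Queue()
        q_2_to_main[key] = manager.Queue()
        # each worker is handed only the two queues it actually uses
        pool.apply_async(targ1, (q_main_to_1[key], q_1_to_2[key]))
        pool.apply_async(targ2, (q_1_to_2[key], q_2_to_main[key]))

    # feed the pipelines and collect the results, indexed by key
    for key in all_keys:
        q_main_to_1[key].put("data for " + key)

    for key in all_keys:
        print(q_2_to_main[key].get())

    pool.close()
    pool.join()

if __name__ == "__main__":
    main()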

In fact, if you really do build a pipeline between multiple workers, you don't even need to keep a reference to the "inter-worker" queues in main. Create the queues, pass them to your workers, then only retain references to queues that main will use. I would definitely recommend trying to let old queues be garbage collected as quickly as possible if you really do have "an arbitrary number" of queues.

  • Thanks again for the detailed response. This is helpful. I am a bit confused though. My original question was about how to avoid writing out a lot of `mp.Manager()` definitions explicitly, and instead put lots of queues into a dict. @dano helped me resolve what I was doing wrong in the comments above. I must be missing something in your example, because I don't see the point of using a managed queue. If I replace the managed queues with regular ones and take out the manager and pool and just use normal multiprocessing syntax, your example also works. What is the manager adding? – Wapiti May 07 '15 at 11:53
  • @Wapiti The manager allows you to pass Queues as input arguments to asynchronous functions. It resolves your "queues should be inherited" error. If you can explain what you mean when you say they "only made things worse", I can help you resolve it, because using the manager is what you should be doing. Your dict shouldn't be managed because it isn't shared with any processes besides main, and when you launch the processes, pass them both their "in" and "out" queues, and it should work for you. – skrrgwasme May 08 '15 at 20:41
  • @Wapiti Also, you are correct that if you directly use the `multiprocessing.Process` class, you can pass them normal queues. I focused on getting the queues to work for you with a multiprocessing pool because it sounded like that's what you wanted to use. If you want to stick with the pools, they need to be managed queues to pass them as arguments. – skrrgwasme May 08 '15 at 21:13
  • Okay, that clarifies things. I had switched back to using regular `mp.Process()` since `pool` was giving me so much trouble. One thing I did notice is that if I create a shared dict with the manager it winds up really slowing down my app. I didn't time it but the slowdown might be a factor of 1000 or more. Is this to be expected when, say, 10 processes are all writing to a shared dict roughly 10 times per second? – Wapiti May 09 '15 at 13:24
  • Short answer: yes. Given that many writes in that short a period of time, massive slowdowns shouldn't be a surprise. Managed dicts can support concurrent reads, but once a write is happening, everyone has to wait, including both readers and writers. However, in the code you've shown us, you're not actually passing the managed dict - you're passing the queues within it. Normally even managed dicts aren't capable of detecting when their mutable objects are being modified and I don't know how it would treat queues and their proxies. It's pretty complex, so I'm going to do some more exploring. – skrrgwasme May 09 '15 at 21:47
  • Thanks. I'm just starting to get my head around multiprocessing. It's wonderful and complicated. Since I posted with code mentioning `pool` and mentioned the managed queues, I'm accepting your answer. Hopefully somebody reading this stuff will get some use out of it. – Wapiti May 10 '15 at 01:29
  • Thanks, this helped resolve a problem I had when passing a dictionary of queue names. – Vitalis Sep 19 '20 at 10:03
  • Thanks for this example. This helped me a ton! – LampShade Oct 08 '20 at 01:46
  • How come one can pass queue handles through queues to subprocesses but we can't pickle the queue handles to a file and load them from another process directly? – John Allard May 01 '22 at 09:05