I am aware of `multiprocessing.Manager()` and how it can be used to create shared objects, in particular queues which can be shared between workers. There is this question, this question, this question, and even one of my own questions.
However, I need to define a great many queues, each of which links a specific pair of processes. Say that each pair of processes and its linking queue is identified by the variable `key`.
I want to use a dictionary to access my queues when I need to put and get data. I cannot make this work. I've tried a number of things. With `multiprocessing` imported as `mp`:
Defining a dict like `for key in all_keys: DICT[key] = mp.Queue()` in a config file which is imported by the multiprocessing module (call it `multi.py`) does not return errors, but the queue `DICT[key]` is not shared between the processes; each one seems to have its own copy of the queue, and thus no communication happens.
If I try to define the `DICT` at the beginning of the main multiprocessing function that defines the processes and starts them, like

```python
DICT = mp.Manager().dict()
for key in all_keys:
    DICT[key] = mp.Queue()
```

I get the error

```
RuntimeError: Queue objects should only be shared between processes through inheritance
```
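For reference, a minimal snippet that reproduces the error (the key is a placeholder). As far as I can tell, the assignment into the manager dict has to pickle the raw queue to ship it to the manager's server process, and that pickling step is what raises:

```python
import multiprocessing as mp

if __name__ == "__main__":
    DICT = mp.Manager().dict()
    try:
        # Sending a raw mp.Queue() to the manager's server process
        # requires pickling it, which raw queues refuse outside of
        # process inheritance.
        DICT["some_key"] = mp.Queue()
    except RuntimeError as err:
        print(err)  # Queue objects should only be shared between processes through inheritance
```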
Changing to

```python
DICT = mp.Manager().dict()
for key in all_keys:
    DICT[key] = mp.Manager().Queue()
```

only makes everything worse. Trying similar definitions at the head of `multi.py` rather than inside the main function returns similar errors.
There must be a way to share many queues between processes without explicitly naming each one in the code. Any ideas?
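For concreteness, here is a self-contained sketch of the pattern I am after, written with a single shared `Manager`, a plain local dict of `manager.Queue()` proxies, and placeholder worker functions (whether this can be grafted onto my real module layout is exactly my question):

```python
import multiprocessing as mp

def targ1(q, key):
    q.put(f"result for {key}")   # placeholder work

def targ2(q, out):
    out.put(q.get())             # placeholder consumer of targ1's result

if __name__ == "__main__":
    manager = mp.Manager()       # one manager for every queue
    all_keys = ["a", "b"]
    DICT = {key: manager.Queue() for key in all_keys}
    out = manager.Queue()
    procs = []
    for key in all_keys:
        procs.append(mp.Process(target=targ1, args=(DICT[key], key)))
        procs.append(mp.Process(target=targ2, args=(DICT[key], out)))
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(sorted(out.get() for _ in all_keys))  # → ['result for a', 'result for b']
```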
Edit
Here is a basic schema of the program:
1 - Load the first module, which defines some variables, imports `multi`, launches `multi.main()`, and loads another module which starts a cascade of module loads and code execution. Meanwhile...

2 - `multi.main` looks like this:
```python
def main():
    manager = mp.Manager()
    pool = mp.Pool()
    DICT2 = manager.dict()
    for key in all_keys:
        DICT2[key] = manager.Queue()
        proc_1 = pool.apply_async(targ1, (DICT1[key],))  # DICT1 is defined in the config file
        proc_2 = pool.apply_async(targ2, (DICT2[key], otherargs,))
```
Rather than use `pool` and `manager`, I was also launching processes with the following:

```python
mp.Process(target=targ1, args=(DICT[key],))
```
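As a sanity check on that route: if I read the docs correctly, passing a plain `mp.Queue()` through `args` to `mp.Process` counts as inheritance, so a dict of raw queues should be fine as long as they never travel through a pool or a manager dict (names below are placeholders):

```python
import multiprocessing as mp

def worker(q):
    q.put("hello")

if __name__ == "__main__":
    all_keys = ["a", "b"]
    DICT = {key: mp.Queue() for key in all_keys}  # raw queues in a plain local dict
    procs = [mp.Process(target=worker, args=(DICT[key],)) for key in all_keys]
    for p in procs:
        p.start()
    print([DICT[key].get() for key in all_keys])  # → ['hello', 'hello']
    for p in procs:
        p.join()
```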
3 - The function `targ1` takes input data that is coming in (sorted by `key`) from the main process. It is meant to pass the result to `DICT[key]` so `targ2` can do its work. This is the part that is not working. There are an arbitrary number of `targ1`s, `targ2`s, etc., and therefore an arbitrary number of queues.
4 - The results of some of these processes will be sent to a bunch of different arrays / pandas dataframes which are also indexed by `key`, and which I would like to be accessible from arbitrary processes, even ones launched in a different module. I have yet to write this part and it might be a different question. (I mention it here because the answer to 3 above might also solve 4 nicely.)