0

How can I make sure that all processes have finished before I start collecting data from queues? without the join() function, the main program attempts to start collecting from the queue straight away before they have finished, but with the join function, the next part never happens as all the processes are left hanging. This has been mentioned before but I have yet to find a way to get the desired behaviour

import multiprocessing as mp

def foo(q1, q2, data):
    # do some stuff
    q1.put(var1)
    q2.put(var2)

def main_function(list_of_stuff):
    q1 = mp.Queue()
    q2 = mp.Queue()
    jobs = []
    
    for data in list_of_stuff:
        p = mp.Process(target=foo, args=(q1, q2, data))
        p.start()
        jobs.append(p)
        
    # wait for all processes to end before continuing
    for job in jobs:
        job.join()
    
    list1 = []
    while not q1.empty():
        new = q1.get()
        list1.append(new)
        
    list2 = []
    while not q2.empty():
        new = q2.get()
        list2.append(new)
    
    return  list1, list2
aloea
  • 191
  • 1
  • 12
  • Relevant: https://stackoverflow.com/q/73225062/16310741 – Charchit Agarwal Aug 09 '22 at 17:26
  • Your code as posted works for me. You seem to be saying that you want to do without the `job.join()` loop. What advantages are you after by removing that? – quamrana Aug 09 '22 at 19:55
  • have you considered using `multiprocessing.Pool` its `map` function would seem to do something similar but take care of the awkward synchronization work – Sam Mason Aug 10 '22 at 12:20

1 Answers1

0

First, var1 and var2 are not defined so I have modified this function to do something (somewhat) useful.

Second, if you read the documentation on multiprocessing.Queue, especially the warnings, you will see that you can get in a deadlock if you attempt to do get calls against a queue after you join a process that has been doing put calls to the queue because the writing process must still be running for you to successfully get all the items it has placed on the queue. You will also see that calls to empty on a multiprocessing queue are not reliable.

So the simplest change for you to make, since you know how many items are being put to each queue would be:

import multiprocessing as mp

def foo(q1, q2, data):
    # do some stuff
    q1.put(data ** 2)
    q2.put(data ** 3)

def main_function(list_of_stuff):
    q1 = mp.Queue()
    q2 = mp.Queue()
    jobs = []

    for data in list_of_stuff:
        p = mp.Process(target=foo, args=(q1, q2, data))
        p.start()
        jobs.append(p)

    list1 = []
    list2 = []
    for _ in range(len(list_of_stuff)):
        new = q1.get()
        list1.append(new)
        new = q2.get()
        list2.append(new)

    # wait for all processes to end before continuing
    for job in jobs:
        job.join()

    return  list1, list2

if __name__ == '__main__':
    list1, list2 = main_function([1, 2, 3])
    print(list1, list2)
``

Prints:

```lang-None
[1, 4, 9] [1, 8, 27]

If your main process did not know how many items the child process would be placing on the output queue (or queues), then you would need to organize your code differently and use sentinel items placed on the queues to signify that there are no more items to be retrieved:

import multiprocessing as mp

def square(q, data):
    # do some stuff
    for x in data:
        q.put(x ** 2)
    # Show no more items on queue will be put:
    q.put(None)

def cube(q, data):
    # do some stuff
    for x in data:
        q.put(x ** 3)
    # Show no more items on queue will be put:
    q.put(None)

def main_function(list_of_stuff):
    q1 = mp.Queue()
    q2 = mp.Queue()

    p1 = mp.Process(target=square, args=(q1, list_of_stuff))
    p2 = mp.Process(target=cube, args=(q2, list_of_stuff))
    p1.start()
    p2.start()

    list1 = []
    while True:
        new = q1.get()
        if new is None:
            # Seen sentinel item:
            break
        list1.append(new)


    list2 = []
    while True:
        new = q2.get()
        if new is None:
            # Seen sentinel item:
            break
        list2.append(new)

    # wait for all processes to end before continuing
    p1.join()
    p2.join()

    return  list1, list2

if __name__ == '__main__':
    list1, list2 = main_function([1, 2, 3])
    print(list1, list2)

Prints:

[1, 4, 9] [1, 8, 27]

If you want to keep your original structure where foo calculates a square and a cube for each data item and you want a separate process for each item in the passed list_of_stuff, then use a multiprocessing pool:

import multiprocessing as mp

def foo(x):
    # do some stuff
    return x ** 2, x ** 3

def main_function(list_of_stuff):
    pool_size = min(mp.cpu_count(), len(list_of_stuff))
    with mp.Pool(pool_size) as pool:
        results = pool.map(foo, list_of_stuff)
        list1 = []
        list2 = []
        for square, cube in results:
            list1.append(square)
            list2.append(cube)

    return  list1, list2

if __name__ == '__main__':
    list1, list2 = main_function([1, 2, 3])
    print(list1, list2)

Note that since the worker function foo is pure CPU, there is no point in making the pool size greater than the number of tasks that will be submitted nor greater than the number of CPU cores you have. By using the map function, you are guaranteed that the results returned are in the same order as the tasks submitted. In your original code there was no guarantee on the order in which results would be written to the queues by the child processes so it would be possible to end up with list1, for example, being [4, 1, 9].

Booboo
  • 38,656
  • 3
  • 37
  • 60