
I've got the following toy script:

#!/usr/bin/env python3
import multiprocessing as mp


def main():
    queue = mp.Queue()
    stop = mp.Event()
    workers = []
    n = mp.cpu_count()
    print(f"starting {n} processes")
    for i in range(n):
        p = mp.Process(target=work, args=(i, queue, stop))
        workers.append(p)
        p.start()
    print("getting 1000 items from queue")
    for _ in range(1000):
        queue.get()
    print("poisoning processes")
    stop.set()
    print("joining processes")
    for worker in workers:
        # hangs occasionally if terminate not called
        # worker.terminate()
        worker.join()
    print("closing queue")
    queue.close()
    print("returning")


def work(i, queue, stop):
    while not stop.is_set():
        queue.put("something")
    print(f"exiting process {i}")


if __name__ == "__main__":
    main()

Here is some sample output where it hangs and then I kill it with Ctrl-C:

λ ./mp_template.py
starting 16 processes
getting 1000 items from queue
poisoning processes
joining processes
exiting process 1
exiting process 2
exiting process 0
exiting process 10
exiting process 9
exiting process 7
exiting process 11
exiting process 5
exiting process 12
exiting process 6
exiting process 4
exiting process 13
exiting process 3
exiting process 8
exiting process 14
exiting process 15
^CTraceback (most recent call last):
  File "/data/repos/mse-408/./mp_template.py", line 37, in <module>
    main()
  File "/data/repos/mse-408/./mp_template.py", line 24, in main
    worker.join()
  File "/usr/lib/python3.10/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.10/multiprocessing/popen_fork.py", line 43, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.10/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 317, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.10/multiprocessing/util.py", line 360, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.10/multiprocessing/util.py", line 300, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.10/multiprocessing/util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 199, in _finalize_join
    thread.join()
  File "/usr/lib/python3.10/threading.py", line 1096, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
KeyboardInterrupt

Why does this occasionally hang and other times not? If I add worker.terminate() for each worker, it always exits. But if the workers return on their own (as they do after stop.set() is called), why does it hang?

Dan Jenson
  • I can only guess that because the queue is filled with items very fast, the "queue.put" in some process(es) can't add more items and waits (indefinitely) for items to be removed. A timeout on the "queue.put" may help. – Michael Butscher Oct 18 '22 at 04:38

2 Answers


It looks like there are two possible deadlocks. First, the docs mention this issue:

Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed.

Another deadlock can occur because the queue fills up (e.g., while main is busy doing print("poisoning processes")), so one or more workers block in queue.put() and never get a chance to see that stop is set.

To solve the first issue you could add

queue.cancel_join_thread()

in the work() functions before they return.
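
For example, a minimal sketch of the worker with that call added (the rest of the script stays the same):

def work(i, queue, stop):
    while not stop.is_set():
        queue.put("something")
    # Tell the queue's feeder thread not to block process exit on
    # unflushed items, so join() in the parent can't deadlock on them.
    queue.cancel_join_thread()
    print(f"exiting process {i}")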

To solve the second issue you could use queue.put_nowait() so the workers never block on a full queue.
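
A rough sketch of how the worker loop might look (the 0.01-second back-off is an arbitrary choice, and Full comes from the standard queue module):

import time
from queue import Full

def work(i, queue, stop):
    while not stop.is_set():
        try:
            queue.put_nowait("something")
        except Full:
            # The queue is full; back off briefly and re-check the stop
            # event instead of blocking forever inside put().
            time.sleep(0.01)
    print(f"exiting process {i}")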

An alternative solution to both potential deadlocks is to have main() keep reading from the queue until all the workers have terminated. That can't be done reliably in-line, so you would need to start a reaper thread that simply drains the queue until it is told to stop.
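
A sketch of what such a reaper could look like (untested; drain and done are names I'm introducing here, and the 0.1-second timeout is arbitrary):

import threading
from queue import Empty

def drain(queue, done):
    # Keep consuming so no worker can stay blocked in put(); stop once
    # the main thread signals that all workers have been joined.
    while not done.is_set():
        try:
            queue.get(timeout=0.1)
        except Empty:
            pass

# In main(), after getting the first 1000 items:
#   done = threading.Event()
#   reaper = threading.Thread(target=drain, args=(queue, done))
#   reaper.start()
#   stop.set()
#   for worker in workers:
#       worker.join()
#   done.set()
#   reaper.join()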

craigb

I believe @craigb is correct that the cause of the problem is that the main process needs to read all of the messages from the queue before joining the child processes. One way to do this is to have each child process, when it sees the stop event set, put one additional sentinel item on the queue, signifying that this is the last item that child will put. When the main process has received n of these sentinels, where n is the number of child processes, it knows the queue is now empty:

import multiprocessing as mp

def main():
    queue = mp.Queue()
    stop = mp.Event()
    workers = []
    n = mp.cpu_count()
    print(f"starting {n} processes")
    for i in range(n):
        p = mp.Process(target=work, args=(i, queue, stop))
        workers.append(p)
        p.start()
    print("getting 1000 items from queue")
    for _ in range(1000):
        queue.get()
    seen_sentinel_count = 0
    print("poisoning processes")
    stop.set()
    # Get everything that is on the queue:
    while seen_sentinel_count < n:
        item = queue.get()
        if item is None: # Sentinel
            seen_sentinel_count += 1
    print("joining processes")
    for worker in workers:
        worker.join()
    print("closing queue")
    queue.close()
    print("returning")


def work(i, queue, stop):
    while not stop.is_set():
        queue.put("something")
    queue.put(None) # Put sentinel
    print(f"exiting process {i}")


if __name__ == "__main__":
    main()

You may be tempted to try instead doing non-blocking get calls on the queue until an Empty exception is raised:

import multiprocessing as mp
from queue import Empty

def main():
    queue = mp.Queue()
    stop = mp.Event()
    workers = []
    n = mp.cpu_count()
    print(f"starting {n} processes")
    for i in range(n):
        p = mp.Process(target=work, args=(i, queue, stop))
        workers.append(p)
        p.start()
    print("getting 1000 items from queue")
    for _ in range(1000):
        queue.get()
    print("joining processes")
    stop.set()
    # Race condition (see below): drain until the queue looks empty
    try:
        while True:
            queue.get_nowait()
    except Empty:
        pass
    for worker in workers:
        worker.join()
    print("closing queue")
    queue.close()
    print("returning")


def work(i, queue, stop):
    while not stop.is_set():
        # The event may be set just as we are about to put the
        # next message:
        queue.put("something")
    print(f"exiting process {i}")


if __name__ == "__main__":
    main()

But I believe there is a potential race condition: after the main process calls stop.set() and immediately starts doing non-blocking gets, it may find the queue empty even though a child process has not yet seen the event set and is about to do another put.

Booboo
  • Nice alternative solution! Yes, as you note your second example still has a race condition. – craigb Oct 21 '22 at 18:57