
To speed up a certain task, I'm subclassing Process to create a worker that processes data arriving in samples. A managing class feeds it data and reads the outputs (using two Queue instances). For asynchronous operation I'm using put_nowait and get_nowait. At the end I send a special exit code to my process, upon which it breaks out of its internal loop. However... that never happens. Here's a minimal reproducible example:

import multiprocessing as mp

class Worker(mp.Process):
  def __init__(self, in_queue, out_queue):
    super(Worker, self).__init__()
    self.input_queue = in_queue
    self.output_queue = out_queue

  def run(self):
    while True:
      received = self.input_queue.get(block=True)
      if received is None:
        break
      self.output_queue.put_nowait(received)
    print("\tWORKER DEAD")


class Processor():
  def __init__(self):
    # prepare
    in_queue = mp.Queue()
    out_queue = mp.Queue()
    worker = Worker(in_queue, out_queue)
    # get to work
    worker.start()
    in_queue.put_nowait(list(range(10**5))) # XXX
    # clean up
    print("NOTIFYING")
    in_queue.put_nowait(None)
    #out_queue.get() # XXX
    print("JOINING")
    worker.join()

Processor()

This code never completes, hanging permanently like this:

NOTIFYING
JOINING
    WORKER DEAD

Why?

I've marked two lines with XXX. On the first: if I send less data (say, 10**4), everything finishes normally (the processes join as expected). Similarly on the second: if I call get() after notifying the worker to finish, it also works. I know I'm missing something, but nothing in the documentation seems relevant.

Przemek D

1 Answer


The documentation mentions that

When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences [...] After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty.

https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues
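
This delay can actually be observed within a single process. Here's a minimal sketch (mine, not from the question) that puts an item on a queue and immediately tries to read it back; the get_nowait() can raise queue.Empty because the feeder thread has not flushed the item to the pipe yet:

import multiprocessing as mp
import queue

if __name__ == "__main__":
  q = mp.Queue()
  q.put_nowait("item")
  try:
    # put_nowait() has already returned, but the feeder thread may not
    # have flushed the pickled item to the underlying pipe yet.
    print(q.get_nowait())
  except queue.Empty:
    print("not flushed to the pipe yet")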

Additionally, the documentation mentions that

whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate.

https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming
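
Stripped down to its essence, the deadlock the docs warn about looks like this (a sketch that hangs on purpose; it assumes the pickled item is larger than the pipe's buffer):

import multiprocessing as mp

def fill(q):
  q.put("x" * 10**6)  # large item: the feeder thread blocks on the full pipe

if __name__ == "__main__":
  q = mp.Queue()
  p = mp.Process(target=fill, args=(q,))
  p.start()
  p.join()  # hangs: the child cannot exit before the item is flushed
  q.get()   # never reached; swapping these two lines resolves the deadlock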

This means that the behaviour you describe is most likely caused by the interplay of self.output_queue.put_nowait(received) in the worker and worker.join() in Processor's __init__: the worker process cannot terminate while its feeder thread still holds buffered data that nobody reads. With little data, everything is flushed to the underlying pipe in time and the process joins fine; with a lot of data, the pipe's buffer fills up, the flush blocks, and the worker never joins.

Uncommenting the out_queue.get() in the main process empties the queue, which lets the worker flush its buffer, terminate, and be joined. Since a plain get() would block forever if the queue happened to be empty already, using a timeout might be an option, e.g. out_queue.get(timeout=10).
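
For the clean-up section of Processor.__init__, that could look roughly like this (a sketch; note that multiprocessing queues raise the Empty exception from the stdlib queue module):

import queue  # defines the Empty exception raised on timeout

print("NOTIFYING")
in_queue.put_nowait(None)
print("DRAINING")
while True:
  try:
    out_queue.get(timeout=10)  # remove everything the worker produced
  except queue.Empty:
    break
print("JOINING")
worker.join()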

It might also be important to protect the main routine with an if __name__ == "__main__" guard, especially on Windows (python multiprocessing on windows, if __name__ == "__main__").
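
Applied to the example from the question, that would look something like:

if __name__ == "__main__":
  # Without this guard, the spawn start method (the default on Windows)
  # re-runs the module-level code in every child process.
  Processor()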

  • You're asking me to use a `JoinableQueue` instead of just a `Queue`. I tried that but I get `ValueError: task_done() called too many times`. So there's something about queues/pipes that I should understand. Besides, even if your advice worked, I'd still ask: what is the reasoning behind this? What does it mean that "a task was done" and why would I need to do that? And if this is so important, then why is there a queue class without this capability? – Przemek D Jul 08 '19 at 06:58
  • Sorry for the confusion about joining the queues. I was under the impression that multiprocessing uses the same queues as the queue module, which it apparently does not. – Robert Guggenberger Jul 09 '19 at 14:50
  • Maybe this is helpful to better understand mp queues: https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues: "if a child process has put items on a queue [...], then that process will not terminate until all buffered items have been flushed to the pipe." – Robert Guggenberger Jul 09 '19 at 16:46
  • also: Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming – Robert Guggenberger Jul 09 '19 at 16:48
  • I'm sorry to say that your answer is not helpful at all. You simply repeated my piece of code, uncommenting a line that I already know solves the problem. My question is not "how" but "why", that's why I put it in bold. Thank you for the comments though, especially the one about "feeder thread" -- this would be itself an answer I would upvote and accept. – Przemek D Jul 19 '19 at 07:45
  • Hey, thanks for the feedback. I am currently on the road, but will wrap this up into a proper answer asap. – Robert Guggenberger Jul 20 '19 at 16:03