
I have an object instance that holds two queues, an input queue and an output queue. The parent process spawns several child processes that work on the queues.

My requirements are:

  • it should be possible to fill the input queue at any time (function fill() in the code below)
  • the child processes should process items from the input queue. If the input queue is empty, they should wait for the input queue to be filled (function echo())
  • items should be read from the output queue if it is not empty (function read())
  • if a shared shutdown flag (attribute self._shutdown) is set to true, all processes should end instead of waiting for queue items

Here is the code I currently have:

Python 2.7 Minimal Example

from multiprocessing import Queue, Process, Value
from ctypes import c_bool
from Queue import Empty

class A(object):

  def __init__(self):
    self._shutdown = Value(c_bool, False)
    self._input_queue = Queue()
    self._output_queue = Queue()

  def echo(self):
    while True:
      if self._shutdown.value == True: break
      try:
        item = self._input_queue.get(True, timeout=1)
      except Empty:
        continue
      print "[echo] read from input qeue: ", item
      print "[echo] put into output queue: ", item*2
      self._output_queue.put(item*2)

  def fill(self):
    for item in xrange(1,6):
      print "[fill] put into input queue: ", item
      self._input_queue.put(item)

  def read(self):
    while True:
      if self._shutdown.value == True: break
      try:
        item = self._output_queue.get(True, timeout=1)
      except Empty:
        continue
      print "[read] got from output queue: ", item

a = A()

p1 = Process(target=a.echo)
p2 = Process(target=a.echo)

p1.start()
p2.start()

a.fill()
a.read()
a._shutdown.value = True

The output of the script above is correct:

[fill] put into input queue:  1
[fill] put into input queue:  2
[fill] put into input queue:  3
[fill] put into input queue:  4
[fill] put into input queue:  5
[echo] read from input qeue:  1
[echo] put into output queue:  2
[echo] read from input qeue:  2
[echo] put into output queue:  4
[echo] read from input qeue:  3
[echo] put into output queue:  6
[read] got from output queue:  2
[echo] read from input qeue:  4
[read] got from output queue:  6
[echo] put into output queue:  8
[echo] read from input qeue:  5
[echo] put into output queue:  10
[read] got from output queue:  8
[read] got from output queue:  4
[read] got from output queue:  10

Except that it deadlocks and the process never finishes. It seems that the processes block each other. My question is:

Why exactly does my code deadlock and what can I do to prevent this?

Mathias Müller

1 Answer


Your a.read() is a synchronous call in the main process. Its while loop runs until self._shutdown.value is True, but the flag is only set on the line after a.read(). Because a.read() never returns, the line a._shutdown.value = True never executes, so p1 and p2 never see the flag change either. In short, p1 (process 1), p2 (process 2) and read() (in the main process) all run forever. Strictly speaking this is not a deadlock: nothing is blocked on a lock, the three loops just keep polling for a flag that is never set.
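One possible way out, as a minimal sketch rather than the only fix: let the reader stop once it has collected as many results as were put in, and only then set the flag and join the workers. The sketch uses Python 3 syntax instead of the question's Python 2.7, and it assumes a Unix-like system where the "fork" start method is available. The names run, expected and ctx are made up for this illustration and do not come from the question.

```python
import multiprocessing as mp
from ctypes import c_bool
from queue import Empty

# Assumption: "fork" start method (Unix-like systems), which matches how the
# original script starts its workers without a __main__ guard.
ctx = mp.get_context("fork")


def echo(shutdown, input_queue, output_queue):
    """Worker: double each item until the shared shutdown flag is set."""
    while not shutdown.value:
        try:
            item = input_queue.get(timeout=1)
        except Empty:
            continue  # nothing to do yet, re-check the shutdown flag
        output_queue.put(item * 2)


def read(output_queue, expected):
    """Collect exactly `expected` results, then return instead of looping forever."""
    results = []
    while len(results) < expected:
        results.append(output_queue.get(timeout=10))
    return results


def run(items):
    """Hypothetical driver: start workers, fill, read, then shut down."""
    shutdown = ctx.Value(c_bool, False)
    input_queue = ctx.Queue()
    output_queue = ctx.Queue()

    workers = [
        ctx.Process(target=echo, args=(shutdown, input_queue, output_queue))
        for _ in range(2)
    ]
    for worker in workers:
        worker.start()

    for item in items:
        input_queue.put(item)

    # read() returns once every result has arrived, so the next line is reached.
    results = read(output_queue, expected=len(items))

    shutdown.value = True
    for worker in workers:
        worker.join()
    return results


if __name__ == "__main__":
    # Arrival order depends on scheduling, so sort before printing.
    print(sorted(run(range(1, 6))))  # [2, 4, 6, 8, 10]
```

The workers notice the flag at most about one second after it is set, because that is the timeout of their get() call. If the number of results is not known in advance, another common option is to put one sentinel value per worker on the input queue and have each worker exit when it receives it.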

itzMEonTV