
I have two processes, and to clean up in case of fatal errors (instead of leaving the processes running), I want to remove all remaining tasks and empty the queue, so that join() can proceed. How can I achieve that? Preferably the code should work in both processes, but my setup allows the child process to signal the main process of its failure state and instruct main to do the cleanup as well.

I tried to understand it by inspecting the source at: https://github.com/python/cpython/blob/main/Lib/multiprocessing/queues.py

But I got a little bit lost with code like:

...
self._unfinished_tasks._semlock._is_zero():
...
def __init__(self, maxsize=0, *, ctx):
    Queue.__init__(self, maxsize, ctx=ctx)
    self._unfinished_tasks = ctx.Semaphore(0)
...

(Also, where does the _semlock property come from?)

For example, what is ctx? It appears not to be required, as I did not use it when creating my queue. Digging further, it may have something to do with (still a bit too mysterious for me):

mp.get_context('spawn')

or

@asynccontextmanager
async def ctx():
    yield
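
If I understand correctly, the first of these is the relevant one: ctx is a multiprocessing context object, which selects the start method, and mp.JoinableQueue() passes the default context in for me, which would explain why I never had to supply it (the @asynccontextmanager snippet seems to be unrelated and just happens to use the same name). A minimal sketch of what I mean:

import multiprocessing as mp

# Explicit context: choose the 'spawn' start method and build the queue from it.
ctx = mp.get_context('spawn')
q1 = ctx.JoinableQueue()

# Implicit context: mp.JoinableQueue() uses the default context under the
# hood, so ctx reaches JoinableQueue.__init__ without me ever seeing it.
q2 = mp.JoinableQueue()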

I need something like what V.E.O mentions here (which is quite understandable, but as far as I understand it only covers a single process): Clear all items from the queue


1 Answer


I came up with the following code (to be tested):

from queue import Empty  # multiprocessing queues raise queue.Empty

def clearAndDiscardQueue(self):
    try:  # drain; preferably done in the process that puts to the queue
        while True:
            self.task_queue.get_nowait()
    except Empty:
        pass
    except ValueError:  # in case the queue has already been closed
        pass
    self.task_queue.close()
    # Theoretically a new item could be put by the other process by the
    # time the interpreter reaches this line; therefore the part above
    # should run in the process that fills (puts to) the queue when it is
    # in its failure state. (When the main process fails, it should
    # instruct the child process to raise an exception and run this
    # cleanup, so that the main process' join() will work.)
    try:  # this part could run in either of the processes
        while True:
            self.task_queue.task_done()
    except ValueError:
        # task_done() was called more times than there were items;
        # we do not care, since in the failure state the remaining
        # tasks will not be processed anyway.
        pass
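
For reference, here is a self-contained demonstration of the idea (a free-function variant of the method above; the names clear_and_discard and q are mine and only for the demo):

import multiprocessing as mp
from queue import Empty

def clear_and_discard(task_queue):
    try:  # drain whatever has already reached the pipe
        while True:
            task_queue.get_nowait()
    except (Empty, ValueError):  # ValueError: queue already closed
        pass
    task_queue.close()
    try:  # balance the unfinished-task counter so join() can return
        while True:
            task_queue.task_done()
    except ValueError:  # called more often than there were put()s
        pass

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.JoinableQueue()
    for i in range(5):
        q.put(i)  # each put() increments the unfinished-task counter
    # Simulate a fatal error before any consumer processed the tasks:
    clear_and_discard(q)
    q.join()  # returns immediately instead of blocking forever
    print('join() returned; the queue was cleaned up')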

Otherwise I would need to understand code like the following. I think messing with it myself, analogous to calling queue.clear() on a single-process queue, would have serious consequences in terms of race conditions when clearing the buffer/pipe by hand.

class Queue(object):
    def __init__(self, maxsize=0, *, ctx):
        …
        self._reader, self._writer = connection.Pipe(duplex=False)
        …

    def put(self, obj, block=True, timeout=None):
        …
        self._buffer.append(obj)  # in case of close() the background thread
        # will quit once it has flushed all buffered data to the pipe
        …

    def get(self, block=True, timeout=None):
        …
        res = self._recv_bytes()
        …
        return _ForkingPickler.loads(res)
…

class JoinableQueue(Queue):
    def __init__(self, maxsize=0, *, ctx):
        …
        self._unfinished_tasks = ctx.Semaphore(0)
        …

    def task_done(self):
        …
        if not self._unfinished_tasks._semlock._is_zero():
            …

in which _is_zero() is defined externally (SemLock itself comes from the C extension module _multiprocessing; synchronize.py only wraps it), as mentioned here: Why doesn't Python's _multiprocessing.SemLock have 'name'?
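
As far as I can tell, the bookkeeping behind join() is simply this: put() releases _unfinished_tasks, task_done() acquires it, and join() waits until the underlying SemLock reaches zero. A quick way to watch that counter (it relies on the private _unfinished_tasks attribute, so it is for poking around only):

import multiprocessing as mp

ctx = mp.get_context('spawn')
q = ctx.JoinableQueue()
q.put('a')
q.put('b')
# get_value() reads the SemLock's counter; note that it raises
# NotImplementedError on macOS, where sem_getvalue() is unavailable.
print(q._unfinished_tasks.get_value())  # 2: two put()s, no task_done() yet
print(q.get())                          # 'a'
q.task_done()
print(q._unfinished_tasks.get_value())  # 1: one task still unfinished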
