1

I am working on a larger project, where I have 2 Threads (same process) and one separate process. One of the threads is the gui, the other thread is a sentinel thread, observing the subprocess, and the subprocess is doing some heavy lifting with neural networks. The architecture looks somewhat like this:

Comunication Architecture

I need to be able to cancel the process of the neural network and respectively end the sentinel thread. I have created a small example which shows the architecture generally and what I approach to do.

from multiprocessing import Process, Queue
from threading import Thread
from time import sleep
 
 
class Worker(Process):
    # The worker resembles the neural network. It does some calculations and shares
    # the information via the queue.
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
 
    def run(self):
        i = 0
        while True:
            self.queue.put(i)
            i += 1
 
    def stop(self):
        # I used the stop function for trying out some things, like using a joinable 
        # queue and block execution as long as the queue is not empty, which is not 
        # working
        self.queue.put(None)
        self.terminate()
 
 
class Listener(Thread):
    # This class resembles the sentinel thread. It checks in an infinite loop for
    # messages. In the real application I send signals via the signals and slots
    # design pattern to the gui and display the sent information.
 
    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)
 
    def run(self):
        self.worker.start()
        while True:
            data = self.queue.get()
            if data is not None:
                print(data)
            else:
                break
        print("broken")
 
    def stop(self):
        self.worker.stop()
 
 
class System:
    # This class resembles the gui
 
    def __init__(self):
        self.listener = Listener()
 
    def start(self):
        self.listener.start()
 
    def stop(self):
        self.listener.stop()
 
 
if __name__ == "__main__":
    system = System()
    system.start()
    sleep(0.1)
    system.stop()

What is the problem?

As long as a process reads or writes to the queue, and/or the queue is not emptied properly, one or both of the processes become zombie processes, which basically is a deadlock in some sense. Therefore I need to find a way to properly handle the queue when terminating the process, thus the processes terminate without errors.

What I have tried so far:

  1. Using a Joinable Queue and join() for each task_done()

  2. Rewriting the SIGTERM signalhandler to wait the queue to be emptied

  3. Using a Joinable Queue and only join() within the SIGTERM signalhandler

The results:

  1. The speed of the processing collapsed greatly, but termination worked properly

  2. and 3. termination does not work the way I implemented it Sometimes it worked, sometimes it did not. So no reliable output and knowledge from this method

An attempt for (3) is the following:

class Worker(Process):
 
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
        self.abort = False
        self.lock = Lock()
        signal(SIGTERM, self.stop)
 
    def run(self):
        i = 0
        while True:
            self.lock.acquire()
            if self.abort:
                break
            else:
                self.queue.put(i)
                i += 1
            self.lock.release()
        exit(0)
 
    def stop(self, sig, frame):
        self.abort = True
        self.queue.put(None)
        self.queue.join()
        exit(0)
Darkonaut
  • 20,186
  • 7
  • 54
  • 65
  • How about adding a heartbeat mechanism to the system? Make the processes communicate they're up and running every N seconds. Add logic to stop running if a heartbeat hasn't been received since T seconds to both sides. – Cihan Oct 16 '20 at 18:17
  • afaik is the biggest issue the queue. I need the worker process to stop putting messages into the queue and have the sentinel process to clean the queue and get all messages. I can't see yet, how the heartbeat could help with this issue. – Thomas Christopher Davies Oct 16 '20 at 18:19
  • Why doesn't it help again? (1) The worker would stop putting messages to queue if it hasn't received heartbeat from sentinel. (2) Sentinel would clean the queue and get all messages if it hasn't received heartbeat from worker. – Cihan Oct 16 '20 at 18:23
  • What would you suggest on implementing it, if the worker class was not using a main loop for calculations, and instead does long sequential operations? – Thomas Christopher Davies Oct 16 '20 at 18:28

1 Answers1

2

There are multiple approaches possible, but if you aim for a compromise between performance and robustness, I'd suggest you use the signal-handler only to set a .running-flag on the worker and let it be checked with while self.running within worker.run(). After the loop breaks, you send the sentinel-value from the worker. This ensures the sentinel-value is always the last value in the queue and all values are read by the listener. Together this layout allows for a graceful shutdown of the worker, while still avoiding more expensive synchronization to check for an exit-condition.

from multiprocessing import Process, Queue
from functools import partial
from threading import Thread
from time import sleep
import signal


SENTINEL = 'SENTINEL'


def sigterm_handler(signum, frame, worker):
    worker.shutdown()


def register_sigterm(worker):
    global sigterm_handler
    sigterm_handler = partial(sigterm_handler, worker=worker)
    signal.signal(signal.SIGTERM, sigterm_handler)


class Worker(Process):

    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
        self.running = False

    def run(self):
        register_sigterm(self)
        self.running = True
        i = 0
        while self.running:
            self.queue.put(i)
            i += 1
        self.queue.put(SENTINEL)

    def stop(self):  # called by parent
        self.terminate()

    def shutdown(self):  # called by child from signal-handler
        self.running = False


class Listener(Thread):

    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)

    def run(self):
        self.worker.start()
        for data in iter(self.queue.get, SENTINEL):
            print(data)

    def stop(self):
        self.worker.stop()
        self.worker.join()


class System:

    def __init__(self):
        self.listener = Listener()

    def start(self):
        self.listener.start()

    def stop(self):
        self.listener.stop()


if __name__ == "__main__":

    system = System()
    system.start()
    sleep(0.1)
    system.stop()

Consider the following experimental.

The idea is to monkey-patch a queue-instance in the child that way, that after receipt of SIGTERM, the next time queue.put() is called, the passed value and a specified sentinel-value is send, queue.close() and sys.exit() are called. This allows for a clean shutdown while avoiding repeated flag-checking.

multiprocessing.Queue() is actually just a method on multiprocessing.context.BaseContext, returning a pre-configured instance of multiprocessing.queues.Queue. To not interfere with it, I went with composition over inheritance. Testing so far implies it works just fine.

stqueue.py

import sys
import time
import signal
from functools import partial
from multiprocessing import current_process as curr_p


def _shutdown(self):
    self._xput = self.put
    self.put = self.final_put


def _final_put(self, obj):
    self._xput(obj)
    self._xput(self._xsentinel)
    self.close()
    sys.exit(0)


def _sigterm_handler(signum, frame, queue):
    print(f"[{time.ctime()}, {curr_p().name}] --- handling signal")
    queue.shutdown()


def register_sigterm_queue(queue, sentinel):
    """Monkey-patch queue-instance to shutdown process
    after next call to `queue.put()` upon receipt of SIGTERM.
    """
    queue._xsentinel = sentinel
    queue.shutdown = _shutdown.__get__(queue)
    queue.final_put = _final_put.__get__(queue)
    global _sigterm_handler
    _sigterm_handler = partial(_sigterm_handler, queue=queue)
    signal.signal(signal.SIGTERM, _sigterm_handler)

main.py

import time
from threading import Thread
import multiprocessing as mp
from multiprocessing import Process, Queue, current_process as curr_p

import numpy as np

from stqueue import register_sigterm_queue


SENTINEL = 'SENTINEL'


class Worker(Process):

    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        register_sigterm_queue(self.queue, SENTINEL)  # <<<
        while True:
            print(f"[{time.ctime()}, {curr_p().name}] --- starting numpy")
            r = np.sum(
                np.unique(np.random.randint(0, 2500, 100_000_000))
            )
            print(f"[{time.ctime()}, {curr_p().name}] --- ending numpy")
            self.queue.put(r)

    def stop(self):  # called by parent
        self.terminate()

...


if __name__ == "__main__":

    import logging
    mp.log_to_stderr(logging.DEBUG)

    system = System()
    system.start()
    time.sleep(10)
    print(f"[{time.ctime()}, {curr_p().name}] --- sending signal")
    system.stop()
    print(f"[{time.ctime()}, {curr_p().name}] --- signal send")

Example Output:

[DEBUG/MainProcess] created semlock with handle 140000699432960
[DEBUG/MainProcess] created semlock with handle 140000699428864
[DEBUG/MainProcess] created semlock with handle 140000664752128
[DEBUG/MainProcess] Queue._after_fork()
[Sat Oct 24 21:59:58 2020, Worker-1] --- starting numpy
[DEBUG/Worker-1] recreated blocker with handle 140000699432960
[DEBUG/Worker-1] recreated blocker with handle 140000699428864
[DEBUG/Worker-1] recreated blocker with handle 140000664752128
[DEBUG/Worker-1] Queue._after_fork()
[INFO/Worker-1] child process calling self.run()
[DEBUG/Worker-1] Queue._start_thread()
[DEBUG/Worker-1] doing self._thread.start()
[DEBUG/Worker-1] starting thread to feed data to pipe
[DEBUG/Worker-1] ... done self._thread.start()
[Sat Oct 24 22:00:04 2020, Worker-1] --- ending numpy
[Sat Oct 24 22:00:04 2020, Worker-1] --- starting numpy
3123750
[Sat Oct 24 22:00:08 2020, MainProcess] --- sending signal
[Sat Oct 24 22:00:10 2020, Worker-1] --- handling signal
[DEBUG/Worker-1] telling queue thread to quit
[INFO/Worker-1] process shutting down
[DEBUG/Worker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Worker-1] running the remaining "atexit" finalizers
[DEBUG/Worker-1] joining queue thread
[DEBUG/Worker-1] feeder thread got sentinel -- exiting
[DEBUG/Worker-1] ... queue thread joined
[INFO/Worker-1] process exiting with exitcode 0
[Sat Oct 24 22:00:10 2020, Worker-1] --- ending numpy
3123750
[Sat Oct 24 22:00:10 2020, MainProcess] --- signal send
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Process finished with exit code 0
Darkonaut
  • 20,186
  • 7
  • 54
  • 65
  • Sorry for answering late. It did not quite answer what my problem is. You must consider that there is not such a loop to break. This is just an example for showing some long sequential calculations. Nevertheless I'll upvote you, since you partly answered correctly and I am grateful for your effort <3 – Thomas Christopher Davies Oct 22 '20 at 16:09
  • 1
    @ThomasChristopherDavies If there's no loop, does the worker send results back only once? The wlock on the queue is not recursive, so if your signal-handler tries to write to the queue while your regular code already holds it under the hood, it deadlocks. – Darkonaut Oct 22 '20 at 19:32
  • No the worker is sending thousands of messages to the observer thread. The neural net and image preprocessing is sending constantly messages. It only deadlocks if you terminate either of the processes while any other process is trying to read from or write to the queue. – Thomas Christopher Davies Oct 24 '20 at 09:31
  • 1
    @ThomasChristopherDavies So the neural network is also multi-threaded? Yes, you corrupt queues if you just terminate processes involved, that's not surprising. But it's still not clear to me why you can't let this process check a flag instead after or before it puts something on the queue again. – Darkonaut Oct 24 '20 at 09:42
  • I need to test it out to solve it that way. Since there are really a lot of queue put and get actions, I was aiming for a different solution – Thomas Christopher Davies Oct 24 '20 at 13:24
  • Sorry for checking late in, I was really booked out so I had to delay working on this project. Is there any chance to get in touch with you, speaking about your second solution. It really seems to be what I am looking for, but I have a hard time wrapping my head around what you just did there. – Thomas Christopher Davies Nov 04 '20 at 15:29
  • 1
    @ThomasChristopherDavies If anything is unclear you can ask just here so future readers can benefit, too. – Darkonaut Nov 04 '20 at 16:33
  • 1
    @ThomasChristopherDavies The solution patches the queue so that when SIGTERM is received, all what happens in the handler itself, is that the original `.put()` method gets swapped for the `._final_put()` method. Then the next time your regular code outside the handler calls `.put()`, it really calls `.final_put()`, which then shuts down cleanly by enqueueing the last item+sentinel with the original `.put()` (saved as `.xput()`) and raising `SystemExit`. This avoids interfering with synchronization primitives from within the handler, which can cause the deadlocks. – Darkonaut Nov 04 '20 at 16:33
  • Well there is a lot of code-practices I have never used before. For instance I have no real idea what the `_shutdown.__get__(queue)` really does. Also why the partial function is being used how its being used is not totally clear to me. My python understanding does not seem to be on par with the depth you are working with. I am trying to do some research, but I think your answer has a deep level on python knowledge. Nevertheless, I thank you for your effort! – Thomas Christopher Davies Nov 04 '20 at 17:17
  • 1
    @ThomasChristopherDavies `_shutdown.__get__(queue)` is making the function `_shutdown()` a bound method of the instance of queue, [see](https://stackoverflow.com/q/972/9059420). Had to look that up too again, that's not what you need to do often. [`partial`](https://docs.python.org/3.8/library/functools.html#functools.partial) is just creating a wrapper which fills the `queue`-parameter of the handler with the existing queue instance we are patching, so we have a reference to it from within the handler. – Darkonaut Nov 04 '20 at 17:30