I am working on a larger project where I have two threads (in the same process) and one separate process. One of the threads is the GUI, the other is a sentinel thread observing the subprocess, and the subprocess does the heavy lifting with neural networks. The sentinel and the worker process communicate through a queue.
I need to be able to cancel the neural network process and, along with it, end the sentinel thread. I have created a small example which shows the architecture in general and what I am trying to do.
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep


class Worker(Process):
    # The worker resembles the neural network. It does some calculations and
    # shares the information via the queue.
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        i = 0
        while True:
            self.queue.put(i)
            i += 1

    def stop(self):
        # I used the stop function for trying out some things, like using a
        # joinable queue and blocking execution as long as the queue is not
        # empty, which did not work.
        self.queue.put(None)
        self.terminate()


class Listener(Thread):
    # This class resembles the sentinel thread. It checks for messages in an
    # infinite loop. In the real application I send signals via the signals
    # and slots design pattern to the GUI and display the sent information.
    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)

    def run(self):
        self.worker.start()
        while True:
            data = self.queue.get()
            if data is not None:
                print(data)
            else:
                break
        print("broken")

    def stop(self):
        self.worker.stop()


class System:
    # This class resembles the GUI.
    def __init__(self):
        self.listener = Listener()

    def start(self):
        self.listener.start()

    def stop(self):
        self.listener.stop()


if __name__ == "__main__":
    system = System()
    system.start()
    sleep(0.1)
    system.stop()
What is the problem?
As long as a process reads from or writes to the queue, and/or the queue is not emptied properly, one or both of the processes become zombie processes, which is effectively a deadlock. I therefore need a way to handle the queue properly when terminating the process, so that both processes terminate without errors.
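I think this is the "Joining processes that use queues" pitfall described in the multiprocessing programming guidelines: a process that has put items on a queue will not exit until its feeder thread has flushed all buffered data into the pipe. A minimal, self-contained sketch (independent of the classes above) that shows the hang:

from multiprocessing import Process, Queue


def produce(queue):
    # Put more data on the queue than fits into the underlying pipe buffer.
    for i in range(100_000):
        queue.put(i)


if __name__ == "__main__":
    queue = Queue()
    producer = Process(target=produce, args=(queue,))
    producer.start()

    # producer.join() here would hang: the child cannot exit before its feeder
    # thread has flushed every buffered item into the pipe, and the pipe is
    # full because nobody reads from it.

    # Draining the queue first lets the child flush its buffer and exit.
    for _ in range(100_000):
        queue.get()
    producer.join()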
What I have tried so far:

1. Using a JoinableQueue, with a join() for each task_done() (sketched below)
2. Rewriting the SIGTERM signal handler to wait for the queue to be emptied
3. Using a JoinableQueue and only calling join() within the SIGTERM signal handler

The results:

1. The speed of the processing collapsed greatly, but termination worked properly.
2. and 3. Termination does not work the way I implemented it: sometimes it worked, sometimes it did not, so this method gave me no reliable result or insight.
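For reference, attempt (1) looked roughly like this; the Worker blocks on join() after every put() until the Listener has acknowledged the item with task_done():

from multiprocessing import Process, JoinableQueue


class Worker(Process):
    def __init__(self, queue: JoinableQueue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        i = 0
        while True:
            self.queue.put(i)
            self.queue.join()  # wait until the listener has called task_done()
            i += 1


# In Listener.run() the matching change is to acknowledge every item:
#     data = self.queue.get()
#     ...
#     self.queue.task_done()

This also explains why the throughput collapsed: every put() has to wait for a full round trip to the listener thread.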
An attempt for (3) is the following:
from multiprocessing import Process, JoinableQueue, Lock
from signal import signal, SIGTERM


class Worker(Process):
    def __init__(self, queue: JoinableQueue):
        # The queue has to be a JoinableQueue here so that queue.join() in
        # stop() is available.
        Process.__init__(self)
        self.queue = queue
        self.abort = False
        self.lock = Lock()
        signal(SIGTERM, self.stop)

    def run(self):
        i = 0
        while True:
            self.lock.acquire()
            if self.abort:
                break
            else:
                self.queue.put(i)
                i += 1
            self.lock.release()
        exit(0)

    def stop(self, sig, frame):
        self.abort = True
        self.queue.put(None)
        self.queue.join()
        exit(0)