
I've read that it's considered bad practice to kill a thread. (Is there any way to kill a Thread?) There are a LOT of answers there, and I'm wondering if even using a thread in the first place is the right answer for me.

I have a bunch of multiprocessing.Process objects. Essentially, each Process is doing this:

while some_condition:
    result = self.function_to_execute(i, **kwargs_i)
    # outQ is a multiprocessing.Queue shared between all Processes
    self.outQ.put(Result(i, result))

The problem is that I need a way to interrupt function_to_execute, but I can't modify the function itself. Initially, I was thinking simply process.terminate(), but that appears to be unsafe with a multiprocessing.Queue.

Most likely (but not guaranteed), if I need to kill a thread, the 'main' program is going to be done soon. Is my safest option to do something like this? Or is there a more elegant solution than using a thread in the first place?

def thread_task():
    while some_condition:
        result = self.function_to_execute(i, **kwargs_i)
        if (this_thread_is_not_daemonized):
            self.outQ.put(Result(i, result))

t = Thread(target=thread_task)
t.start()

if end_early:
    t.daemon = True  # note: raises RuntimeError if set after start()

I believe the end result of this is that the Process that spawned the thread will continue to waste CPU cycles on a task whose output I no longer care about, but if the main program finishes, it will clean up all my memory nicely.

The main problem with daemonizing a thread is that the main program could potentially continue for 30+ minutes even when I don't care about the output of that thread anymore.
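A side note on the sketch above: in CPython the daemon flag can only be set before a thread is started, so flipping `t.daemon = True` after `t.start()` raises a `RuntimeError` rather than quietly daemonizing the thread. A minimal check (the `stop` event here is just to end the worker cleanly):

```python
import threading

stop = threading.Event()

# a worker that simply blocks until told to stop
t = threading.Thread(target=stop.wait)
t.start()

try:
    t.daemon = True  # CPython forbids this once the thread is running
except RuntimeError as e:
    print('RuntimeError:', e)

stop.set()
t.join()
```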

user3534080
  • Note that a 'thread' and a 'process' are not the same thing - I expect that if you spawn processes to run your work, killing the process would actually be exactly what you need. Any resources associated with it would be cleaned up as the entire process is terminated at the OS level. Look into ways of running your function in its own process instead of just a thread of your main process. – Grismar Aug 13 '19 at 01:47
  • Are you starting threads in separate processes? – wwii Aug 13 '19 at 02:50
  • From the [threading docs](https://docs.python.org/3/library/threading.html) - `If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.` – wwii Aug 13 '19 at 02:53
  • @Grismar I've found that if you hit CTRL+C and kill a Python app w/ a sub-Process that has allocated on the GPU, it doesn't always free up GPU resources (I believe the cause is the main process ending and orphaned Processes 'continuing' indefinitely). Anyway, I'm using multiprocessing.Queue objects, and they explicitly warn against terminating: `If a process is killed using Process.terminate() or os.kill() while it is trying to use a Queue, then the data in the queue is likely to become corrupted. This may cause any other process to get an exception when it tries to use the queue later on.` – user3534080 Aug 13 '19 at 18:02
  • To answer the other questions: the main process is spawning a number of multiprocessing.Process objects. I was thinking if I had those Process each spawn a thread, I could potentially kill the thread and still have the Process exit safely, or at least clean up after them. @wwii , wouldn't that require me being able to modify `function_to_execute` ? – user3534080 Aug 13 '19 at 18:02
  • If data corruption is the fear, you could write the process so that it checks for a value asking it to kill itself and exit gracefully, instead of killing it. Like explained here https://stackoverflow.com/questions/18499497/how-to-process-sigterm-signal-gracefully – Grismar Aug 15 '19 at 01:51

1 Answer


From the threading docs:

> If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.

Here is a contrived example of what I was thinking - no idea if it mimics what you are doing or can be adapted for your situation. Another caveat: I've never written any real concurrent code.

Create an Event object in the main process and pass it all the way to the thread. Design the thread so that it loops until the Event object is set. Once you no longer need the processing, set the Event object in the main process. There is no need to modify the function being run in the thread.

from multiprocessing import Process, Queue, Event
from threading import Thread
import time, random, os

def f_to_run():
    time.sleep(.2)
    return random.randint(1, 10)

class T(Thread):
    def __init__(self, evt, q, func, parent):
        self.evt = evt
        self.q = q
        self.func = func
        self.parent = parent
        super().__init__()
    def run(self):
        # loop until the main process sets the event
        while not self.evt.is_set():
            n = self.func()
            self.q.put(f'PID {self.parent}-{self.name}: {n}')

def f(T, evt, q, func):
    pid = os.getpid()
    t = T(evt, q, func, pid)
    t.start()
    t.join()
    q.put(f'PID {pid}-{t.name} is alive - {t.is_alive()}')
    q.put(f'PID {pid}:DONE')
    return 'foo done'

if __name__ == '__main__':
    results = []
    q = Queue()
    evt = Event()
    # two processes, each with one thread
    p = Process(target=f, args=(T, evt, q, f_to_run))
    p1 = Process(target=f, args=(T, evt, q, f_to_run))
    p.start()
    p1.start()

    while len(results) < 40:
        results.append(q.get())
        print('.', end='')
    print('')
    evt.set()    # signal the threads to stop
    p.join()
    p1.join()
    while not q.empty():
        results.append(q.get_nowait())
    for thing in results:
        print(thing)

I initially tried to use a threading.Event, but the multiprocessing module complained that it couldn't be pickled. I was actually surprised that the multiprocessing.Queue and multiprocessing.Event worked AND could be accessed by the thread.
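A quick way to see the difference (assuming CPython; the exact error text may vary by version): a threading.Event wraps a lock, and locks cannot be pickled, which is why it can't be shipped to a child process.

```python
import pickle
import threading
import multiprocessing

# a threading.Event contains a Condition wrapping a lock,
# and _thread.lock objects cannot be pickled
try:
    pickle.dumps(threading.Event())
except TypeError as e:
    print('threading.Event:', e)

# a multiprocessing.Event is built for cross-process use, but even it
# is shared by inheritance via Process(..., args=(evt, ...)) rather
# than by pickling it directly yourself
evt = multiprocessing.Event()
```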


Not sure why I started with a Thread subclass - I think I thought it would be easier to control/specify what happens in its run method. But it can be done with a function also.

from multiprocessing import Process, Queue, Event
from threading import Thread
import time, random

def f_to_run():
    time.sleep(.2)
    return random.randint(1, 10)

def t1(evt, q, func):
    # loop until the main process sets the event
    while not evt.is_set():
        n = func()
        q.put(n)

def g(t1, evt, q, func):
    t = Thread(target=t1, args=(evt, q, func))
    t.start()
    t.join()
    q.put(f'{t.name} is alive - {t.is_alive()}')
    return 'foo'

if __name__ == '__main__':
    q = Queue()
    evt = Event()
    p = Process(target=g, args=(t1, evt, q, f_to_run))
    p.start()
    time.sleep(5)
    evt.set()    # signal the thread to stop
    p.join()
wwii