0

I am trying to terminate a worker thread if it runs longer than a certain time. The problem I am facing is that the thread keeps running even after it has supposedly been terminated. I verify the termination by calling isAlive() and I even delete the reference to the thread entirely - but the thread still continues afterwards.

Attached is the code snippet and actual output vs. what I was expecting. I have removed my actual worker code and replaced it with a sleep to easily demonstrate this and set the worker thread for 10 seconds and the time guard for the thread to finish is 5 seconds.

Thanks

from threading import Thread, Timer
import Queue
from time import strftime, gmtime, sleep

class test(object):
    """Demo: process queued items one at a time, each in its own worker thread.

    For every item, start() launches a worker Thread plus a 5-second Timer
    whose callback (terminate_test) tries to stop the worker if it is still
    running when the timer fires.
    """

    # Class-level default; start() rebinds this on the instance for each item.
    worker_thread = None
    # Class-level queue, i.e. shared by every instance of this class.
    q = Queue.Queue()

    def terminate_test(self):
        """Timer callback: attempt to stop the current worker thread."""

        if self.worker_thread.isAlive():
            print 'stopping', self.worker_thread.getName(), strftime("%Y-%m-%d %H:%M:%S", gmtime())
            # NOTE: _Thread__stop() is the name-mangled private Thread.__stop.
            # It only flags the Thread object as stopped and wakes joiners;
            # it does not interrupt the code the thread is executing.
            self.worker_thread._Thread__stop()
            # Returns immediately because the stopped flag was just set above.
            self.worker_thread.join()
            # Reports False for the same reason, even though worker() is
            # still sleeping.
            print "Thread alive?", self.worker_thread.getName(), self.worker_thread.isAlive()
            # Removes only the instance attribute (lookups fall back to the
            # class-level None); the running thread is unaffected.
            del self.worker_thread
            print 'stopped'
    #            self._testq.task_done()

        else:
            print 'worker finished before stopping', strftime("%Y-%m-%d %H:%M:%S", gmtime())    

    def worker(self, item):
        """Thread target: sleep for the item's duration, then mark it done.

        item is a single-entry dict as built in start(): {name: seconds}.
        """
        k = item.keys()[0]    # item name, e.g. 'test a'
        v = item.values()[0]  # number of seconds to sleep
        print 'starting', k, strftime("%Y-%m-%d %H:%M:%S", gmtime())
        print 'sleeping', v, strftime("%Y-%m-%d %H:%M:%S", gmtime())
        sleep(v)
        print "waking up", k,v, strftime("%Y-%m-%d %H:%M:%S", gmtime())
        self.q.task_done()
        # Work finished: cancel the guard timer if it has not fired yet.
        self.test_terminate_thread.cancel()

    def start(self):
        """Queue four 10-second items and run them sequentially under a 5s guard."""
        source = [{'test a':10},{'test b':10},{'test c':10},{'test d':10}]
        for item in source:
            self.q.put(item)

        while not self.q.empty():
            item = self.q.get()
            self.worker_thread = Thread(target=self.worker, args=(item,))
            self.worker_thread.setName('test_thread %s' % item.keys()[0])

            # Guard timer: calls terminate_test after 5 seconds unless cancelled.
            self.test_terminate_thread = Timer(5, self.terminate_test)
            self.test_terminate_thread.setName("Test_terminate_thread")

            self.worker_thread.start()
            self.test_terminate_thread.start()

            # Wait for both before moving on to the next item.
            self.worker_thread.join()    
            self.test_terminate_thread.join()

        # Blocks until task_done() has been called once per queued item.
        print "joining Q", strftime("%Y-%m-%d %H:%M:%S", gmtime())        
        self.q.join()       

# Script entry point: build the demo object and run all queued items.
if __name__ == "__main__":
    test().start()

Actual Output: (I am not expecting the waking up lines)

starting test a 2015-01-16 15:07:07
sleeping 10 2015-01-16 15:07:07
stopping test_thread test a 2015-01-16 15:07:12
Thread alive? test_thread test a False
stopped
starting test b 2015-01-16 15:07:12
sleeping 10 2015-01-16 15:07:12
waking up test a 10 2015-01-16 15:07:17
waking up test b 10 2015-01-16 15:07:22
starting test c 2015-01-16 15:07:22
sleeping 10 2015-01-16 15:07:22
stopping test_thread test c 2015-01-16 15:07:27
Thread alive? test_thread test c False
stopped
starting test d 2015-01-16 15:07:27
sleeping 10 2015-01-16 15:07:27
stopping test_thread test d 2015-01-16 15:07:32
Thread alive? test_thread test d False
stopped
joining Q 2015-01-16 15:07:32
waking up test c 10 2015-01-16 15:07:32
waking up test d 10 2015-01-16 15:07:37

What I was expecting

starting test a 2015-01-16 15:07:07
sleeping 10 2015-01-16 15:07:07
stopping test_thread test a 2015-01-16 15:07:12
Thread alive? test_thread test a False
stopped
starting test b 2015-01-16 15:07:12
sleeping 10 2015-01-16 15:07:12
starting test c 2015-01-16 15:07:22
sleeping 10 2015-01-16 15:07:22
stopping test_thread test c 2015-01-16 15:07:27
Thread alive? test_thread test c False
stopped
starting test d 2015-01-16 15:07:27
sleeping 10 2015-01-16 15:07:27
stopping test_thread test d 2015-01-16 15:07:32
Thread alive? test_thread test d False
stopped
joining Q 2015-01-16 15:07:32
  • 1
    That's not the proper way to stop a thread. This might help: http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python – user3467349 Jan 16 '15 at 15:24

1 Answer

0

The _Thread__stop() function you're calling doesn't actually terminate a thread. It's an internal method that gets called by Python when a thread's run method ends - meaning it only gets called after a thread actually has stopped. All it actually does is set a __stopped attribute to True and notify anything waiting on the thread to stop that they can continue:

def __stop(self):
    """Mark this thread object as stopped and wake everything waiting on it.

    Excerpt of a private method; inside the class body ``self.__block`` and
    ``self.__stopped`` are name-mangled (cf. the '_Thread__block' check below).
    Nothing here interrupts or terminates running code.
    """
    # DummyThreads delete self.__block, but they have no waiters to
    # notify anyway (join() is forbidden on them).
    if not hasattr(self, '_Thread__block'):
        return
    self.__block.acquire()
    # Only bookkeeping: set the flag, then release any waiters on the block.
    self.__stopped = True
    self.__block.notify_all()
    self.__block.release()

Additionally, the is_alive()/isAlive() method just checks the __stopped flag, it doesn't actually verify that the run method has finished, since it assumes __stopped will only be set if that's true.

In general, the right way to abort a thread is to have it work co-operatively with your main thread in some way - like having the main thread set a threading.Event to indicate it's time to abort, which the worker occasionally checks. Exactly which approach will work best for you depends on your specific use case, however.

Here's how you could change your example to utilize Event:

def terminate_test(self, event):
    """Timer callback: ask the worker thread to stop, then wait for it.

    Args:
        self: object whose ``worker_thread`` attribute is the Thread to stop.
        event: ``threading.Event`` shared with the worker; setting it is the
            co-operative stop request.

    Note:
        ``join()`` returns only once the worker has actually returned, so a
        worker that never checks ``event`` keeps this callback blocked.
    """
    worker_thread = self.worker_thread
    time_format = "%Y-%m-%d %H:%M:%S"

    # is_alive() and .name are the supported spellings (Python 2.6+); the
    # camelCase isAlive() alias no longer exists on Python 3.9+.
    # Each line is formatted into one string so it is written in a single
    # print instead of piece by piece alongside other threads' output.
    if worker_thread.is_alive():
        print('stopping %s %s' % (worker_thread.name,
                                  strftime(time_format, gmtime())))
        event.set()
        worker_thread.join()
        print("Thread alive? %s %s" % (worker_thread.name,
                                       worker_thread.is_alive()))
        # Drops only this object's reference to the (now finished) thread.
        del self.worker_thread
        print('stopped')
    else:
        print('worker finished before stopping %s'
              % strftime(time_format, gmtime()))

def worker(self, item, event):
    """Thread target: wait up to the item's duration, stopping early on request.

    Args:
        self: object providing ``q`` (the work queue the item came from) and
            ``test_terminate_thread`` (the guard Timer for this item).
        item: single-entry dict ``{name: seconds}``.
        event: ``threading.Event``; when it is set before ``seconds`` elapse
            the worker returns immediately instead of finishing its "work".
    """
    k = list(item.keys())[0]    # item name
    v = list(item.values())[0]  # seconds to wait
    time_format = "%Y-%m-%d %H:%M:%S"
    print('starting %s %s' % (k, strftime(time_format, gmtime())))
    print('sleeping %s %s' % (v, strftime(time_format, gmtime())))
    try:
        # Event.wait() returns True as soon as the event is set, or False
        # once the timeout expires; it replaces an uninterruptible sleep.
        if event.wait(v):
            return
        print("waking up %s %s %s" % (k, v, strftime(time_format, gmtime())))
        # Finished in time: the guard timer is no longer needed.
        self.test_terminate_thread.cancel()
    finally:
        # Always account for the item taken from the queue, including when
        # aborted; otherwise q.join() would wait forever on this item.
        self.q.task_done()

def start(self):
    """Queue four 10-second items and run them one at a time under a 5s guard.

    For each item a fresh ``Event`` is shared between the worker thread and
    the guard ``Timer``: if the timer fires first, ``terminate_test`` is
    called with the event so it can signal the worker to stop.

    The final ``q.join()`` returns once ``task_done()`` has been called for
    every item that was put on the queue.
    """
    # Local import: Event is required here but is not among the names the
    # example imports from threading at module level.
    from threading import Event

    source = [{'test a':10},{'test b':10},{'test c':10},{'test d':10}]
    for item in source:
        self.q.put(item)

    while not self.q.empty():
        item = self.q.get()
        e = Event()
        self.worker_thread = Thread(target=self.worker, args=(item, e))
        self.worker_thread.name = 'test_thread %s' % list(item.keys())[0]

        self.test_terminate_thread = Timer(5, self.terminate_test, args=(e,))
        self.test_terminate_thread.name = "Test_terminate_thread"

        # Start the worker and its guard, then wait for both so that items
        # are processed strictly one after another.
        self.worker_thread.start()
        self.test_terminate_thread.start()

        self.worker_thread.join()
        self.test_terminate_thread.join()

    print("joining Q %s" % strftime("%Y-%m-%d %H:%M:%S", gmtime()))
    self.q.join()

Output:

starting test a 2015-01-16 15:42:57
sleeping 10 2015-01-16 15:42:57
stopping test_thread test a 2015-01-16 15:43:02
Thread alive? test_thread test a False
stopped
starting test b 2015-01-16 15:43:02
sleeping 10 2015-01-16 15:43:02
stopping test_thread test b 2015-01-16 15:43:07
Thread alive? test_thread test b False
stopped
starting test c 2015-01-16 15:43:07
sleeping 10 2015-01-16 15:43:07
stopping test_thread test c 2015-01-16 15:43:12
Thread alive? test_thread test c False
stopped
starting test d 2015-01-16 15:43:12
sleeping 10 2015-01-16 15:43:12
stopping test_thread test d 2015-01-16 15:43:17
Thread alive? test_thread test d False
stopped
joining Q 2015-01-16 15:43:17
dano
  • 91,354
  • 19
  • 222
  • 219