I am playing with a threading test code derived from here (or here) but I can't figure what do I miss in my test code. I am running the thread in a separate file, if this makes any difference (?).
The thread starts ok, but when trying to stop it gives "'Thread' object has no attribute 'stopit'" error.
Main test code is this:
#!/usr/bin/env python
import os
import sys
import threading
import time
import test_thread_external as TestThreadExternal
# ---
class test(object):
def __init__(self):
pass
def main(self):
print ("starting thread")
test_th = threading.Thread(target = TestThreadExternal.stoppable, args = ())
test_th.start()
time.sleep(3)
print ("stopping thread")
self.error_flag = False
try:
test_th.stopit()
except Exception as e:
self.error_flag = True
self.err = str(e)
if not self.error_flag:
time.sleep(3)
print ("exit supposedly with thread stopped") # can be checked with is_alive()
else:
print ("exit with error %s" % (self.err))
# ---
if __name__ == '__main__':
try:
app = test()
app.main()
except KeyboardInterrupt:
print ("exit via Ctrl+C")
finally:
try:
sys.exit(0)
except SystemExit:
os._exit(0)
and the thread itself is this one, stored in file 'test_thread_external.py':
#!/usr/bin/env python
import threading
import time
# ---
class stoppable(threading.Thread):
def __init__(self):
super(stoppable, self).__init__()
self._stopper = threading.Event()
while True:
print ("thread running")
if self.stopped():
break
time.sleep(1)
print ("thread on the way to its exit")
def stopit(self):
self._stopper.set()
def stopped(self):
return self._stopper.is_set()
What do I do wrong ?
Small later edit: changed _stop with _stopper in order to avoid "'Event' object is not callable" error in case using join()
(not shown here) -- as explained in this answer to a former question.