I've been trying to write an interactive wrapper (for use in ipython) for a library that controls some hardware. Some calls are heavy on the IO so it makes sense to carry out the tasks in parallel. Using a ThreadPool (almost) works nicely:
from multiprocessing.pool import ThreadPool

class hardware(object):
    def __init__(self, IPaddress):
        connect_to_hardware(IPaddress)

    def some_long_task_to_hardware(self, wtime):
        wait(wtime)
        result = 'blah'
        return result

pool = ThreadPool(processes=4)
threads = []
h = [hardware(IP1), hardware(IP2), hardware(IP3), hardware(IP4)]
for tt in range(4):
    # args must be a tuple, hence the trailing comma
    task = pool.apply_async(h[tt].some_long_task_to_hardware, (1000,))
    threads.append(task)
alive = [True] * 4
try:
    while any(alive):
        for tt in range(4):
            alive[tt] = not threads[tt].ready()
        do_other_stuff_for_a_bit()
except:
    # some command I cannot find that will stop the threads...
    raise
for tt in range(4):
    print(threads[tt].get())
The problem comes if the user wants to stop the process or there is an IO error in do_other_stuff_for_a_bit(). Pressing Ctrl+C stops the main process but the worker threads carry on running until their current task is complete.
Is there some way to stop these threads without having to rewrite the library or have the user exit python? pool.terminate() and pool.join(), which I have seen used in other examples, do not seem to do the job.
The actual routine (instead of the simplified version above) uses logging and although all the worker threads are shut down at some point, I can see the processes that they started running carry on until complete (and being hardware I can see their effect by looking across the room).
This is in Python 2.7.
UPDATE:
The solution seems to be to switch to using multiprocessing.Process instead of a thread pool. The test code I tried is to run foo_pulse:
import time

class foo(object):
    def foo_pulse(self, nPulse, name):  # just one method of *many*
        print('starting pulse for ' + name)
        result = []
        for ii in range(nPulse):
            print('on for ' + name)
            time.sleep(2)
            print('off for ' + name)
            time.sleep(2)
            result.append(ii)
        return result, name
If you try running this using ThreadPool then Ctrl+C does not stop foo_pulse from running (even though it seems to kill the threads right away, the print statements keep on coming):
from multiprocessing.pool import ThreadPool
import time

def test(nPulse):
    a = foo()
    pool = ThreadPool(processes=4)
    threads = []
    for rn in range(4):
        r = pool.apply_async(a.foo_pulse, (nPulse, 'loop ' + str(rn)))
        threads.append(r)
    alive = [True] * 4
    try:
        while any(alive):  # wait until all threads complete
            for rn in range(4):
                alive[rn] = not threads[rn].ready()
            time.sleep(1)
    except:  # stop threads if user presses ctrl-c
        print('trying to stop threads')
        pool.terminate()
        print('stopped threads')  # this line prints but output from foo_pulse carried on.
        raise
    else:
        for t in threads:
            print(t.get())
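The same behaviour can be reproduced without the hardware or the print statements. The sketch below is only an illustration under a few assumptions: slow_task and the three Event objects are hypothetical names that are not part of the library, and the task blocks on an Event rather than sleeping so that the outcome does not depend on timing. As far as I can tell, pool.terminate() on a ThreadPool shuts down the pool's bookkeeping but has no way to interrupt a function that is already running in a worker thread, so the task still runs to the end.

```python
import threading
from multiprocessing.pool import ThreadPool

started = threading.Event()   # set once the task is running in a worker thread
release = threading.Event()   # stands in for "the long hardware call returns"
finished = threading.Event()  # set when the task reaches its last line

def slow_task():
    # Hypothetical stand-in for some_long_task_to_hardware / foo_pulse
    started.set()
    release.wait()            # blocks, like a long I/O call would
    finished.set()

pool = ThreadPool(processes=1)
pool.apply_async(slow_task)
started.wait(5)               # make sure the task is really in progress

pool.terminate()              # returns, but the worker thread is not interrupted
still_running = not finished.is_set()

release.set()                 # let the "hardware call" complete
completed = finished.wait(timeout=5)

print('task still running after terminate(): %s' % still_running)
print('task ran to completion anyway: %s' % completed)
```

Because the task can only finish after release.set(), and that is called only once terminate() has returned, the second flag shows that the worker carried on after the pool was supposedly stopped.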
However a version using multiprocessing.Process works as expected:
import multiprocessing as mp
import time

def test_pro(nPulse):
    pros = []
    ans = []
    a = foo()
    for rn in range(4):
        q = mp.Queue()
        ans.append(q)
        r = mp.Process(target=wrapper, args=(a, "foo_pulse", q),
                       kwargs={'args': (nPulse, 'loop ' + str(rn))})
        r.start()
        pros.append(r)
    try:
        for p in pros:
            p.join()
        print('all done')
    except:  # stop threads if user stops findRes
        print('trying to stop threads')
        for p in pros:
            p.terminate()
        print('stopped threads')
    else:
        print('output here')
        for q in ans:
            print(q.get())
    print('exit time')
Here I have defined a wrapper for the library foo (so that it did not need to be rewritten). If the return value is not needed then neither is this wrapper:
def wrapper(a, target, q, args=(), kwargs={}):
    '''Used when return value is wanted'''
    q.put(getattr(a, target)(*args, **kwargs))
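To check what the wrapper does on its own, it can be called directly in the main process with an ordinary queue standing in for mp.Queue. This is only a sketch: FakeDevice and its pulse method are hypothetical stand-ins for the real library class, and no processes are started here.

```python
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7

def wrapper(a, target, q, args=(), kwargs={}):
    '''Used when return value is wanted'''
    # Look up the method by name, call it, and hand the result back via q
    q.put(getattr(a, target)(*args, **kwargs))

class FakeDevice(object):
    # Hypothetical stand-in for the real library class (no sleeping, no I/O)
    def pulse(self, n_pulse, name):
        return list(range(n_pulse)), name

q = queue.Queue()
# Same calling convention as the mp.Process(...) line: the method's own
# arguments travel in the 'args' keyword of the wrapper.
wrapper(FakeDevice(), "pulse", q, args=(3, 'loop 0'))
result = q.get()
print(result)
```

The method name is passed as a string and resolved with getattr, so the same wrapper can drive any of the library's methods without changing the library.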
From the documentation I see no reason why a pool would not work (other than a bug).