
I've been trying to write an interactive wrapper (for use in IPython) for a library that controls some hardware. Some calls are heavy on IO, so it makes sense to carry out the tasks in parallel. Using a ThreadPool (almost) works nicely:

from multiprocessing.pool import ThreadPool

class hardware():
    def __init__(self, IPaddress):
        connect_to_hardware(IPaddress)

    def some_long_task_to_hardware(self, wtime):
        wait(wtime)
        result = 'blah'
        return result

pool = ThreadPool(processes=4)
threads = []
h = [hardware(IP1), hardware(IP2), hardware(IP3), hardware(IP4)]
for tt in range(4):
    task = pool.apply_async(h[tt].some_long_task_to_hardware, (1000,))
    threads.append(task)
alive = [True]*4
try:
    while any(alive):
        for tt in range(4): alive[tt] = not threads[tt].ready()
        do_other_stuff_for_a_bit()
except:
    #some command I cannot find that will stop the threads...
    raise
for tt in range(4): print(threads[tt].get())

The problem comes if the user wants to stop the process, or if there is an IO error in do_other_stuff_for_a_bit(). Pressing Ctrl+C stops the main process, but the worker threads carry on running until their current task is complete.
Is there some way to stop these threads without having to rewrite the library or have the user exit Python? pool.terminate() and pool.join(), which I have seen used in other examples, do not seem to do the job.

The actual routine (rather than the simplified version above) uses logging, and although all the worker threads are shut down at some point, I can see that the tasks they started carry on until complete (and since this is hardware, I can see their effect by looking across the room).

This is in Python 2.7.

UPDATE:

The solution seems to be to switch to using multiprocessing.Process instead of a thread pool. The test code I tried runs foo_pulse:

import time

class foo(object):
    def foo_pulse(self, nPulse, name):  # just one method of *many*
        print('starting pulse for ' + name)
        result = []
        for ii in range(nPulse):
            print('on for ' + name)
            time.sleep(2)
            print('off for ' + name)
            time.sleep(2)
            result.append(ii)
        return result, name

If you run this using a ThreadPool, then Ctrl+C does not stop foo_pulse from running (even though it does kill the threads right away, the print statements keep coming):

from multiprocessing.pool import ThreadPool
import time

def test(nPulse):
    a = foo()
    pool = ThreadPool(processes=4)
    threads = []
    for rn in range(4):
        r = pool.apply_async(a.foo_pulse, (nPulse, 'loop ' + str(rn)))
        threads.append(r)
    alive = [True]*4
    try:
        while any(alive):  # wait until all threads complete
            for rn in range(4):
                alive[rn] = not threads[rn].ready()
            time.sleep(1)
    except:  # stop threads if user presses ctrl-c
        print('trying to stop threads')
        pool.terminate()
        print('stopped threads')  # this line prints but output from foo_pulse carries on
        raise
    else:
        for t in threads: print(t.get())

However, a version using multiprocessing.Process works as expected:

import multiprocessing as mp
import time

def test_pro(nPulse):
    pros = []
    ans = []
    a = foo()
    for rn in range(4):
        q = mp.Queue()
        ans.append(q)
        r = mp.Process(target=wrapper, args=(a, "foo_pulse", q),
                       kwargs={'args': (nPulse, 'loop ' + str(rn))})
        r.start()
        pros.append(r)
    try:
        for p in pros: p.join()
        print('all done')
    except:  # stop processes if the user interrupts
        print('trying to stop processes')
        for p in pros: p.terminate()
        print('stopped processes')
    else:
        print('output here')
        for q in ans:
            print(q.get())
    print('exit time')

where I have defined a wrapper for the library foo (so that it does not need to be rewritten). If the return value is not needed, then neither is this wrapper:

def wrapper(a, target, q, args=(), kwargs={}):
    '''Used when the return value is wanted'''
    q.put(getattr(a, target)(*args, **kwargs))
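
For illustration, a minimal standalone use of this wrapper (assuming the foo class above; the method name and arguments here are just for demonstration):

import multiprocessing as mp

q = mp.Queue()
p = mp.Process(target=wrapper, args=(foo(), 'foo_pulse', q),
               kwargs={'args': (1, 'demo')})
p.start()
p.join()
result, name = q.get()  # ([0], 'demo')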

From the documentation I see no reason why a pool would not work (other than a bug).

SRD
    Do you have any reason to use undocumented classes? You'd probably have better luck with `concurrent.futures` module. – UltraInstinct Aug 09 '16 at 18:13
  • No real reason to use the undocumented classes - other than this is what was used in the example code I came across when researching how to do it. – SRD Aug 10 '16 at 01:53
  • @SuperSaiyan: It's documented under a different name; `ThreadPool` there is exposed in a documented fashion under `multiprocessing.dummy.Pool`, where [`multiprocessing.dummy` is a close copy of the `multiprocessing` API backed by threads instead of processes](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.dummy). – ShadowRanger Aug 10 '16 at 01:56
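
For reference, a rough sketch of the `concurrent.futures` approach suggested in the comments above (via the `futures` backport package on 2.7, reusing the `h` list from the question); note that it shares the same limitation - `cancel()` only stops tasks that have not started running:

from concurrent.futures import ThreadPoolExecutor  # pip install futures on 2.7

with ThreadPoolExecutor(max_workers=4) as ex:
    futs = [ex.submit(h[i].some_long_task_to_hardware, 1000) for i in range(4)]
    try:
        results = [f.result() for f in futs]
    except KeyboardInterrupt:
        for f in futs:
            f.cancel()  # only cancels tasks still waiting in the queue
        raise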

3 Answers


This is a very interesting use of parallelism.

However, if you are using multiprocessing, the goal is to have many processes running in parallel, as opposed to one process running many threads.

Consider these few changes to implement it using multiprocessing:

You have these functions that will run in parallel:

import time
import multiprocessing as mp


def some_long_task_from_library(wtime):
    time.sleep(wtime)


class MyException(Exception): pass

def do_other_stuff_for_a_bit():
    time.sleep(5)
    raise MyException("Something Happened...")

Let's create and start the processes, say 4:

procs = []  # this is not a Pool, it is just a way to handle the
            # processes instead of calling them p1, p2, p3, p4...
for _ in range(4):
    p = mp.Process(target=some_long_task_from_library, args=(1000,))
    p.start()
    procs.append(p)
mp.active_children()   # joins any children that have finished and
                       # returns the ones still running

The processes are running in parallel, presumably on separate CPU cores, but that is for the OS to decide. You can check in your system monitor.
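
For example, the child PIDs can be printed and cross-referenced in the monitor:

for p in procs:
    print(p.pid)  # look these up in your system monitor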

In the meantime you run a task that will break, and you want to stop the running processes without leaving them orphaned:

try:
    do_other_stuff_for_a_bit()
except MyException as exc:
    print(exc)
    print("Now stopping all processes...")
    for p in procs:
        p.terminate()
print("The rest of the process will continue")

If it doesn't make sense to continue with the main process when one or all of the subprocesses have terminated, you should handle the exit of the main program.
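
For example, a minimal sketch of that clean-up-and-exit step (reusing the procs list from above):

import sys

for p in procs:
    p.terminate()
for p in procs:
    p.join()       # wait until each child has actually gone
sys.exit(1)        # leave the main program with an error code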

Hope it helps, and you can adapt bits of this for your library.

chapelo
  • In my case it did not matter if everything runs on the same CPU; the reason for running in parallel is that there are massive waits on IO. However this method works, with the one downside that it is hard to return values from the calls. For now I solved this with a wrapper function - see my updated post. – SRD Aug 10 '16 at 18:42
  • Depending on which kind of values you need to return from the calls, you can use `Queue`, `Pipe`, shared memory `Value` or `Array`, or even a disk file. In some of these cases you may need to use `Lock`s. – chapelo Aug 10 '16 at 18:55
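
As a minimal sketch of the shared-memory `Value` option mentioned in the comment above:

import multiprocessing as mp

def work(out):
    out.value = 42            # write the result into shared memory

res = mp.Value('i', 0)        # 'i' = C int, initialised to 0
p = mp.Process(target=work, args=(res,))
p.start()
p.join()
print(res.value)              # prints 42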

In answer to the question of why Pool did not work: as noted in the documentation, __main__ needs to be importable by the child processes, and due to the nature of this project interactive Python is being used.

At the same time it was not clear why ThreadPool would work - although the clue is right there in the name. ThreadPool creates its pool of workers using multiprocessing.dummy, which (as noted in the comments above) is just a wrapper around the threading module, whereas Pool uses multiprocessing.Process. This can be seen with this test:

p = ThreadPool(processes=3)
p._pool[0]
<DummyProcess(Thread23, started daemon 12345)>  # no terminate() method

p = Pool(processes=3)
p._pool[0]
<Process(PoolWorker-1, started daemon)>  # has a handy terminate() method if needed

As threads do not have a terminate method, the worker threads carry on running until they have completed their current task. Killing threads is messy (which is why I tried to use the multiprocessing module), but solutions do exist.

The one warning about the solution above using:

def wrapper(a, target, q, args=(), kwargs={}):
    '''Used when the return value is wanted'''
    q.put(getattr(a, target)(*args, **kwargs))

is that changes to attributes inside the instance of the object are not passed back up to the main program. As an example, the class foo above could also have a method such as:

def addIP(self, newIP):
    self.hardwareIP = newIP

A call to r = mp.Process(target=a.addIP, args=('127.0.0.1',)) does not update a in the parent process.

The only way round this for a complex object seems to be shared memory using a custom manager, which can give access to both the methods and attributes of object a. For a very large, complex object based on a library, this may be best done using dir(foo) to populate the manager. If I can figure out how, I'll update this answer with an example (for my future self as much as for others).
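
A minimal sketch of what that custom manager might look like (untested against the real library; note that by default only the object's methods are exposed through the proxy, not its attributes):

from multiprocessing.managers import BaseManager

class FooManager(BaseManager):
    pass

# Register the class; manager.Foo() then returns a proxy whose method
# calls all execute inside the single manager process, so state changes
# made through one worker are visible to the others.
FooManager.register('Foo', foo)

manager = FooManager()
manager.start()
shared_foo = manager.Foo()  # proxy to one instance living in the manager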

SRD

If for some reason using threads is preferable, we can use this approach.

We can send a signal to the threads we want to terminate. The simplest signal is a global variable:

import time
from multiprocessing.pool import ThreadPool

_FINISH = False

def hang():
    while True:
        if _FINISH:  # polled by the worker between tasks
            break
        print('hanging..')
        time.sleep(10)


def main():
    global _FINISH
    pool = ThreadPool(processes=1)
    pool.apply_async(hang)
    time.sleep(10)
    _FINISH = True
    pool.terminate()
    pool.join()
    print('main process exiting..')


if __name__ == '__main__':
    main()
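
A variant of the same idea using threading.Event instead of a module-level global (a plain Thread here for brevity):

import time
import threading

finish = threading.Event()

def hang():
    while not finish.is_set():
        print('hanging..')
        time.sleep(1)

t = threading.Thread(target=hang)
t.start()
time.sleep(5)
finish.set()   # ask the worker to stop cooperatively
t.join()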
Tin Ng