16

I'm optimizing the parameters of a complex simulation and using the multiprocessing module to speed up the optimization algorithm. I learned the basics of multiprocessing at http://pymotw.com/2/multiprocessing/basics.html. Depending on the parameters chosen by the optimization algorithm, a simulation run takes around 1 to 5 minutes. If the parameters are chosen very badly, the simulation can run for 30 minutes or more and the results are not useful. So I was thinking about building a timeout into the multiprocessing code that terminates all simulations that run longer than a defined time. Here is an abstracted version of the problem:

import numpy as np
import time
import multiprocessing

def worker(num):
    
    time.sleep(np.random.random()*20)

def main():
    
    pnum = 10    
    
    procs = []
    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)
        
    for p in procs:
        p.join(5)
        print('stopping', p.name)
     
if __name__ == "__main__":
    main()

The line p.join(5) sets a timeout of 5 seconds. Because of the loop for p in procs:, the program waits up to 5 seconds for the first process to finish, then up to another 5 seconds for the second process, and so on. What I want instead is for the program to terminate all processes that run longer than 5 seconds. Additionally, if none of the processes runs longer than 5 seconds, the program must not wait the full 5 seconds.

swiss_knight
brp
  • Take a look here: http://stackoverflow.com/q/1191374/2615940. It may be a duplicate, but I'm not sure enough to flag it for you. If the proposed solution to that answer doesn't work for you, please let us know why. – skrrgwasme Sep 26 '14 at 16:20
  • This is an interesting article, but as I see it, it's a solution for processes started consecutively, not simultaneously. My program should start the processes at the same time and kill those which exceed a 'global' timeout. – brp Sep 27 '14 at 10:10

3 Answers

17

You can do this with a loop that runs for at most the timeout, frequently checking whether all processes have finished. If they don't all finish in the allotted time, terminate all of the processes:

TIMEOUT = 5 
start = time.time()
while time.time() - start <= TIMEOUT:
    if not any(p.is_alive() for p in procs):
        # All the processes are done, break now.
        break

    time.sleep(.1)  # Just to avoid hogging the CPU
else:
    # We only enter this if we didn't 'break' above.
    print("timed out, killing all processes")
    for p in procs:
        p.terminate()
        p.join()
Allan Lewis
dano
  • Thank you, this seems like an appropriate solution. Unfortunately this code does not break if the processes finish before the timeout is reached. I tried it by setting the worker function to `time.sleep(1)`, and after 1 second all `p.is_alive()` calls return `False`. So the code should reach the `break` statement now, but it's still waiting for the timeout... – brp Sep 27 '14 at 10:35
  • I found the problem: `print (p.is_alive() for p in procs)` returns `<generator object <genexpr> at 0x05712B20>`, but it should be a list of `True` or `False` elements for `any()` to understand it – brp Sep 27 '14 at 11:10
  • @brp use `any([p.is_alive() for p in procs])`. That way it becomes a list comprehension instead of a generator expression. – dano Sep 27 '14 at 12:41
  • @brp oh, I just noticed you're using `np.any` instead of the built-in `any`. That's why the generator expression didn't work. `np.any` only works with array-like objects. – dano Sep 27 '14 at 12:48
  • built-in `any` with list comprehension is the key! Thank you! – brp Sep 27 '14 at 15:04
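
The point made in these comments can be illustrated with a small sketch. It deliberately avoids NumPy and uses `bool()` on the generator object as a stand-in for what presumably happens when `np.any` receives a generator expression: the generator is treated as one object instead of being iterated, and a generator object is always truthy. That reading of `np.any` is an assumption, and the variable names are made up for illustration.

```python
# Results of p.is_alive() for three processes that have all finished.
alive_flags = [False, False, False]

# A generator expression is an object in its own right; its truth value
# says nothing about the values it would yield.
gen = (flag for flag in alive_flags)
generator_object_is_truthy = bool(gen)

# The built-in any() iterates over its argument, so both forms work.
any_over_generator = any(flag for flag in alive_flags)
any_over_list = any([flag for flag in alive_flags])

print("bool(generator):", generator_object_is_truthy)
print("any(generator): ", any_over_generator)
print("any(list):      ", any_over_list)
```

With the built-in `any`, the generator expression from the answer works unchanged; the list comprehension is only needed if an array-oriented function such as `np.any` is used.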
13

If you want to kill all the processes, you could use the Pool from multiprocessing. You'll need to define a general timeout for the whole execution, as opposed to individual timeouts.

import numpy as np
import time
from multiprocessing import Pool

def worker(num):
    xtime = np.random.random()*20
    time.sleep(xtime)
    return xtime

def main():

    pnum = 10
    pool = Pool()
    args = range(pnum)
    pool_result = pool.map_async(worker, args)

    # wait 5 minutes for every worker to finish
    pool_result.wait(timeout=300)

    # once the timeout has finished we can try to get the results
    if pool_result.ready():
        print(pool_result.get(timeout=1))

if __name__ == "__main__":
    main()

This will get you a list with the return values for all your workers in order.
More information here: https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool
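
If the workers should also be stopped when the timeout expires, one possible variant is sketched below. It relies on `AsyncResult.get(timeout=...)` raising `multiprocessing.TimeoutError` and on `Pool.terminate()`; the helper name `map_or_terminate` and the `slow_square` worker are hypothetical and only serve the illustration.

```python
import multiprocessing
import time


def slow_square(x):
    # Hypothetical stand-in for the simulation: larger inputs take longer.
    time.sleep(x)
    return x * x


def map_or_terminate(pool, func, items, timeout):
    """Return the list of results, or None if the timeout expired first."""
    async_result = pool.map_async(func, items)
    try:
        results = async_result.get(timeout=timeout)
    except multiprocessing.TimeoutError:
        # Stop the worker processes instead of letting them run on.
        pool.terminate()
        results = None
    else:
        # Normal completion: no more tasks, let the workers exit cleanly.
        pool.close()
    pool.join()
    return results


if __name__ == "__main__":
    # Both tasks finish well within the timeout.
    print(map_or_terminate(multiprocessing.Pool(2), slow_square, [0.1, 0.2], timeout=5))
    # The second task is too slow, so the pool is terminated.
    print(map_or_terminate(multiprocessing.Pool(2), slow_square, [0.1, 3], timeout=1))
```

Note that `map_async` is all-or-nothing: if the timeout expires, the results of workers that did finish in time are not returned by this sketch.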

swiss_knight
Raiden Drake
  • I don't think this actually terminates the threads in the pool -- it just returns execution to the main thread even if they're not done – Ben Wheeler Jan 29 '21 at 18:19
  • I do not understand why we do `pool_result.get(timeout=1)`: if the pool_result is already ready, shouldn't the results also be ready, so that no timeout is needed? – Kieleth Jun 16 '21 at 17:54
3

Thanks to the help of dano I found a solution:

import numpy as np
import time
import multiprocessing

def worker(num):

    time.sleep(np.random.random()*20)

def main():

    pnum = 10    
    TIMEOUT = 5 
    procs = []
    bool_list = [True]*pnum

    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)

    start = time.time()
    while time.time() - start <= TIMEOUT:
        for i in range(pnum):
            bool_list[i] = procs[i].is_alive()
            
        print(bool_list)
            
        if np.any(bool_list):  
            time.sleep(.1)  
        else:
            break
    else:
        print("timed out, killing all processes")
        for p in procs:
            p.terminate()
            
    for p in procs:
        print('stopping', p.name,'=', p.is_alive())
        p.join()

if __name__ == "__main__":
    main()

It's not the most elegant way; I'm sure there is a better way than using bool_list. Processes that are still alive after the 5-second timeout are killed. If you set sleep times in the worker function that are shorter than the timeout, you will see that the program stops before the 5-second timeout is reached. I'm still open to more elegant solutions, if there are any :)
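
One possible simplification, offered as a sketch rather than a definitive answer: instead of polling `is_alive()` into a list, each process can be joined against a single shared deadline, so the total wait never exceeds the timeout and the loop returns early when everything finishes sooner. The helper name `wait_or_terminate` and the fixed sleep durations are made up for illustration.

```python
import multiprocessing
import time


def worker(seconds):
    # Stand-in for the simulation: just sleep for the given time.
    time.sleep(seconds)


def wait_or_terminate(procs, timeout):
    """Join all processes against one shared deadline.

    Returns the names of the processes that had to be terminated.
    """
    deadline = time.monotonic() + timeout
    for p in procs:
        # Each join only gets the time left until the shared deadline.
        remaining = max(0.0, deadline - time.monotonic())
        p.join(remaining)

    terminated = []
    for p in procs:
        if p.is_alive():
            p.terminate()
            p.join()
            terminated.append(p.name)
    return terminated


if __name__ == "__main__":
    durations = [0.2, 0.5, 3.0, 4.0]
    procs = [
        multiprocessing.Process(target=worker, args=(d,), name="process_%d" % (i + 1))
        for i, d in enumerate(durations)
    ]
    for p in procs:
        p.start()
    print("terminated:", wait_or_terminate(procs, timeout=1.0))
```

Because `join()` returns as soon as its process has exited, the helper does not wait out the full timeout when all processes finish early.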

swiss_knight
brp