I am using Python 2.7.

I am currently using ThreadPoolExecutor like this:

params = [1,2,3,4,5,6,7,8,9,10]
with concurrent.futures.ThreadPoolExecutor(5) as executor:
    result = list(executor.map(f, params))

The problem is that f sometimes runs for too long. Whenever I run f, I want to limit its run to 100 seconds, and then kill it.

In the end, for each element x in params, I would like an indication of whether or not f had to be killed and, if it wasn't, what its return value was. Even if f times out for one parameter, I still want to run it with the next parameters.

The executor.map method does have a timeout parameter, but it sets a timeout for the entire run, measured from the time of the call to executor.map, and not for each thread separately.

What is the easiest way to get my desired behavior?

dano
user302099
  • There is no direct way to kill a thread in Python. If the `timeout` you pass to `map` expires, it won't actually terminate the Executor threads; it will just make the `future.result(timeout)` call that `map` makes internally raise a `TimeoutError` exception. The worker threads will continue running in the background, though. If you need the thread to actually be terminated, your worker function has to check for some kind of flag that the parent can set after the timeout has expired. That may not be easy to implement, though, depending on what the worker function is doing. – dano Sep 22 '14 at 14:35
  • @dano: I see. The process still running in the background is something I may be able to live with. But let's say the thread handling params[4] got stuck, can I still get the result of the processes handling params[5] to params[9]? – user302099 Sep 22 '14 at 15:32
  • @user302099: you could use `as_completed()` instead of `map()` to get the `params[5]` result if it is ready before `params[4]`. If you use threads, then the function should cooperate (respect an exit condition). If you can't rely on the function to behave, then use processes. – jfs Sep 22 '14 at 15:52
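
The two suggestions in the comments above (a flag that the worker checks, and `as_completed()` instead of `map()`) could be combined roughly as follows. This is only a sketch under assumptions: `cooperative_f` is a hypothetical stand-in for f that does its work in small steps, and `run_with_limit` and `run_all` are illustrative helper names, not part of any library. It only works if f can be written to check the flag regularly.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def cooperative_f(x, stop_event):
    """Hypothetical stand-in for f: works in small steps and checks the flag."""
    for _ in range(x):
        if stop_event.is_set():
            return (True, None)   # gave up because the time limit expired
        time.sleep(0.01)          # one small unit of "work"
    return (False, x * x)         # finished in time: (timed_out, value)


def run_with_limit(x, limit):
    """Run cooperative_f with a per-call limit, counted from when the call starts."""
    stop_event = threading.Event()
    timer = threading.Timer(limit, stop_event.set)  # sets the flag after `limit` seconds
    timer.start()
    try:
        return cooperative_f(x, stop_event)
    finally:
        timer.cancel()            # harmless if the timer has already fired


def run_all(params, limit, workers=5):
    """Map each parameter to its (timed_out, value) pair."""
    results = {}
    with ThreadPoolExecutor(workers) as executor:
        futures = dict((executor.submit(run_with_limit, x, limit), x) for x in params)
        # as_completed yields futures as they finish, so a slow call does not
        # hold up the results of the calls submitted after it.
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results
```

Because the timer is started inside the worker thread, a call that waits in the executor's queue does not use up its time limit while waiting.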

1 Answer

This answer is in terms of Python's multiprocessing library, which is usually preferable to the threading library unless your functions spend most of their time waiting on network calls. Note that the multiprocessing and threading libraries have very similar interfaces.

Given that your processes potentially run for 100 seconds each, the overhead of creating a process for each one is fairly small in comparison. You probably have to create your own processes to get the necessary control.

One option is to wrap f in another function that will execute for at most 100 seconds:

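from multiprocessing import Pool

def timeout_f(arg):
    # One single-worker pool per call, so the worker can be killed on timeout.
    pool = Pool(processes=1)
    try:
        # Raises multiprocessing.TimeoutError if f takes longer than 100 seconds.
        return pool.apply_async(f, [arg]).get(timeout=100)
    finally:
        pool.terminate()  # kill the worker process if it is still running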

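Then your code changes to the following (note that list() re-raises the first TimeoutError it meets, so catch the exception inside timeout_f if you want a per-parameter indication instead):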

    result = list(executor.map(timeout_f, params))

Alternatively, you could write your own thread/process control:

from multiprocessing import Process
from time import time

def chunks(l, n):
    """ Yield successive n-sized chunks from l. """
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

processes = [Process(target=f, args=(i,)) for i in params]
exit_codes = []
for five_processes in chunks(processes, 5):
    for p in five_processes:
        p.start()
    start = time()
    for p in five_processes:
        # The batch shares one 100-second budget; wait only for what is left of it.
        p.join(max(0, 100 - (time() - start)))
        if p.is_alive():
            p.terminate()  # still running after the deadline: kill it
            p.join()       # reap it so that exitcode is set
    for p in five_processes:
        exit_codes.append(p.exitcode)

You'd have to get the return values through something like the techniques in "Can I get a return value from multiprocessing.Process?"
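
One possible way to get both the return value and the timeout indication is to send the result back over a pipe. This is a minimal sketch, not necessarily the approach in the linked question: `run_with_timeout` and `_call_and_send` are hypothetical helper names, and it assumes that the return value of the function can be pickled.

```python
import multiprocessing


def _call_and_send(func, arg, conn):
    """Runs in the child: call func and send the outcome to the parent."""
    try:
        conn.send(("ok", func(arg)))
    except Exception as exc:
        conn.send(("error", repr(exc)))
    finally:
        conn.close()


def run_with_timeout(func, arg, timeout):
    """Return (timed_out, value); the child is killed if it exceeds timeout."""
    # duplex=False gives a receive-only end and a send-only end.
    parent_conn, child_conn = multiprocessing.Pipe(duplex=False)
    proc = multiprocessing.Process(target=_call_and_send,
                                   args=(func, arg, child_conn))
    proc.start()
    child_conn.close()  # the parent only reads
    try:
        # poll() waits up to `timeout` seconds for something to read.
        if not parent_conn.poll(timeout):
            return (True, None)
        status, payload = parent_conn.recv()
        if status == "error":
            raise RuntimeError("call failed in child process: " + payload)
        return (False, payload)
    finally:
        if proc.is_alive():
            proc.terminate()  # kill the child if it is still running
        proc.join()
        parent_conn.close()
```

Since each call owns its process, this helper could be combined with the thread pool from the question, for example `list(executor.map(lambda x: run_with_timeout(f, x, 100), params))`, which yields one `(timed_out, value)` pair per parameter. On Windows, the calling code has to sit under an `if __name__ == "__main__":` guard and f must be importable.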

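The exit code of a process is 0 if it completed normally and non-zero if it was terminated (on Unix, a terminated process reports the negative signal number, -15 for SIGTERM). A process in which f raised an uncaught exception also exits with a non-zero code (1).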
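
The exit-code behaviour can be checked with a small experiment. This is a sketch that assumes a Unix system; `exit_code_after` is a hypothetical helper name used only for illustration.

```python
import signal
from multiprocessing import Process


def exit_code_after(target, timeout):
    """Run target in a process, terminate it after timeout, return its exit code."""
    p = Process(target=target)
    p.start()
    p.join(timeout)
    if p.is_alive():
        p.terminate()  # sends SIGTERM on Unix
        p.join()       # exitcode is only set once the process has been reaped
    return p.exitcode


# A process killed by terminate() reports the negative signal number on Unix.
TERMINATED_EXIT_CODE = -signal.SIGTERM
```

Comparing the returned value with `TERMINATED_EXIT_CODE` distinguishes a process that was killed from one that finished on its own (exit code 0).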

Techniques from: "Join a group of python processes with a timeout" and "How do you split a list into evenly sized chunks?"


As another option, you could just use apply_async on multiprocessing.Pool:

from multiprocessing import Pool, TimeoutError

if __name__ == "__main__":
    pool = Pool(processes=5)
    processes = [pool.apply_async(f, [i]) for i in params]
    results = []
    for process in processes:
        try:
            results.append(process.get(timeout=100))
        except TimeoutError as e:
            # Keep the exception as a marker that this call timed out.
            results.append(e)

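Note that the above may give a process more than 100 seconds. The timeouts are applied one after another, so if the first call takes 50 seconds to return, the second has already been running for those 50 seconds before its own 100-second wait begins. A call that times out is also not killed: it keeps running and keeps occupying its pool worker. More complicated logic (such as the previous example) is needed to enforce stricter timeouts.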

Community
Zags
  • The first solution forces you to wait 100 seconds, even if all the processes end up completing in 5 seconds. You'd probably want a loop that sleeps for a couple of seconds, then checks to see if any of the processes are still running, then goes back to sleep if any are. – dano Sep 22 '14 at 15:59
  • @dano yes, was writing a fast answer. Updated to use better logic – Zags Sep 22 '14 at 16:06
  • It looks like you made some kind of copy/paste error with your edit. The indentation is off and you're calling `join` and `terminate` twice. – dano Sep 22 '14 at 16:21
  • In order to use the first solution, you can call map on a function containing a second pool; however, the pool needs to be non-daemonic: http://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic – Chris Lucian Mar 15 '15 at 07:09
  • @Zags Option 1 is working perfectly. I have a question: if pool.apply_async(f, [arg]).get(timeout=100) works as expected and kills the process after the timeout, then why do we need ThreadPoolExecutor to create threads? Couldn't we use only the multiprocessing pool instead? – Darknight Oct 11 '19 at 19:43