
Up to now I do this:

rets=set(pool.map_async(my_callback, args.hosts).get(60*4))

If the timeout is hit, I get an exception:

 File "/usr/lib/python2.7/multiprocessing/pool.py", line 524, in get
    raise TimeoutError
multiprocessing.TimeoutError

I would like to handle this gracefully:

The results for all hosts I could reach should go into `rets`, and all hosts which timed out should go into a separate list.

How could this be done?

Update

Six years later, I think it makes more sense to use Go instead of Python for concurrent applications.

guettli
  • try-except is obviously the solution, so I'm wondering what is the catch? – John Mee Jun 07 '16 at 05:39
  • `hosts I could reach` goes in `try`, `all hosts which timed out` go in `TimeoutError exception`.... where is the rocket science here? – NoobEditor Jun 07 '16 at 05:44
  • @JohnMee you say try-except is the solution. But how? My my_callback() gets called 100 times in parallel. There is one line in the code: calling map_async(). AFAIK `rets` is empty if an exception gets raised (see the sketch after these comments). How do I get two lists: the results of the calls that did not time out, and the hosts that timed out? – guettli Jun 07 '16 at 06:01
  • Ah, i see, the "catch" is that everything is happening in a one-liner. You'll need to [trawl through the docs](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult) and work out how to break it up into a multiline problem so you have one statement invoking one process and thus can catch the timeout on that one, and no other. – John Mee Jun 08 '16 at 00:49
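
To make the comment thread concrete, here is a minimal sketch of the naive try-except approach (with a stand-in my_callback and host list, since the question's real ones aren't shown): wrapping the one-liner in try-except does catch the timeout, but map_async().get() only returns once every host is done, so when it raises you get no partial results and no idea which hosts finished.

from multiprocessing import Pool, TimeoutError
from time import sleep


def my_callback(host):
    # Stand-in for the real per-host work; pretend some hosts are slow.
    sleep(10 if host.startswith('slow') else 0)
    return 'result for %s' % host

pool = Pool()
hosts = ['fast1', 'fast2', 'slow1']

rets = set()
try:
    # get() returns only when *all* hosts are done, so a single slow host
    # makes it raise TimeoutError and the fast hosts' results are lost too.
    rets = set(pool.map_async(my_callback, hosts).get(5))
except TimeoutError:
    # We know something timed out, but not which hosts, and rets is still empty.
    print('timed out, rets = %r' % (rets,))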

1 Answer


As far as I know you can't, at least not with map_async. map_async is a convenience method that solves one particular problem for one particular use case, and it doesn't match what you've got because you want finer control.

However, you can still do it; you just need to use the more fine-grained methods of the multiprocessing module. In particular, you can add jobs to your pool on the fly with apply_async, which gives you much more control over how to handle the success and failure of each individual task.

Below is a pretty minimal example that I think does what you want:

from multiprocessing.pool import Pool, TimeoutError
from time import sleep, time


def task_function(xx):
    print('Task %d running' % xx)
    sleep(xx)
    print('Task %d ended' % xx)
    return 'Result of task %d' % xx

# Submit each job separately so that each one gets its own AsyncResult.
pl = Pool()
results = [
    pl.apply_async(task_function, (_xx,))
    for _xx in range(10)]

# All jobs share one overall deadline of 5 seconds.
start = time()
wait_until = start + 5

rets = []
timed_out_results = []

for res in results:
    # Wait only for whatever is left of the overall deadline.
    timeout = wait_until - time()
    if timeout < 0:
        timeout = 0

    try:
        rets.append(res.get(timeout))
    except TimeoutError:
        # This job didn't finish in time; keep its AsyncResult for later.
        timed_out_results.append(res)

print('%s ended' % (rets,))
print('%s timedout' % (timed_out_results,))

This runs 10 jobs that each print a line, sleep, then print another line. The first sleeps for 0 seconds, the next for 1, the next for 2, and so on. We stop waiting after 5 seconds, so we expect 5 tasks to have finished and 5 tasks to have timed out.

Be aware that I have not stopped the tasks that are still running, so in the real world they may carry on and finish in the time it takes to print the results. You'll have to decide how much you care about that and what to do about it.
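
If you do want to stop the stragglers, a minimal sketch (building on the example above, not something the code already does) is to terminate the pool once the deadline has passed; terminate() kills the worker processes outright, so the tasks collected in timed_out_results simply never finish:

# Once we've collected everything we can, kill the remaining workers.
# terminate() stops the worker processes immediately and join() waits for
# them to go away; any task still in timed_out_results is abandoned.
pl.terminate()
pl.join()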

daphtdazz