5

Is it possible to set a time limit on the pool map() operation when using multiprocessing in Python? When the time limit is reached, all child processes should stop and return the results they already have.

import multiprocessing as mp

def task(v):
    result = ...  # do something with v
    return result

if __name__ == '__main__':
    vs = [...]
    p = mp.Pool()
    results = p.map(task, vs)

In the example above, I have a very large list vs. Ideally, all elements in the list vs will be sent to the function task(), and all results will be saved in the results list.

However, the list vs is very large, and I only have a limited time (say 5 minutes) to conduct this process. What I need is to stop the map operation when the 5 minutes are reached and return the calculated results in the results list.

EDIT1:

I am not trying to kill an individual task that needs more than 5 minutes to finish. Suppose I have 1000 tasks in the list vs and only 600 of them have finished after 5 minutes. What I need is to kill all child processes and save the results of those 600 tasks to the results list.

Jiawei Lu
  • This looks similar: https://stackoverflow.com/questions/38711840/python-multiprocessing-pool-timeout – ForceBru Feb 04 '21 at 18:29
  • What makes you think that re-posting an exact copy of the closed question would bring you a different result? [For users over 10K rep: https://stackoverflow.com/questions/66050896/how-to-set-a-time-limit-on-the-pool-map-operation-when-using-multiprocessing] – PM 77-1 Feb 04 '21 at 18:31
  • @PM77-1 I edited the closed post, but I noticed it is still closed and may not be visible to others, so I reposted it here – Jiawei Lu Feb 04 '21 at 18:41
  • @ForceBru I will check it. thanks – Jiawei Lu Feb 04 '21 at 19:04

1 Answer

3

I looked at the answer referred to by ForceBru, which uses something called Pebble. First, I don't understand the comment about "Python standard Pool does not support timeouts." It does, in a fashion: you can wait a specified amount of time for a result to be returned and be notified via an exception whether it has arrived, or you can simply issue a wait on a result object specifying a timeout. No exception is raised in that case, but you can test whether the "job" producing the result has completed or not. It is, however, true that you cannot terminate individual timed-out jobs. But once you have processed all the results that have not timed out, you can call terminate on the pool itself, which will terminate all the processes in the pool whether they are idle or not.

This leads to the second comment made in that answer, "and terminating processes abruptly might lead to weird behaviour within your applications." That is true depending on what the timed-out job was doing. So we agree that we should not be timing out jobs and terminating them prematurely if doing so could lead to weird behavior. But I don't see how Pebble can deal with that issue any better.
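
Here is a minimal sketch of those two built-in options; slow_square is just a stand-in for real work, and the timeouts are chosen so that both calls give up before the job's 3-second sleep finishes:

import multiprocessing as mp
import time

def slow_square(v):
    time.sleep(v)          # simulate v seconds of work
    return v * v

if __name__ == '__main__':
    pool = mp.Pool()
    res = pool.apply_async(slow_square, args=(3,))
    # Option 1: wait up to 1 second, then test whether the job finished.
    # No exception is raised either way.
    res.wait(1)
    print('ready?', res.ready())   # False -- the job needs about 3 seconds
    # Option 2: get() with a timeout raises mp.TimeoutError if the result
    # is not available in time.
    try:
        print('value:', res.get(1))
    except mp.TimeoutError:
        print('timed out waiting for the result')
    pool.terminate()               # terminates every pool process, busy or idle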

The question to which that answer was the response actually has one technique for doing what you wish buried within it. You need to give up on using the map function and switch to using apply_async, specifying a callback function so that results can be saved as they become available. In the example below, I am using a TIMEOUT value of 5 seconds just for demo purposes and have arranged for approximately half of the 10 jobs I am submitting to time out. I have pre-allocated a results list named squares that will hold the 10 results; it is initialized with 10 None values. If the ith value is still None when we are all done, it is because the job that was processing value i timed out. My worker function also returns its argument, v, as well as its computed value, v ** 2, so that the callback function knows which slot in the squares list the computed result should go in:

import multiprocessing as mp
import time

def my_task(v):
    time.sleep(v)      # simulate v seconds of work
    return v, v ** 2   # return the argument too so the callback knows the slot

# pre-allocated results list; slots still None at the end belong to timed-out jobs
squares = [None] * 10

def my_callback(t):
    i, s = t
    squares[i] = s     # runs in the main process as each result arrives


TIMEOUT = 5

if __name__ == '__main__':
    vs = range(10)
    pool = mp.Pool()
    results = [pool.apply_async(my_task, args=(v,), callback=my_callback) for v in vs]
    time.sleep(TIMEOUT)
    pool.terminate() # all processes, busy or idle, will be terminated
    print(squares)

Prints:

[0, 1, 4, 9, 16, None, None, None, None, None]

A second, more complicated method does not use a callback function. Rather, it does a get call on each AsyncResult instance returned by the calls to pool.apply_async, specifying a timeout value. The tricky bit here is that for the initial call you have to use the full timeout value. But by the time that result has been returned or you have gotten a timeout exception, you have already waited some amount of time, t. That means that the next time you get a result with a timeout, the timeout value you specify should be reduced by t:

import multiprocessing as mp
import time

def my_task(v):
    time.sleep(6 if v == 0 else v)
    return v ** 2


TIMEOUT = 5

if __name__ == '__main__':
    vs = range(mp.cpu_count() - 1) # 7 on my desktop
    pool = mp.Pool() # poolsize is 8
    results = [pool.apply_async(my_task, args=(v,)) for v in vs]
    time_to_wait = TIMEOUT # initial time to wait
    start_time = time.time()
    for i, result in enumerate(results):
        try:
            return_value = result.get(time_to_wait) # wait for up to time_to_wait seconds
        except mp.TimeoutError:
            print('Timeout for v = ', i)
        else:
            print(f'Return value for v = {i} is {return_value}')
        # how much time has expired since we began waiting?
        t = time.time() - start_time
        time_to_wait = TIMEOUT - t
        if time_to_wait < 0:
            time_to_wait = 0
    pool.terminate() # all processes, busy or idle, will be terminated

Prints:

Timeout for v =  0
Return value for v = 1 is 1
Return value for v = 2 is 4
Return value for v = 3 is 9
Return value for v = 4 is 16
Timeout for v =  5
Timeout for v =  6

Note

By using apply_async rather than map, jobs are effectively submitted with a chunksize of 1 (see the chunksize parameter of map, which determines how the iterable argument is broken up into "chunks" to be put on each process's input queue, minimizing the number of shared-memory transfers). For large iterables, apply_async can be inefficient compared to map, which uses a "reasonable" default chunksize based on the size of your pool and the number of jobs to be processed.
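
For what it's worth, the default chunksize that map picks comes from a simple heuristic inside Pool. The sketch below mirrors what recent CPython versions do in Pool._map_async, so treat the exact formula as an implementation detail that may change; the idea is roughly four chunks per worker process:

def default_map_chunksize(n_items, pool_size):
    # mirrors CPython's Pool._map_async heuristic: aim for about
    # 4 chunks per worker process, rounding up
    chunksize, extra = divmod(n_items, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    # 1000 items on an 8-process pool -> chunks of 32 with map,
    # versus 1000 separate submissions with apply_async
    print(default_map_chunksize(1000, 8))  # 32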

Booboo
  • Both approaches work for me. I have a concern about the second one. In the function my_task(v), I use time.sleep(6) if v == 0 and keep the others unchanged. In that case, tasks with v from 1-4 can be finished. In the result-retrieval step, results are retrieved in order, so v == 0 will consume all of time_to_wait, leaving time_to_wait = 0 for the others. Although the results of v = 1 to 4 can be successfully retrieved with time_to_wait = 0 on my laptop, I'm concerned whether it always works when we get results from finished tasks with 0 waiting time, especially when the hardware is old and slow. – Jiawei Lu Feb 04 '21 at 23:11
  • if you have `if v == 0: time.sleep(6)` but no other job sleeps, then if the number of jobs submitted is <= the number of processes in the pool (the pool size), you will get a timeout for the first job. But you will be calling `wait(0)` for all the other jobs, *which will have already completed*, and so they will not throw timeout exceptions and everything will work as expected. Try it. The default pool size will be `mp.cpu_count()`. So set `vs = range(mp.cpu_count() - 2)`. (more) – Booboo Feb 05 '21 at 00:45
  • But if the number of jobs is greater than the pool size, there will be at least one job in the first process's queue that will not be scheduled to run until the first job finishes. So those jobs will also timeout. **This will be true using either method.** – Booboo Feb 05 '21 at 00:46
  • I've updated the second coding example to have `my_task` sleep for 6 seconds when `v` is 0 else it will sleep for `v` seconds as before. And I submit 7 jobs with a pool size of 8 to make sure there are no jobs waiting for a process to run. – Booboo Feb 05 '21 at 00:54
  • Here is an approach that works even when number of jobs is greater than pool size: https://stackoverflow.com/questions/29494001/how-can-i-abort-a-task-in-a-multiprocessing-pool-after-a-timeout – darkgbm Mar 18 '23 at 14:08
  • @Student Specifying *maxtasksperchild=1* is inefficient (a new process is started for every submitted job even if the job completes successfully) and also unnecessary. See [demo](https://ideone.com/PVP80o). The pool size is only 1 and the number of jobs is greater (2). The first submitted job raises an exception killing the pool process. The process is being restarted even without the *maxtasksperchild=1* argument. Otherwise the second submitted job, which was sitting in the task queue waiting for the first submitted job to complete, would never have started and completed successfully. – Booboo Mar 18 '23 at 14:39
  • @Booboo if a new process always starts for every submitted job, what is the benefit of having the maxtasksperchild parameter? My understanding is that if maxtasksperchild>1, processes don't have to be re-started and there will be less overhead cost. – darkgbm Mar 18 '23 at 18:12
  • @Booboo I also don't think it is true that new process gets started for each submitted job. See https://ideone.com/l7aQNB. The process id is the same for all jobs, even though 2 failed. This means that the other jobs did not get re-started. – darkgbm Mar 18 '23 at 19:28
  • @Student From the Python 3 Manual, which you should read: ***maxtasksperchild* is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default *maxtasksperchild* is `None`, which means worker processes will live as long as the pool.** Specifying a value of 1 could be useful if your processes are, for example, leaking memory. – Booboo Mar 18 '23 at 19:53
  • And new processes will *not* be started for every submitted job if you take the default value of `None` for *maxtasksperchild*. **This was my whole point.** – Booboo Mar 18 '23 at 19:56
  • @Booboo Let's take a step back, in your example (https://ideone.com/PVP80o), is the statement "The first submitted job raises an exception killing the pool process." true? – darkgbm Mar 18 '23 at 20:35
  • 1
    @Student Right! Raising an exception is not sufficient to kill the process. See [this demo](https://ideone.com/6pp85t). – Booboo Mar 18 '23 at 22:29
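
Following up on the maxtasksperchild discussion in the comments above, here is a minimal sketch (which_pid is just a hypothetical worker) comparing the default maxtasksperchild=None with maxtasksperchild=1 on a single-process pool: with the default, the same worker process handles every task, while maxtasksperchild=1 replaces the worker after each task, so each task reports a different pid:

import multiprocessing as mp
import os

def which_pid(i):
    # report which pool process ran this task
    return os.getpid()

if __name__ == '__main__':
    # default maxtasksperchild=None: the single worker is reused,
    # so all three tasks report the same pid
    with mp.Pool(1) as pool:
        print([pool.apply_async(which_pid, args=(i,)).get() for i in range(3)])

    # maxtasksperchild=1: the worker exits after each task and is replaced
    # by a fresh process, so the pids differ (extra startup cost per task)
    with mp.Pool(1, maxtasksperchild=1) as pool:
        print([pool.apply_async(which_pid, args=(i,)).get() for i in range(3)])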