
I am trying to use Python's multiprocessing package in this way:

featureClass = [[1000, k, 1] for k in drange(start, end, step)]  # list of arguments
for f in featureClass:
    pool.apply_async(worker, args=f, callback=collectMyResult)
pool.close()
pool.join()

I want to avoid waiting for those processes in the pool that take more than 60 seconds to return their result. Is that possible?

farhawa
  • What does `worker` look like? The easiest way to do this with a `multiprocessing.Pool` is to make `worker` interruptible, but that may not be possible, depending on what it's doing. – dano Apr 07 '15 at 14:29
  • worker is a simple function with a list input and a list output – farhawa Apr 07 '15 at 14:30
  • What is it actually doing, though? I assume it's iterating over the list, but what kind of operations is it doing on each item? How long does each operation take? – dano Apr 07 '15 at 14:33
  • OK! The worker's job is to train an SVM model (#machine_learning, #classification) and return the elapsed time – farhawa Apr 07 '15 at 14:47
  • I have a working solution to the same problem, posted here: http://stackoverflow.com/a/40724036/2512195 – andreypz Dec 01 '16 at 12:36

2 Answers


Here's a way you can do this without needing to change your worker function. There are two steps required:

  1. Use the maxtasksperchild option you can pass to multiprocessing.Pool to ensure the worker processes in the pool are restarted after every task execution.
  2. Wrap your existing worker function in another function, which will call worker in a daemon thread, and then wait for a result from that thread for timeout seconds. Using a daemon thread is important because processes won't wait for daemon threads to finish before exiting.

If the timeout expires, you exit (or abort, it's up to you) the wrapper function, which ends the task and, because you've set maxtasksperchild=1, causes the Pool to terminate the worker process and start a new one. This means the background thread doing your real work gets aborted too: it's a daemon thread, and the process it was living in has been shut down.

import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

def worker(x, y, z):
    pass  # Do whatever here

def collectMyResult(result):
    print("Got result {}".format(result))

def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)  # runs func in a daemon thread inside this worker process
    res = p.apply_async(func, args=args)
    try:
        out = res.get(timeout)  # Wait timeout seconds for func to complete.
        p.close()
        p.join()
        return out
    except multiprocessing.TimeoutError:
        # Don't close()/join() the thread pool here: join() would block until
        # func finishes, defeating the timeout. Raising ends the task, and
        # maxtasksperchild=1 makes the Pool replace this process, which kills
        # the still-running daemon thread.
        print("Aborting due to timeout")
        raise

if __name__ == "__main__":
    start, end, step = 0, 10, 1  # example values; the question used a custom drange()
    pool = multiprocessing.Pool(maxtasksperchild=1)
    featureClass = [[1000, k, 1] for k in range(start, end, step)]  # list of arguments
    for f in featureClass:
        abortable_func = partial(abortable_worker, worker, timeout=3)
        pool.apply_async(abortable_func, args=f, callback=collectMyResult)
    pool.close()
    pool.join()

Any call that times out will raise multiprocessing.TimeoutError. Note that this means your callback won't execute when a timeout occurs. If that isn't acceptable, just change the except block of abortable_worker to return something instead of calling raise, as in the sketch below.
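For example, here is a minimal sketch of that variant; the TIMEOUT_SENTINEL name is an illustrative choice, not part of the original code:

TIMEOUT_SENTINEL = None  # any marker your callback can recognize

def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        out = res.get(timeout)
        p.close()
        p.join()
        return out
    except multiprocessing.TimeoutError:
        # Return the sentinel instead of re-raising, so collectMyResult
        # still runs and can detect (and skip) the timed-out task.
        return TIMEOUT_SENTINEL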

Also keep in mind that restarting worker processes after every task execution will have a negative impact on the performance of the Pool, due to the increased overhead. You should measure that for your use case and decide whether the trade-off is worth having the ability to abort the work. If it's a problem, you may need to try another approach, like co-operatively interrupting worker if it has run too long, rather than trying to kill it from the outside. There are many questions on SO that cover this topic.
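As a rough sketch of that co-operative idea, assuming worker iterates over items: the deadline parameter and the loop body here are illustrative stand-ins, not code from the question:

import time

def worker(x, y, z, deadline=60.0):
    # Co-operative variant: the worker checks the clock itself and bails
    # out cleanly instead of being killed from the outside.
    started = time.monotonic()
    results = []
    for item in range(x):  # stand-in for the real per-item loop
        if time.monotonic() - started > deadline:
            return None  # or return the partial results, or raise
        results.append(item * y + z)  # stand-in for the real work
    return results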

dano
  • Just one more question, what is the number of parallel processes in your code? – farhawa Apr 08 '15 at 07:43
  • @wajdi It uses the default number of processes, which is always equal to the number of CPUs on the machine executing the script. If you want to specify the number, pass it to the `multiprocessing.Pool` constructor: `pool = multiprocessing.Pool(4)`. – dano Apr 08 '15 at 12:38
  • Perfect, so `pool = multiprocessing.Pool(N)` will launch N workers in parallel, but what if I have more than N work items, N×2 for example? In which order will they be launched? And thank you for your time @dano – farhawa Apr 08 '15 at 13:00
  • @wajdiF I'm not completely sure I understand the question. When you pass `N` to the `Pool` constructor, `N` processes are immediately launched, and those exact `N` processes continue to run for the entire lifetime of the pool. No additional processes get added, and no processes get removed. Any work items you pass to the `Pool` via `apply`/`map` get distributed to those `N` workers. If all the workers are busy, the work item gets queued until a worker frees up and is able to execute it. Does that answer your question? – dano Apr 08 '15 at 16:15
  • Yes, thank you for the details. I was asking about the order in which the processes will be launched, in this case for example: `pool = multiprocessing.Pool(4)`, `list = [x for x in range(10)]`, `for var in list: pool.apply_async(f, var)` – farhawa Apr 08 '15 at 21:06
  • @wajdiF All the processes are launched the instant you create the `Pool`. The same `N` processes handle every single task you pass to `apply_async`, by pulling them off of a queue shared between all the processes. – dano Apr 08 '15 at 21:23
  • Is it working? I tried to run the sample code above, changed the worker func to an infinite loop (`def worker(x, y, z): while 1: pass`), and the process won't end. http://stackoverflow.com/a/24634225/3291799 did work though, but in a dangerous way. – Tilney Jul 06 '16 at 04:20
  • You can change the `p.terminate()` line to `sys.exit(1)`. It kills the worker process and the job thread, but I'm not sure this solution is good. – rusnasonov Aug 02 '16 at 04:10
  • @dano what would be the license for this code beside the standard CC-BY-SA? – Philippe Ombredanne Nov 11 '16 at 16:24
  • @Tilney Hmm, it actually still works for me on Python 2.7 with that `worker` implementation. Strange... – dano Nov 11 '16 at 16:44
  • @PhilippeOmbredanne Public Domain, as far as I'm concerned. – dano Nov 11 '16 at 16:45
  • @dano Thanks for the quick reply! – Philippe Ombredanne Nov 11 '16 at 16:48
  • @jjjjjj `worker` is the function containing whatever code you want to execute in sub-processes. – dano Aug 20 '17 at 17:32
  • On Python 3.7 I get this error: `AssertionError: daemonic processes are not allowed to have children` – phaebz Feb 07 '20 at 09:46
  • Did you make sure you used a Thread pool in the worker process? You'll get that error if you try to spawn an entire child process from the worker – dano Feb 08 '20 at 17:07
  • "which will abruptly terminate the thread worker is executing in" - Unfortunately there's no mechanism for this. As you certainly know, a thread can't be "killed" and `Pool._terminate_pool()` internally checks `hasattr(pool[0], 'terminate')` before attempting to terminate its workers. Since a `Dummy.Process` (thread) has no `.terminate()` method, nothing happens to actually stop execution of your `worker()`. Unhandeled `Exception` instances in user-code generally don't terminate the whole worker-_process_ either since they get caught. – Darkonaut Feb 08 '20 at 18:45
  • What happens is that the worker-thread remains and finishes its task anyway. You can watch the growing number of remaining threads when you insert a `time.sleep(20)` into `worker()` and `print([t.name for t in threading.enumerate()])` in the first line of `abortable_worker()`. Limiting the number of processes to `pool = multiprocessing.Pool(2)` will also help for demonstration; see the diagnostic sketch after these comments. – Darkonaut Feb 08 '20 at 18:46
  • @Darkonaut Yes, you're right, the answer I had here made no sense! I'm not sure what all these people up-voted it for. I can't remember writing this answer anymore (five years is a long time), but I *think* in my head it was supposed to be killing the `ProcessPool` processes, rather than the threads running inside of them. That's not at all what my answer says, though. Anyway, there actually is a way to make it work as intended, and I've updated my answer to reflect that. – dano Feb 09 '20 at 17:14
  • Inside the abortable_worker when handling Timeout Exception, wouldn't it be better to send a SIGTERM? That way, we won't have to limit maxtasksperchild to 1, and will still be able to kill the process? – darkgbm Mar 19 '23 at 02:14
  • @Student last I checked, exiting worker processes in a `multiprocessing.Pool` will actually break the pool. It doesn't replace the killed process or recognize the work it was doing when you killed it as completed. – dano Mar 20 '23 at 02:39
  • In ThreadPool(1), is it necessary for processes to be 1? – darkgbm Mar 21 '23 at 02:25
  • @Student `ThreadPool(1)` is creating a thread pool with one thread in it, which just waits for a single request to finish (or timeout) and then isn't used again. So you could give it more threads, but they wouldn't ever get used. – dano Mar 21 '23 at 14:15
  • @dano Oh OK, so is it correct to say that `func` is not limited by `ThreadPool(1)`'s one thread? – darkgbm Mar 22 '23 at 00:32
  • @Student correct. – dano Mar 22 '23 at 03:47
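
To reproduce what the comments above describe, here is a small diagnostic sketch; the 20-second sleep, the pool size of 2, and the omission of maxtasksperchild are deliberate choices (per the comments), so that the lingering daemon threads become visible:

import multiprocessing
import threading
import time
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

def worker(x, y, z):
    time.sleep(20)  # long enough to outlive the 3-second timeout

def abortable_worker(func, *args, **kwargs):
    # Threads leaked by earlier timed-out tasks show up in this listing.
    print([t.name for t in threading.enumerate()])
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        return res.get(timeout)
    except multiprocessing.TimeoutError:
        return None

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)  # no maxtasksperchild: processes are reused
    for f in [[1000, k, 1] for k in range(6)]:
        pool.apply_async(partial(abortable_worker, worker, timeout=3), args=f)
    pool.close()
    pool.join()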

You can use gevent.Timeout to limit how long the worker is allowed to run; see the gevent tutorial.

from multiprocessing.dummy import Pool  # thread pool
from gevent import Timeout  # you should install gevent
from gevent import monkey
monkey.patch_all()  # patch blocking calls such as time.sleep so gevent can interrupt them
import time

def worker(sleep_time):
    seconds = 5  # max time the worker may run
    try:
        with Timeout(seconds):  # raises Timeout if the block runs longer
            time.sleep(sleep_time)
        print("%s is an early bird" % sleep_time)
    except Timeout:
        print("%s is late (timed out)" % sleep_time)

pool = Pool(4)
pool.map(worker, range(10))


output:
0 is an early bird
1 is an early bird
2 is an early bird
3 is an early bird
4 is an early bird
8 is late (timed out)
5 is late (timed out)
6 is late (timed out)
7 is late (timed out)
9 is late (timed out)

Every task that sleeps for less than the 5-second limit finishes normally; the rest are interrupted by the Timeout. The "late" lines can appear out of order because the tasks start at different times and run concurrently on the four pool threads.
thinkdeeper