
I want to use multiprocessing.Pool, but it can't abort a task after a timeout. I found a solution and modified it a bit.

from multiprocessing import util, Pool, TimeoutError
from multiprocessing.dummy import Pool as ThreadPool
import threading
import sys
from functools import partial
import time


def worker(y):
    print("worker sleep {} sec, thread: {}".format(y, threading.current_thread()))
    start = time.time()
    while True:
        if time.time() - start >= y:
            break
        time.sleep(0.5)
        # show work progress
        print(y)
    return y


def collect_my_result(result):
    print("Got result {}".format(result))


def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    # run the real worker in a single-thread pool so we can wait on it with a timeout
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        # Wait timeout seconds for func to complete.
        out = res.get(timeout)
    except TimeoutError:
        print("Aborting due to timeout {}".format(args[0]))
        # kill the worker process itself when a TimeoutError is raised
        sys.exit(1)
    else:
        return out


def empty_func():
    pass


if __name__ == "__main__":
    TIMEOUT = 4
    util.log_to_stderr(util.DEBUG)
    pool = Pool(processes=4)

    # k - time to job sleep
    featureClass = [(k,) for k in range(20, 0, -1)]  # list of arguments
    for f in featureClass:
        # check available worker
        pool.apply(empty_func)

        # run job with timeout
        abortable_func = partial(abortable_worker, worker, timeout=TIMEOUT)
        pool.apply_async(abortable_func, args=f, callback=collect_my_result)

    time.sleep(TIMEOUT)
    pool.terminate()
    print("exit")

The main modification is that the worker process exits with sys.exit(1). This kills the worker process and its job thread, but I'm not sure this solution is good. What potential problems can I get when a process terminates itself while a job is still running?

rusnasonov
  • Ok. I guess you'd better handle timeout in your worker() and write the results to a common collection. In this way, you just need to call join() on all threads and then process the results. If your system is not heavily loaded, things should just work. – mljli Aug 03 '16 at 06:33
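
For reference, a minimal sketch of the approach suggested in the comment above (the names and the queue-based collection are illustrative, not from the original code): each worker watches the clock itself and writes its outcome into a shared queue, so the main thread only needs to join() the threads and then drain the results.

import threading
import queue
import time

results = queue.Queue()  # common collection all workers write into

def worker(y, timeout):
    """Sleep in half-second steps and give up once the timeout is exceeded."""
    start = time.time()
    while time.time() - start < y:
        if time.time() - start >= timeout:
            results.put((y, "timed out"))
            return
        time.sleep(0.5)
    results.put((y, "done"))

threads = [threading.Thread(target=worker, args=(k, 4)) for k in range(6, 0, -1)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every worker, then process the results

while not results.empty():
    print(results.get())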

1 Answer


There is no inherent risk in stopping a running job; the OS will take care of terminating the process correctly.

If your job is writing to files, you might end up with lots of truncated files on your disk.

Small issues might also occur if you write to databases or are connected to some remote process.

Nevertheless, Python's standard Pool does not support terminating a worker when a task times out, and terminating processes abruptly can lead to weird behaviour within your application.
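
One example of such weird behaviour, as a sketch based on my understanding of CPython's Pool internals (worth verifying on your Python version): if a worker process exits while running a task, the Pool quietly replaces the worker, but the task's AsyncResult never completes, so the callback never fires and a get() without a timeout blocks forever.

import sys
from multiprocessing import Pool, TimeoutError

def suicidal_worker(x):
    # the worker process kills itself mid-task, as in the question
    sys.exit(1)

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        res = pool.apply_async(suicidal_worker, (1,),
                               callback=lambda r: print("never called"))
        try:
            # without a timeout this get() would block forever
            print(res.get(timeout=5))
        except TimeoutError:
            print("No result after 5 seconds; the task was silently lost")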

The Pebble process pool does support timing out tasks.

from pebble import ProcessPool
from concurrent.futures import TimeoutError

TIMEOUT_SECONDS = 5

def function(one, two):
    return one + two

with ProcessPool() as pool:
    future = pool.schedule(function, args=(1, 2), timeout=TIMEOUT_SECONDS)

    try:
        result = future.result()
    except TimeoutError:
        print("Future: %s took more than 5 seconds to complete" % future)
noxdafox
  • It looks good. Do you know of any success stories of using it in production? – rusnasonov Aug 08 '16 at 04:18
  • 1
    Not sure I understand correctly. You want success stories of Pebble in production or of systems killing processes? Pebble is a quite stable library with a fair [amount of downloads](http://pypi-ranking.info/module/Pebble). – noxdafox Aug 09 '16 at 19:14
  • Yes, you understood correctly. Do you know of any projects that use Pebble? – rusnasonov Aug 10 '16 at 01:14
  • 1
    We use Pebble in production on a few systems and it works nicely. I don't know of any public project using it. – noxdafox Aug 10 '16 at 10:42
  • 1
    Thanks @noxdafox, Pebble saved me after 2 days of constant trial and error with timeout locks. – ASHu2 Jan 10 '22 at 13:47