I have a large number of tasks (40,000 to be exact) that I am running in parallel using a Pool. To maximize efficiency, I pass the list of all tasks at once to starmap and let them run.

I would like to have it so that if my program is interrupted with Ctrl+C, the currently running tasks are allowed to finish but no new ones are started. I have figured out the signal handling part for Ctrl+C using the recommended method, and it works well (at least with Python 3.6.9, which I am using):

import os
import signal
import random as rand
import multiprocessing as mp

def init():
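    # ignore Ctrl+C (SIGINT) in the workers; only the parent handles it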
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def child(a, b, c):
    st = rand.randrange(5, 20+1)
    print("Worker thread", a+1, "sleep for", st, "...")
    os.system("sleep " + str(st))

pool = mp.Pool(initializer=init)

try:
    pool.starmap(child, [(i, 2*i, 3*i) for i in range(10)])
    pool.close()
    pool.join()
    print("True exit!")
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
    print("Interrupted exit!")

The problem is that Pool seems to have no function that lets the currently running tasks complete and then stops. It only has terminate and close. The example above uses terminate, but that is not what I want, since it immediately kills all running tasks, whereas I want to let them run to completion. On the other hand, close only prevents adding more tasks, so calling close followed by join waits for all pending tasks to complete (40,000 of them in my real case), whereas I only want the currently running tasks to finish, not all of them.

I could gradually add my tasks one by one or in chunks, so that close and join would do what I want when interrupted (see the sketch below), but this seems less efficient unless there is a way to manually add a new task as soon as one finishes (which I am not seeing how to do from the Pool documentation). It really seems like my use case should be common and that Pool should have a function for this, but I have not seen this question asked anywhere (or maybe I am just not searching for the right thing).
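
For example, a minimal sketch of that chunked approach, reusing init and child from above (the chunk size is arbitrary); after Ctrl+C, close and join then only wait for the remainder of the current chunk rather than all 40,000 tasks:

CHUNK = 64  # hypothetical chunk size

tasks = [(i, 2*i, 3*i) for i in range(40000)]
pool = mp.Pool(initializer=init)
try:
    # submit one bounded chunk at a time instead of everything at once
    for start in range(0, len(tasks), CHUNK):
        pool.starmap(child, tasks[start:start + CHUNK])
    print("True exit!")
except KeyboardInterrupt:
    print("Interrupted exit!")
finally:
    pool.close()  # no new tasks may be submitted
    pool.join()   # waits only for the tasks of the current chunk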

Does anyone know how to accomplish this easily?

kyp4

1 Answer

I tried to do something similar with concurrent.futures - see the last code block in this answer. It throttles adding tasks to the pool and only submits new tasks as others complete. You could change the logic to fit your needs. Maybe keep the number of pending work items slightly greater than the number of workers so you don't starve the executor. Something like:

import concurrent.futures
import random as rand
import signal
import sys
import time

def child(*args, n=0):
    # ignore Ctrl+C in the worker so only the main process handles it
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    a, b, c = args
    st = rand.randrange(1, 5)
    time.sleep(st)
    x = f"Worker {n} thread {a+1} slept for {st} - args:{args}"
    return (n, x)


if __name__ == '__main__':
    nworkers = 5    # ncpus?
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        data = ((i, 2*i, 3*i) for i in range(100))
        for n, args in enumerate(data):
            try:
                # limit pending tasks (_pending_work_items is a private attribute)
                while len(executor._pending_work_items) >= nworkers + 2:
                    # wait till one completes and get the result
                    futures = concurrent.futures.wait(fs, return_when=concurrent.futures.FIRST_COMPLETED)
                    results.extend(future.result() for future in futures.done)
                    print(f'{len(results)} results so far')
                    fs = list(futures.not_done)
                print(f'add a new task {n}')
                fs.append(executor.submit(child, *args, n=n))
            except KeyboardInterrupt:
                print('ctrl-c!!', file=sys.stderr)
                # don't add any more tasks
                break
        # get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'{len(executor._pending_work_items)} tasks pending:')
            result = future.result()
            results.append(result)
    results.sort()
    # separate the results from the value used to sort
    for n, result in results:
        print(result)

Here is a way to get the results sorted in submission order without modifying the task. It uses a dictionary to map each future to its submission order and uses that as the sort key.

# same imports as above
def child(*args):
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    a, b, c = args
    st = rand.randrange(1, 5)
    time.sleep(st)
    x = f"Worker thread {a+1} slept for {st} - args:{args}"
    return x


if __name__ == '__main__':
    nworkers = 5    # ncpus?
    sort_dict = {}
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        data = ((i, 2*i, 3*i) for i in range(100))
        for n, args in enumerate(data):
            try:
                # limit pending tasks
                while len(executor._pending_work_items) >= nworkers + 2:
                    # wait till one completes and grab it
                    futures = concurrent.futures.wait(fs, return_when=concurrent.futures.FIRST_COMPLETED)
                    results.extend(futures.done)
                    print(f'{len(results)} futures completed so far')
                    fs = list(futures.not_done)
                future = executor.submit(child, *args)
                fs.append(future)
                print(f'task {n} added - future:{future}')
                # remember each future's submission order for sorting later
                sort_dict[future] = n
            except KeyboardInterrupt:
                print('ctrl-c!!', file=sys.stderr)
                # don't add any more tasks
                break
        # get leftover futures as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'{len(executor._pending_work_items)} tasks pending:')
            results.append(future)
    # sort the futures by submission order
    results.sort(key=lambda f: sort_dict[f])
    # get the results
    for future in results:
        print(future.result())

You could also just add an attribute to each future and sort on that (no need for the dictionary):

...
                future = executor.submit(child, *args)
                # add an attribute to the future that can be sorted on
                future.submitted = n
                fs.append(future)

...
    results.sort(key=lambda f: f.submitted)
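
Setting an ad-hoc attribute like this works because Future instances are ordinary Python objects; it keeps the ordering bookkeeping on the future itself instead of in a separate dictionary.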
wwii
  • Unfortunately this solution requires Python 3.7. In particular the `initializer` keyword in the constructor of `ProcessPoolExecutor` is new in 3.7. I was unable to get 3.7 working in Ubuntu 18.04 (which currently comes with 3.6.9 by default) that I am using, so I couldn't test this. – kyp4 Mar 23 '20 at 15:24
  • @kyp4 - I am not familiar with the `signal` module so I don't really know what it is supposed to be doing - I only put it in there because you were using it. I removed it and ran this on Python v3.6 and it ran with similar results. I also put the call `signal.signal(signal.SIGINT, signal.SIG_IGN)` in `child` as the first line and it still executed (even though I don't know what it is doing) - does a `ctrl-c` in the main process even propagate to *child* processes? – wwii Mar 23 '20 at 17:07
  • I just moved the `signal` call into the first line of `child` and it seems to work well! That signal call is necessary though, at least on Linux, as the children will throw a `KeyboardInterrupt` exception without it, so the SIGINT must propagate down. However, I know that SIGINT is handled differently on Windows, so it may not be necessary there. Thanks for the solution, I am definitely going to use this! – kyp4 Mar 25 '20 at 13:40
  • I did just discover another issue with the above code in that the results reflect the order in which the tasks finish rather than the order of the data iterator, as would be the case with `starmap`, but this is not difficult to fix. – kyp4 Mar 25 '20 at 14:07
  • I wasn't aware of that behaviour of `starmap` but [this post](https://stackoverflow.com/a/50374183/2823755) implies that *underneath* the results are asynchronous and `starmap` reorders them before serving them to you. Depending on how much leeway you have with the task, you could add another parameter and use it to reorder on the backend - I'll try to gin up an example. – wwii Mar 25 '20 at 14:22
  • @kyp4 see the edits: one modifies the `task` the other doesn't - both will sort the *results*. – wwii Mar 25 '20 at 15:19
  • Haha yeah I already implemented the reordering, evidently just like `starmap` does. – kyp4 Mar 25 '20 at 16:31