2

I want to return already completed task values in multiprocessing for a given timeout after killing all the ongoing and queued tasks. For example, the following function needed to be run parallelly using pool.starmap() For values, 1 to 100.

def func(x):
  time.sleep(1)
  return x * 2

Let's say, after 5 seconds (defined timeout), tasks related to values 1 to 10 are completed and 11-90 are still running or queued. Then at 5 seconds, I want to kill all processes and return the values of the completed tasks for the values 1 to 10. So then the return list for the 100 tasks should be [2,4,6,8,10, ....18, 20, None, None,....None]. The main function I tried is as follows,

def main():
  pool = multiprocessing.Pool(processes=3)
  val = [[i] for i in range(100)]
  result_obj = pool.starmap_async(func, val)
  result_obj.wait(5)
  if result_obj.ready():
     result = result_obj.get(1)

However, here the processes are still running in the background and couldn't find a way to kill those processes and capture the already completed tasks after 5 seconds. Is this possible?

Jaliya
  • 337
  • 5
  • 20

1 Answers1

1

This answer contains all the information you need: To retrieve results while they are being generated, imap_unordered is probably the best function to use as it returns results to the main thread once they are completed. You would just have to perform a bit of bookkeeping to ensure that the results end up in the right position in your result queue. A way to achieve that would be to pass an index to the parallelized function which that function then returns.

Some simplified pseudo-code below that you should be able to derive a solution with:

def worker(id, other_args):
    arg1, arg2, ..., argn = other_args # Assuming other_args is a tuple or something else unpackable
    do something
    return idx, result

obtained_results = [None] * nr_tasks
with multiprocessing.Pool() as pool:
    for idx, result in pool.imap_unordered(worker, enumerate(arg_list)):
        obtained_results[idx] = result
        if time > timeout:
            pool.terminate()
            break

Only small disadvantage is that the timeout is only triggered after the first result exceeding the timeout is returned. For short tasks, this shouldn't be an issue. If you have longer-running tasks, you might need something else, but pool.terminate() is probably still the way to go for you.

C Hecht
  • 932
  • 5
  • 14
  • Thanks for your answer. However, how can we enter more than one input for the worker function (ex:- def worker(input1, input2, input3, ...)., because the functions I am using in my real code have several inputs. – Jaliya Nov 10 '21 at 03:33
  • I found a workaround for that issue also using another wrapper function. Thanks. – Jaliya Nov 10 '21 at 06:13
  • Either a wrapper or a unpacking would work. I edited my answer for you – C Hecht Nov 10 '21 at 06:37
  • Thanks and that also works. Further, a break should be placed under if and pool.terminate(). Otherwise, the for loop will get an error in the final run. As you state this is perfect for short-running tasks. However, I need to run some neural networks optimisations which may sometimes run for hours. So I am still searching for a solution to track the ongoing tasks. – Jaliya Nov 10 '21 at 11:30