
I'm using Python's concurrent.futures framework. I used the map() method to launch concurrent tasks like this:

import concurrent.futures

def func(i):
    return i*i

items = [1, 2, 3, 4, 5]
async_executor = concurrent.futures.ThreadPoolExecutor(5)
results = async_executor.map(func, items)

I am interested only in the first n results and want to stop the executor once the first n tasks have finished, where n is less than the size of the input list. Is there any way to do this in Python? Is there another framework I should look into?

Asked by user2654096; edited by martineau.
  • Is it mandatory that the solution is able to terminate even those functions which are already started when the first N finish? Or is it enough to say "No new jobs will start after the first N finished, but already-started jobs will be allowed to complete even if we ignore their results"? If you need the former, see: https://stackoverflow.com/questions/42782953/python-concurrent-futures-how-to-make-it-cancelable - it is not directly possible using the library you are using. – John Zwinck Jun 02 '18 at 01:39
  • Your second statement seems suitable for my application. I'm thinking I can use a counting semaphore and then cancel the executor once the semaphore reaches the value 'n'. My question is if I do this, what exactly will the result of 'map' be? Will it contain only the results of the first 'n' tasks? Will it block until the results for the already started but not yet finished tasks are computed? – user2654096 Jun 02 '18 at 01:53
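On the questions in the last comment: Executor.map() submits every job as soon as it is called and returns an iterator that yields results in input order, blocking on each result in turn. So the first n values it yields are not necessarily the first n jobs to finish. A small sketch to observe this (the job function and sleep times are illustrative, not from the thread):

```python
import concurrent.futures
import time

completion_order = []  # records the order in which jobs actually finish


def job(i):
    # Larger inputs finish sooner: 3 sleeps 0.0 s, 0 sleeps 0.3 s.
    time.sleep((3 - i) * 0.1)
    completion_order.append(i)
    return i * i


with concurrent.futures.ThreadPoolExecutor(4) as executor:
    # map() submits all four jobs immediately, then yields their results
    # in input order, waiting on each one in turn.
    map_results = list(executor.map(job, [0, 1, 2, 3]))

print(map_results)  # [0, 1, 4, 9]
print(completion_order)
```

With four workers all jobs start together, so completion_order usually comes out reversed, while map_results stays in input order.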

1 Answer


You can't use map() for this: it yields results in input order rather than in the order they finish, and it doesn't give you the futures it submits, so you have no direct handle for cancelling the ones that haven't started. However, you can do it using submit():

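import concurrent.futures
import time

def func(i):
    time.sleep(i)  # simulate a job that takes i seconds
    return i*i


items = [1, 2, 3, 6, 6, 6, 90, 100]
async_executor = concurrent.futures.ThreadPoolExecutor(2)
# Submit every job up front and keep the futures so they can be cancelled later.
futures = {async_executor.submit(func, i): i for i in items}
# as_completed() yields futures in the order they finish, not the order submitted.
for ii, future in enumerate(concurrent.futures.as_completed(futures)):
    print(ii, "result is", future.result())
    if ii == 2:  # stop after the first 3 results
        async_executor.shutdown(wait=False)
        # cancel() only succeeds for jobs that have not started yet;
        # jobs that are already running are left to finish.
        for victim in futures:
            victim.cancel()
        break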

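The above code takes about 10 seconds to run: with two workers, it executes the jobs for 1, 2, 3 and two of the 6s, but not the rest. The two jobs that are already running when the third result arrives can't be cancelled, so they run to completion (and the interpreter waits for them before exiting); only the jobs still waiting in the queue are cancelled.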
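On Python 3.9 and later, Executor.shutdown() also accepts cancel_futures=True, which cancels every job that has not started yet, so the explicit cancel loop can be folded into the shutdown call. A minimal sketch wrapping the same submit()/as_completed() pattern in a reusable helper (the names first_n_results and slow_identity are hypothetical, and the sleep times are only for illustration):

```python
import concurrent.futures
import time


def first_n_results(func, items, n, max_workers=2):
    """Hypothetical helper: return the results of the first n jobs to finish.

    Requires Python 3.9+ (for shutdown's cancel_futures argument).
    """
    if n <= 0:
        return []
    executor = concurrent.futures.ThreadPoolExecutor(max_workers)
    futures = [executor.submit(func, item) for item in items]
    results = []
    try:
        # as_completed() yields futures in the order they finish.
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
            if len(results) == n:
                break
    finally:
        # Cancel every job that has not started yet and return without
        # waiting for the jobs that are still running.
        executor.shutdown(wait=False, cancel_futures=True)
    return results


def slow_identity(seconds):
    """Hypothetical job: sleep for the given time, then return it."""
    time.sleep(seconds)
    return seconds


if __name__ == "__main__":
    start = time.monotonic()
    got = first_n_results(slow_identity, [0.2, 0.0, 0.05, 1.0, 1.0, 1.0], 3)
    print(sorted(got), round(time.monotonic() - start, 1))
```

As in the answer's code, jobs that are already running when the n-th result arrives can't be stopped: the helper returns right away, but the interpreter still waits for those threads before it exits.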

Answered by John Zwinck.