import concurrent.futures
import time

def process_one(i):
    try:
        print("dealing with {}".format(i))
        time.sleep(50)
        print("{} Done.".format(i))
    except Exception as e:
        print(e)

def process_many():
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        executor.map(process_one,
                range(100),
                timeout=3)


if __name__ == '__main__':
    MAX_WORKERS = 10
    try:
        process_many()
    except Exception as e:
        print(e)

The docs say:

The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to Executor.map()

But here the script didn't raise any exception and kept waiting. Any suggestions?

Hao Wang

2 Answers


As the docs specify, the timeout error is only raised when __next__() is called on the iterator returned by map(). To trigger those calls, you could for example convert the output to a list:

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n / 10


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    print('main: starting')
    try:
        # without this conversion to a list, the timeout error is not raised
        real_results = list(results)
    except futures.TimeoutError:
        print("TIMEOUT")

Output:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-9_0: done with 1
ThreadPoolExecutor-9_1: done with 2
TIMEOUT
ThreadPoolExecutor-9_2: done with 3
ThreadPoolExecutor-9_3: done with 4
ThreadPoolExecutor-9_4: done with 5

Here, the n-th task sleeps for n seconds, so the timeout is raised after task 2 is completed.


EDIT: If you want to terminate the tasks that didn't complete, you could try the answers in this question (they don't use ThreadPoolExecutor.map(), though), or you could simply ignore the returned values from the remaining tasks and let them finish, as below (a cooperative-cancellation sketch follows the output):

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    outputs = []
    try:
        for i in results:
            outputs.append(i)
    except futures.TimeoutError:
        print("TIMEOUT")
    print(outputs)

Output:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-5_0: done with 1
ThreadPoolExecutor-5_1: done with 2
TIMEOUT
[1, 2]
ThreadPoolExecutor-5_2: done with 3
ThreadPoolExecutor-5_3: done with 4
ThreadPoolExecutor-5_4: done with 5
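
If you need the leftover tasks to actually stop early, one common workaround (not part of the answer above; the names here are illustrative) is cooperative cancellation: have each task periodically check a shared threading.Event and return early once the main thread sets it after the timeout:

from concurrent import futures
import threading
import time

stop = threading.Event()


def task(n):
    # sleep in small slices so the stop signal is noticed quickly
    for _ in range(n * 10):
        if stop.is_set():
            return None
        time.sleep(0.1)
    return n


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    outputs = []
    try:
        for r in results:
            outputs.append(r)
    except futures.TimeoutError:
        stop.set()  # tell the still-running tasks to give up
        print("TIMEOUT")
    print(outputs)  # typically [1, 2]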
TrakJohnson
  • **task(n)** will always be executed (printing "done with n"). Any way to interrupt it in case of a TimeoutError? I also tried the generator way, explicitly calling __next__(), with the same result. – Hao Wang Jul 23 '18 at 02:11
  • @HaoWang I've edited my answer to address this issue. However, I've just realised the second solution only works if your tasks are in time order, i.e. if each later task has a higher delay - which makes it very impractical. I'll try and find something else. – TrakJohnson Jul 23 '18 at 09:26
  • PSA: looping through the results only triggers the timeout if it is done within the `with` block. – dcompiled Mar 10 '23 at 23:52

As we see in the source (for Python 3.7), map submits all the tasks up front and returns a generator:

def map(self, fn, *iterables, timeout=None, chunksize=1):
    ...
    if timeout is not None:
        end_time = timeout + time.time()
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
    # Yield must be hidden in closure so that the futures are submitted
    # before the first iterator value is required.
    def result_iterator():
        try:
            # reverse to keep finishing order
            fs.reverse()
            while fs:
                # Careful not to keep a reference to the popped future
                if timeout is None:
                    yield fs.pop().result()
                else:
                    yield fs.pop().result(end_time - time.time())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()

The TimeoutError is raised from the yield fs.pop().result(end_time - time.time()) call, but you have to request a result to reach that call.
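
A minimal sketch that reaches that raising call by advancing the iterator explicitly (the task function here is illustrative, in the spirit of the first answer):

from concurrent import futures
import time


def task(n):
    time.sleep(n)
    return n


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    try:
        while True:
            # each next() waits at most until the deadline fixed in map()
            print(next(results))
    except futures.TimeoutError:
        print("TIMEOUT")
    except StopIteration:
        pass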

The rationale is that the timeout is not about submitting the tasks: they are submitted and start running in the background threads regardless. The timeout applies when you request a result. That is the usual use case: you submit tasks and want their results within a limited time, rather than merely expecting the tasks themselves to terminate within a limited time.

If the latter is what you are after, you could use wait, as illustrated for instance in Individual timeouts for concurrent.futures.
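
For reference, a minimal sketch of that wait-based approach (illustrative, not copied from the linked answer): submit the futures individually, wait up to a deadline, and cancel whatever has not started yet. Note that cancel() cannot interrupt a future that is already running.

from concurrent import futures
import time


def task(n):
    time.sleep(n)
    return n


with futures.ThreadPoolExecutor(max_workers=2) as ex:
    fs = [ex.submit(task, n) for n in range(1, 6)]
    done, not_done = futures.wait(fs, timeout=3)
    for f in not_done:
        f.cancel()  # only succeeds for futures that have not started running
    print(sorted(f.result() for f in done))  # e.g. [1, 2]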

Mr_and_Mrs_D
  • Thanks for pointing out the direction. I found the following answer to be the best solution so far: https://stackoverflow.com/a/44719580/1405762 – Hao Wang Jul 30 '18 at 03:30
  • In my case, there is a huge pool of URLs; I would like to sample as many of them as possible (fetching the page content of each), but would not mind giving up on a slow connection and trying the next one. – Hao Wang Jul 30 '18 at 03:37