
I'm basing this off of the sample from https://docs.python.org/3/library/concurrent.futures.html#id1.

I've updated the following:
data = future.result()
to this:
data = future.result(timeout=0.01)

The doc for concurrent.futures.Future.result states:

If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float

(I know there is a timeout of 60 seconds on the request itself, but in my real code I'm performing a different action that doesn't use a urllib request.)

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the url and contents
def load_url(url, timeout):
    conn = urllib.request.urlopen(url, timeout=timeout)
    return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            # The below timeout isn't raising the TimeoutError.
            data = future.result(timeout=0.01)
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

TimeoutError is raised if I set it on the call to as_completed, but I need to set the timeout on a per-future basis, not on all of them as a whole.
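To make the contrast concrete, here is a minimal sketch (the slow helper and the sleep durations are my own, not from the question) showing that the timeout passed to as_completed covers the whole iteration rather than any single future:

```python
import concurrent.futures
import time

def slow(seconds):
    time.sleep(seconds)
    return seconds

timed_out = False
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # One quick task and one slow one.
    futures = [executor.submit(slow, s) for s in (0.05, 0.6)]
    try:
        # This 0.2-second timeout applies to the whole batch: the quick
        # future is yielded, then waiting on the slow one raises.
        for future in concurrent.futures.as_completed(futures, timeout=0.2):
            print(future.result())
    except concurrent.futures.TimeoutError:
        timed_out = True
        print("ran out of time waiting for the remaining futures")
```

So a single slow future is enough to trip the timeout for the entire as_completed call, which is exactly the behaviour the question wants to avoid.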


Update

Thanks @jme, that works with a single Future, but not with multiple futures, as in the code below. Do I need to yield at the beginning of the function to allow the futures dict to be built up? From the docs it sounds like the calls to submit shouldn't block.

import concurrent.futures
import time
import sys

def wait():
    time.sleep(5)
    return 42

with concurrent.futures.ThreadPoolExecutor(4) as executor:
    waits = [wait, wait]
    futures = {executor.submit(w): w for w in waits}
    for future in concurrent.futures.as_completed(futures):
        try:
            future.result(timeout=1)
        except concurrent.futures.TimeoutError:
            print("Too long!")
            sys.stdout.flush()

print(future.result())
Justin

2 Answers


The issue seems to be with the call to concurrent.futures.as_completed().

If I replace that with just a for loop, everything seems to work:

for wait, future in [(w, executor.submit(w)) for w in waits]:
    ...

I misinterpreted the doc for as_completed, which states:

...yields futures as they complete (finished or were cancelled)...

as_completed does handle timeouts, but for all of the futures as a whole, not on a per-future basis. Since as_completed only yields futures once they have already finished, the subsequent call to future.result(timeout=...) returns immediately and the timeout never fires.
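A runnable sketch of that plain-loop approach, using the same wait-style setup as the question (the sleep and timeout durations are shortened here for illustration):

```python
import concurrent.futures
import time

def wait():
    time.sleep(0.5)
    return 42

timeouts = 0
with concurrent.futures.ThreadPoolExecutor(4) as executor:
    # Submit everything up front so the tasks run concurrently, then give
    # each future its own result() timeout instead of going through
    # as_completed, which would only yield futures that are already done.
    futures = [executor.submit(wait) for _ in range(2)]
    for future in futures:
        try:
            future.result(timeout=0.1)
        except concurrent.futures.TimeoutError:
            timeouts += 1
            print("Too long!")
```

Both result() calls time out here because each task sleeps longer than its 0.1-second budget; the tasks still run to completion, since the executor's shutdown waits for them.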

Justin

The exception is being raised in the main thread; you just aren't seeing it because stdout hasn't been flushed. Try, for example:

import concurrent.futures
import time
import sys

def wait():
    time.sleep(5)
    return 42

with concurrent.futures.ThreadPoolExecutor(4) as executor:
    future = executor.submit(wait)
    try:
        future.result(timeout=1)
    except concurrent.futures.TimeoutError:
        print("Too long!")
        sys.stdout.flush()

print(future.result())

Run this and you'll see "Too long!" appear after one second, but the interpreter will wait an additional four seconds for the threads to finish executing. Then you'll see 42 -- the result of wait() -- appear.

What does this mean? Setting a timeout doesn't kill the thread, and that's actually a good thing. What if the thread were holding a lock? If we killed it abruptly, that lock would never be freed. No, it's much better to let the thread handle its own demise. Likewise, the purpose of Future.cancel() is to prevent a thread from starting, not to kill one that is already running.
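To illustrate that last point, a small sketch (the work helper and timings are my own) showing that cancel() fails for a future that has already started but succeeds for one still sitting in the queue:

```python
import concurrent.futures
import time

def work():
    time.sleep(0.5)
    return "done"

# A single worker guarantees the second task is still queued
# while the first one is running.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(work)
    time.sleep(0.1)  # let the worker pick up the first task
    queued = executor.submit(work)
    cancelled_running = running.cancel()  # False: already executing
    cancelled_queued = queued.cancel()    # True: never started
    print(cancelled_running, cancelled_queued)
```

The running task still finishes normally (the with block waits for it), while the queued one is discarded without ever starting.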

jme