
I am processing thousands of image URLs and want to use concurrent.futures.ProcessPoolExecutor to speed things up.

Some of the URLs are broken and some images are very large, so the process function may hang or take unexpectedly long. I want to put a timeout on the process function, say 10 seconds, so that these invalid images are skipped.

I tried setting the timeout parameter of futures.as_completed, and the timeout exception is raised as expected. However, the main process still seems to wait until the timed-out child process finishes. Is there a way to kill the timed-out child process immediately and submit the next URL to the pool?

from concurrent import futures

def process(url):
    ### Some time consuming operation
    return result


def main():
    urls = ['url1','url2','url3',...,'url100']
    with futures.ProcessPoolExecutor(max_workers=10) as executor:
        future_list = {executor.submit(process, url):url for url in urls}
        results = []
        try:
            for future in futures.as_completed(future_list, timeout=10):
                results.append(future.result())
        except futures.TimeoutError:  # concurrent.futures has no TimeoutException
            print("timeout")
    print(results)


if __name__ == '__main__':
    main()

In the example above, suppose I have 100 URLs and 10 of them are invalid and may take a long time. How can I get the list of results for the other 90 URLs?

ken wang
  • There are plenty of similar questions: https://stackoverflow.com/questions/44402085/multiprocessing-map-over-list-killing-processes-that-stall-above-timeout-limi https://stackoverflow.com/questions/38711840/python-multiprocessing-pool-timeout https://stackoverflow.com/questions/42782953/python-concurrent-futures-how-to-make-it-cancelable – noxdafox Feb 03 '20 at 14:30

1 Answer


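Not with the concurrent.futures library: it provides no way to terminate a worker that is already running a task, and Future.cancel() only affects tasks that have not started yet.

The pebble module was developed to overcome this limitation.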

from pebble import ProcessPool
from concurrent.futures import TimeoutError

with ProcessPool() as pool:
    future = pool.schedule(function, args=(1,2), timeout=5)

    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
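
If adding a dependency is not an option, the same idea can be sketched with the standard library alone: start one child process per item, keep at most max_workers of them alive, and terminate any child that outlives its deadline. This is only an illustration of the technique, not how pebble is implemented. The names run_all and _worker and the start_method parameter are hypothetical, and the sketch assumes the items are hashable (URL strings are) and that return values can be pickled.

```python
import multiprocessing as mp
import time
from multiprocessing.connection import wait


def _worker(func, arg, conn):
    """Child side: run func(arg) and send (ok, value) back through the pipe."""
    try:
        conn.send((True, func(arg)))
    except Exception as exc:  # report failures instead of dying silently
        conn.send((False, repr(exc)))
    finally:
        conn.close()


def run_all(func, items, timeout, max_workers=4, start_method=None):
    """Run func(item) in one child process per item, max_workers at a time.

    Returns (results, failed): results maps item -> return value, failed maps
    item -> "timeout" or an error description.
    """
    ctx = mp.get_context(start_method)  # None selects the platform default
    pending = list(items)
    running = {}  # receiving pipe end -> (process, item, deadline)
    results, failed = {}, {}

    while pending or running:
        # Top up to max_workers children.
        while pending and len(running) < max_workers:
            item = pending.pop(0)
            recv_end, send_end = ctx.Pipe(duplex=False)
            proc = ctx.Process(target=_worker, args=(func, item, send_end))
            proc.start()
            send_end.close()  # the parent only reads
            running[recv_end] = (proc, item, time.monotonic() + timeout)

        # Sleep until a child reports back or the nearest deadline passes.
        nearest = min(deadline for _, _, deadline in running.values())
        ready = wait(list(running), timeout=max(0.0, nearest - time.monotonic()))

        now = time.monotonic()
        for conn in list(running):
            proc, item, deadline = running[conn]
            if conn in ready:
                try:
                    ok, value = conn.recv()
                except EOFError:  # child exited without sending anything
                    ok, value = False, "worker died"
                if ok:
                    results[item] = value
                else:
                    failed[item] = value
            elif now >= deadline:
                proc.terminate()  # kill the overdue child right away
                failed[item] = "timeout"
            else:
                continue  # still running and within its deadline
            proc.join()
            conn.close()
            del running[conn]
    return results, failed
```

For the question's case this would be called as results, failed = run_all(process, urls, timeout=10, max_workers=10) under the if __name__ == '__main__': guard. Starting a fresh process per URL costs more than reusing pool workers, so this trades some throughput for the ability to kill a stuck task.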
noxdafox