
I am using ThreadPoolExecutor to download a huge number (~400k) of keyframe images. The keyframe names are stored in a text file (say, keyframes_list.txt).

I have modified the example provided in the documentation and it seems to work flawlessly, with one exception: the example submits every link as a future object, and all of these futures are collected in an iterable (a dict(), to be precise). This iterable is then passed to the as_completed() function to check when a future has completed. This requires a huge amount of text to be loaded into memory at once: my Python process takes up about 1 GB of RAM for this task.

The full code is shown below:

import concurrent.futures
import requests

def download_keyframe(keyframe_name):
    url = 'http://server/to//Keyframes/{}.jpg'.format(keyframe_name)
    r = requests.get(url, allow_redirects=True)
    open('path/to/be/saved/keyframes/{}.jpg'.format(keyframe_name), 'wb').write(r.content)
    return True

keyframes_list_path = '/path/to/keyframes_list.txt'
future_to_url = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    with open(keyframes_list_path, 'r') as f:
        for i, line in enumerate(f):
            fields = line.split('\t')
            keyframe_name = fields[0]
            future_to_url[executor.submit(download_keyframe, keyframe_name)] = keyframe_name
    for future in concurrent.futures.as_completed(future_to_url):
        keyframe_name = future_to_url[future]
        try:
            future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (keyframe_name, exc))
        else:
            print('Keyframe: {} was downloaded.'.format(keyframe_name))

So, my question is: how can I provide an iterable while also keeping the memory footprint low? I have considered using a queue, but I am not sure it cooperates smoothly with ThreadPoolExecutor. Is there an easy way to control the number of futures submitted to the ThreadPoolExecutor?
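One way to bound the number of in-flight futures (a self-contained sketch with a dummy `work` function standing in for the actual download code; `bounded_submit` and `MAX_IN_FLIGHT` are illustrative names, not part of the original) is to gate `executor.submit()` behind a `threading.BoundedSemaphore` that is released from a done-callback, so the input is consumed lazily instead of being queued all at once:

```python
import concurrent.futures
import threading

def bounded_submit(executor, semaphore, fn, *args):
    """Block until a slot frees up, then submit; release the slot on completion."""
    semaphore.acquire()  # blocks when too many tasks are already in flight
    future = executor.submit(fn, *args)
    future.add_done_callback(lambda _: semaphore.release())
    return future

def work(n):
    return n * n  # stand-in for the real download_keyframe

MAX_IN_FLIGHT = 16  # at most this many tasks queued or running at once
results = []
semaphore = threading.BoundedSemaphore(MAX_IN_FLIGHT)
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    for n in range(100):  # stand-in for iterating over keyframes_list.txt
        future = bounded_submit(executor, semaphore, work, n)
        future.add_done_callback(lambda f: results.append(f.result()))

print(sorted(results))  # all 100 squares, in ascending order
```

Because the producer loop blocks on the semaphore, at most MAX_IN_FLIGHT futures exist at any moment, regardless of how large the input file is.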

Eypros
  • A little confused: if you want to print out that `keyframe_name`, then you must store them somewhere? How do you imagine saving memory on that part? BTW, you should also close your file after writing. And further, you could use asynchronous I/O (for example `aiohttp`) for the network communication; that can be faster and more lightweight. – Sraw Nov 01 '18 at 15:26
  • I am just saying that if I am using a limited number of workers (8 in my case), why do I need to load the whole set of URLs up front? As for the file, you are right (I just copied it from somewhere). – Eypros Nov 01 '18 at 15:31

2 Answers


The answer by AdamKG is a good start, but his code will wait until a chunk has been processed completely before starting on the next chunk, so you lose some performance.

I suggest a slightly different approach that feeds a continuous stream of tasks to the executor while enforcing an upper bound on the number of parallel tasks, in order to keep the memory footprint low.

The trick is to use concurrent.futures.wait to keep track of the futures that have been completed and those that are still pending completion:

def download_keyframe(keyframe_name):
    try:
        url = 'http://server/to//Keyframes/{}.jpg'.format(keyframe_name)
        r = requests.get(url, allow_redirects=True)
        # Use a context manager so the file is closed promptly.
        with open('path/to/be/saved/keyframes/{}.jpg'.format(keyframe_name), 'wb') as out:
            out.write(r.content)
    except Exception as e:
        return keyframe_name, e

    return keyframe_name, None

MAX_WORKERS = 8
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    with open(keyframes_list_path, 'r') as fh:
        futures_notdone = set()
        futures_done = set()
        for i, line in enumerate(fh):
            # Submit new task to executor.
            fields = line.split('\t')
            keyframe_name = fields[0]
            futures_notdone.add(executor.submit(download_keyframe, keyframe_name))

            # Enforce upper bound on number of parallel tasks.
            if len(futures_notdone) >= MAX_WORKERS:
                done, futures_notdone = concurrent.futures.wait(
                    futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
                futures_done.update(done)

# Process results.
for future in futures_done:
    keyframe_name, exc = future.result()
    if exc:
        print('%r generated an exception: %s' % (keyframe_name, exc))
    else:
        print('Keyframe: {} was downloaded.'.format(keyframe_name))

Of course, you could also process the results inside the loop at regular intervals in order to empty futures_done from time to time, for example whenever it holds more than 1000 items (or any other amount that fits your needs). This might come in handy if your dataset is very large and the results alone would consume a lot of memory.
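That in-loop draining could look like the following self-contained sketch (a dummy `task` stands in for `download_keyframe`, and the `DRAIN_AT` threshold is illustrative):

```python
import concurrent.futures

def task(n):
    return n + 1  # stand-in for download_keyframe

MAX_WORKERS = 8
DRAIN_AT = 1000  # process results once this many futures have completed
processed = []

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures_notdone = set()
    futures_done = set()
    for n in range(5000):  # stand-in for iterating over the keyframes file
        futures_notdone.add(executor.submit(task, n))

        # Enforce upper bound on number of parallel tasks.
        if len(futures_notdone) >= MAX_WORKERS:
            done, futures_notdone = concurrent.futures.wait(
                futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
            futures_done.update(done)

        # Drain completed futures so futures_done never grows unbounded.
        if len(futures_done) >= DRAIN_AT:
            processed.extend(f.result() for f in futures_done)
            futures_done.clear()

    # Drain whatever is left after the submission loop.
    for future in concurrent.futures.as_completed(futures_notdone | futures_done):
        processed.append(future.result())

print(len(processed))  # 5000
```

Each result is handled exactly once: either in a mid-loop drain or in the final sweep over the remaining futures.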

robert

If we look at the source for as_completed(), the first thing it does is evaluate whatever iterable you pass as the first argument: on line 221, it does `fs = set(fs)`. So as long as you read and queue the entire file at once, as_completed() will load all of those Future instances into memory when you call it.

To get around this, chunk the input and call as_completed() with only a subset of the Futures on each iteration. You can use the snippet from this answer; chunks of ~1k should keep your thread pool saturated without consuming excessive memory. Your final code, starting with the with-block for the ThreadPoolExecutor, should look something like this:

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    with open(keyframes_list_path, 'r') as f:
        for lines in grouper(f, 1000):
            # reset the dict that as_completed() will check on every iteration
            future_to_url = {}
            for line in lines:
                if line is None:  # grouper pads the final chunk with fillvalue
                    continue
                fields = line.split('\t')
                keyframe_name = fields[0]
                future_to_url[executor.submit(download_keyframe, keyframe_name)] = keyframe_name
            for future in concurrent.futures.as_completed(future_to_url):
                keyframe_name = future_to_url[future]
                try:
                    future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (keyframe_name, exc))
                else:
                    print('Keyframe: {} was downloaded.'.format(keyframe_name))
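For reference, the grouper helper used above is not defined in this answer; the classic itertools recipe looks like this (note that it pads the last chunk with fillvalue, which callers should skip when iterating the lines):

```python
from itertools import zip_longest

def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length chunks; the last chunk is padded with fillvalue."""
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

# Example: group 7 items into chunks of 3, dropping the None padding.
chunks = [tuple(x for x in chunk if x is not None)
          for chunk in grouper(range(7), 3)]
print(chunks)  # [(0, 1, 2), (3, 4, 5), (6,)]
```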
AdamKG