0

I have a class called Downloader which I would like to use to handle transactions from my S3. I want to use concurrency to download things from S3 as fast as possible, as per this SO post, but when I put all the concurrency code in my class, I get some strange behaviour.

Running the code below

import boto3
import botocore
from concurrent import futures

class Downloader:
    def __init__(self, aws_key, aws_id):
        self.s3_client = boto3.resource('s3', aws_access_key_id=aws_id, aws_secret_access_key=aws_key)

    def download_s3(self, key, bucket='bucket'):
        try:
            f = self.s3_client.meta.client.get_object(Bucket=bucket, Key=key)['Body'].read()
        except ClientError as e:
            print(f'Key {key} not found')
            return None
        return np.frombuffer(f)

    def download_s3_parallel(self, keys):

        print(f'executor called...')
        with futures.ThreadPoolExecutor(max_workers=1) as executor:
            print(f'launching threads')
            future_to_key = {executor.submit(self.download_s3, key): key for key in keys}
            print(future_to_key)
        
            print(f'looping through completed threads')
            for future in futures.as_completed(future_to_key):
                key = future_to_key[future]
                exception = future.exception()

                if not exception:
                    yield key, future.result()
                else:
                    yield key, exception


keys = ['key1', 'key2', 'key3']
dl = Downloader(<my_secret_key>, <my_aws_id>)
all_objects = dl.download_s3_parallel(keys)

Gives no output, but when I comment out the for loop like so:

class Downloader:
    def __init__(self, aws_key, aws_id):
        self.s3_client = boto3.resource('s3', aws_access_key_id=aws_id, aws_secret_access_key=aws_key)

    def download_s3(self, key, bucket='bucket'):
        try:
            f = self.s3_client.meta.client.get_object(Bucket=bucket, Key=key)['Body'].read()
        except ClientError as e:
            print(f'Key {key} not found')
            return None
        return np.frombuffer(f)

    def download_s3_parallel(self, keys):

        print(f'executor called...')
        with futures.ThreadPoolExecutor(max_workers=1) as executor:
            print(f'launching threads')
            future_to_key = {executor.submit(self.download_s3, key): key for key in keys}
            print(future_to_key)
        
            print(f'looping through completed threads')
            # for future in futures.as_completed(future_to_key):
            #     key = future_to_key[future]
            #     exception = future.exception()

            #     if not exception:
            #         yield key, future.result()
            #     else:
            #         yield key, exception

The output becomes

executor called...
launching threads
{<Future at 0x7f5f8dd12430 state=running>: 'key1', <Future at 0x7f5f8dd12730 state=pending>: 'key2', <Future at 0x7f5f8dd2c8b0 state=pending>: 'key3'}
looping through completed threads

So my question is: Why is the for loop causing the entire download_features_s3_parallel function to not be run? Even the print statement at the beginning of that method does not get called. What is the reason this happens? If it's because the for loop is throwing an error, how can I find out what the error is?

anactualtoaster
  • 396
  • 3
  • 4
  • Insert a `print(key)` in the loop and see what happens. Why is the pool created with `max_workers=1`? BTW, given Cpython's limitations on threads and the type of application, I would use [asyncio](https://docs.python.org/3/library/asyncio.html) instead – Pynchia Aug 28 '20 at 03:45
  • I have added a print(key) in the for loop underneath the `key = future_to_key[future]` line and nothing changes. I have also changed the `num_workers` to 4, but none of these gave any output. Regarding asyncio, I was hoping to utilize what I found in the other SO post linked in my question. I'm guessing the issue is related to the fact that I have wrapped everything up in a python class? That seems to be the only real difference. – anactualtoaster Aug 28 '20 at 04:00

0 Answers0