I have a class called Downloader
which I would like to use to handle transactions from my S3. I want to use concurrency to download things from S3 as fast as possible, as per this SO post, but when I put all the concurrency code in my class, I get some strange behaviour.
Running the code below
import boto3
import botocore
from concurrent import futures
class Downloader:
def __init__(self, aws_key, aws_id):
self.s3_client = boto3.resource('s3', aws_access_key_id=aws_id, aws_secret_access_key=aws_key)
def download_s3(self, key, bucket='bucket'):
try:
f = self.s3_client.meta.client.get_object(Bucket=bucket, Key=key)['Body'].read()
except ClientError as e:
print(f'Key {key} not found')
return None
return np.frombuffer(f)
def download_s3_parallel(self, keys):
print(f'executor called...')
with futures.ThreadPoolExecutor(max_workers=1) as executor:
print(f'launching threads')
future_to_key = {executor.submit(self.download_s3, key): key for key in keys}
print(future_to_key)
print(f'looping through completed threads')
for future in futures.as_completed(future_to_key):
key = future_to_key[future]
exception = future.exception()
if not exception:
yield key, future.result()
else:
yield key, exception
keys = ['key1', 'key2', 'key3']
dl = Downloader(<my_secret_key>, <my_aws_id>)
all_objects = dl.download_s3_parallel(keys)
Gives no output, but when I comment out the for loop like so:
class Downloader:
def __init__(self, aws_key, aws_id):
self.s3_client = boto3.resource('s3', aws_access_key_id=aws_id, aws_secret_access_key=aws_key)
def download_s3(self, key, bucket='bucket'):
try:
f = self.s3_client.meta.client.get_object(Bucket=bucket, Key=key)['Body'].read()
except ClientError as e:
print(f'Key {key} not found')
return None
return np.frombuffer(f)
def download_s3_parallel(self, keys):
print(f'executor called...')
with futures.ThreadPoolExecutor(max_workers=1) as executor:
print(f'launching threads')
future_to_key = {executor.submit(self.download_s3, key): key for key in keys}
print(future_to_key)
print(f'looping through completed threads')
# for future in futures.as_completed(future_to_key):
# key = future_to_key[future]
# exception = future.exception()
# if not exception:
# yield key, future.result()
# else:
# yield key, exception
The output becomes
executor called...
launching threads
{<Future at 0x7f5f8dd12430 state=running>: 'key1', <Future at 0x7f5f8dd12730 state=pending>: 'key2', <Future at 0x7f5f8dd2c8b0 state=pending>: 'key3'}
looping through completed threads
So my question is: Why is the for loop causing the entire download_features_s3_parallel
function to not be run? Even the print statement at the beginning of that method does not get called. What is the reason this happens?
If it's because the for loop is throwing an error, how can I find out what the error is?