14

I am trying to download 12,000 files from an S3 bucket using a Jupyter notebook, and the estimated time to complete the download is about 21 hours. This is because each file is downloaded one at a time. Can I run multiple downloads in parallel to speed up the process?

Currently, I am using the following code to download all the files:

### Get unique full-resolution image basenames
images = df['full_resolution_image_basename'].unique()
print(f'No. of unique full-resolution images: {len(images)}')

### Create a folder for full-resolution images
images_dir = './images/'
os.makedirs(images_dir, exist_ok=True)

### Download images
images_str = "','".join(images)
limiting_clause = f"CONTAINS(ARRAY['{images_str}'], 
full_resolution_image_basename)"
_ = download_full_resolution_images(images_dir, 
limiting_clause=limiting_clause)
Jothi
  • Can you call out to the OS and call the [AWS Command-Line Interface (CLI)](http://aws.amazon.com/cli/)? It has an `aws s3 cp` command that can copy files in parallel. Also useful is `aws s3 sync`, which is good for recovering from failed copies. – John Rotenstein Mar 10 '18 at 07:07
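
If you want to try the CLI route suggested in the comment above directly from the notebook, a minimal sketch could shell out to the AWS CLI with subprocess. This assumes the `aws` CLI is installed and configured with credentials; the bucket and prefix names below are placeholders, not values from the question.

import subprocess

# `aws s3 sync` transfers objects with multiple concurrent requests and skips
# files that are already present locally, so re-running the same command after
# a failure resumes the download instead of starting over.
subprocess.run(
    ['aws', 's3', 'sync', 's3://my-bucket/images/', './images/'],
    check=True,
)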

3 Answers

26

See the code below. This only works with Python 3.6+ because of the f-strings (PEP 498); use a different method of string formatting for older versions of Python.

Provide the `relative_path`, `bucket_name`, and `s3_object_keys`. In addition, `max_workers` is optional; if it is not provided, `ThreadPoolExecutor` defaults to 5 times the number of processors on the machine.

Most of the code for this answer came from an answer to "How to create an async generator in Python?", which in turn is based on an example documented in the library.

import boto3
import os
from concurrent import futures


relative_path = './images'
bucket_name = 'bucket_name'
s3_object_keys = [] # List of S3 object keys
max_workers = 5

abs_path = os.path.abspath(relative_path)
s3 = boto3.client('s3')

def fetch(key):
    file = f'{abs_path}/{key}'
    os.makedirs(os.path.dirname(file), exist_ok=True)  # create the parent directories, not a directory named after the file
    with open(file, 'wb') as data:
        s3.download_fileobj(bucket_name, key, data)
    return file


def fetch_all(keys):

    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_key = {executor.submit(fetch, key): key for key in keys}

        print("All URLs submitted.")

        for future in futures.as_completed(future_to_key):

            key = future_to_key[future]
            exception = future.exception()

            if not exception:
                yield key, future.result()
            else:
                yield key, exception


for key, result in fetch_all(s3_object_keys):
    print(f'key: {key}  result: {result}')
Diego Goding
  • `fetch` should create a new session as per: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-multiprocessing – Damon Maria Aug 29 '18 at 16:01
  • I agree, but I don't think that is the case with the `client` interface, so I adjusted the answer to use the `client` interface, which the `resource` interface just wraps around. I try to avoid creating `sessions`, `clients`, and `resources`, because those actions are costly. – Diego Goding Sep 02 '18 at 15:59
  • I ended up using a `threading.local()` to cache the session so it was reused by the jobs in the executor. – Damon Maria Sep 03 '18 at 16:07
  • @DamonMaria Can you explain where you included this? – Austin Dec 14 '20 at 22:30
  • @Austin I can't find where I used that code now. Do you mean where the `session` or the `local` was included? – Damon Maria Dec 15 '20 at 23:52
  • @DamonMaria the local – Austin Dec 17 '20 at 04:20
  • Create a `local` object at your module top level: `cache = threading.local()`. Then create the session if need be before using it: `if not hasattr(cache, 'session'): cache.session = ` – Damon Maria Dec 20 '20 at 08:52
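
For reference, here is a minimal sketch of the thread-local session caching described in the comments above. The `cache` name comes from the last comment; the `get_client` helper and the use of `boto3.session.Session` are illustrative assumptions, not code posted by the commenters.

import threading

import boto3

# One boto3 session/client per thread, created lazily and reused by that
# thread's jobs: sessions are not guaranteed to be thread-safe, and creating
# them repeatedly is relatively expensive.
cache = threading.local()

def get_client():
    if not hasattr(cache, 'session'):
        cache.session = boto3.session.Session()
        cache.client = cache.session.client('s3')
    return cache.client

Inside `fetch`, you would then call `get_client().download_fileobj(...)` instead of using the module-level `s3` client.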
4

Thank you for this. I had over 9,000 JPEG images that I needed to download from my S3 bucket. I tried to incorporate this directly into my Colab Pro notebook but wasn't able to get it to work; I kept getting an "Errno 21: Is a directory" error.

I had to add two things: 1) a `makedirs` call to create the directory I want, and 2) `os.mknod` instead of `makedirs` to create each file.

`fetch_all` is almost the same, except for a small edit so that `max_workers` actually takes effect. `s3c` is just my `boto3.client` with my keys and all.

My download time went from 30+ mins to 5 mins with 1000 workers.

os.makedirs('/*some dir you want*/*prefix*')

def fetch(key):
    file = f'{abs_path}/{key}'
    os.mknod(file, mode=384)  # create an empty file; 384 == 0o600 (owner read/write)
    with open(file, 'wb') as data:
        s3c.download_fileobj(bucket_name, key, data)
    return file

def fetch_all(keys):

    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_key = {executor.submit(fetch, key): key for key in keys}

        print("All URLs submitted.")

        for future in futures.as_completed(future_to_key):

            key = future_to_key[future]
            exception = future.exception()

            if not exception:
                yield key, future.result()
            else:
                yield key, exception
I am TC
0

You can try this out. It is fast:

import os
from datetime import datetime

import boto3
from multiprocessing import Pool

bucket_name = 'BUCKET_NAME'
prefix = 'PREFIX'
local_dir = './downloads/' # PUT YOUR LOCAL DIR 
max_process = 20 # CAN BE CHANGED
debug_en = True

# pass your credentials and region name
s3_client = boto3.client('s3',aws_access_key_id=' ',
                     aws_secret_access_key=' ', region_name=' ')


def downfiles(bucket_name, src_obj, dest_path):

    try:
        s3_client.download_file(bucket_name, src_obj, dest_path)
        if debug_en:
            print("[debug] downloading object: %s to %s" % (src_obj, dest_path))
    except Exception as e:
        print("[error] failed to download %s: %s" % (src_obj, e))


def download_dir(bucket_name, sub_prefix):
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket_name, Prefix=sub_prefix)
    mp_data = []
    for page in pages:
        if 'Contents' in page:
            for obj in page['Contents']:
                src_obj = obj['Key']
                dest_path = local_dir + src_obj
                mp_data.append((bucket_name, src_obj, dest_path))
                if os.path.dirname(dest_path):
                    os.makedirs(os.path.dirname(dest_path), exist_ok=True)
    # the context manager closes the pool once all downloads have finished
    with Pool(max_process) as pool:
        pool.starmap(downfiles, mp_data)
    return len(mp_data)

if __name__ == '__main__':
    print("starting script...")
    start_time = datetime.now()
    s3_dirs = [prefix]  # iterate over a list of prefixes, not the characters of a string
    total_files = 0
    for s3_dir in s3_dirs:
        print("[Information] %s directory is downloading" % s3_dir)
        no_files = download_dir(bucket_name, s3_dir)
        total_files = total_files + no_files

    end_time = datetime.now()
    print('Duration: {}'.format(end_time - start_time))
    print('Total File numbers: %d' % total_files)
    print("ended")