
I am porting a simple Python 3 script to AWS Lambda. The script itself is straightforward: it gathers information from a dozen S3 objects and returns the results.

The script used multiprocessing.Pool to fetch all the files in parallel, but multiprocessing cannot be used in an AWS Lambda environment since /dev/shm is missing there. So rather than writing a dirty multiprocessing.Process / multiprocessing.Queue replacement, I thought I would try asyncio instead.
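(For reference, anything that relies on multiprocessing's shared locks fails immediately inside a Lambda function, roughly like this:)

import multiprocessing

# On AWS Lambda this raises OSError: [Errno 38] Function not implemented,
# because Queue/Pool are backed by POSIX semaphores in /dev/shm,
# which the Lambda runtime does not provide.
queue = multiprocessing.Queue()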

I am using the latest version of aioboto3 (8.0.5) on Python 3.8.

My problem is that I cannot seem to gain any improvement between a naive sequential download of the files, and an asyncio event loop multiplexing the downloads.

Here are the two versions of my code.

import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import boto3
import aioboto3

BUCKET = 'some-bucket'
KEYS = [
    'some/key/1',
    [...]
    'some/key/10',
]

async def download_aio():
    """Concurrent download of all objects from S3"""
    async with aioboto3.client('s3') as s3:
        objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
        objects = await asyncio.gather(*objects)
        buffers = await asyncio.gather(*[o['Body'].read() for o in objects])

def download():
    """Sequentially download all objects from S3"""
    s3 = boto3.client('s3')
    for key in KEYS:
        object = s3.get_object(Bucket=BUCKET, Key=key)
        object['Body'].read()

def run_sequential():
    download()

def run_concurrent():
    loop = asyncio.get_event_loop()
    #loop.set_default_executor(ProcessPoolExecutor(10))
    #loop.set_default_executor(ThreadPoolExecutor(10))
    loop.run_until_complete(download_aio())
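
A minimal timing harness along these lines can be used to compare the two entry points:

import time

if __name__ == '__main__':
    for func in (run_sequential, run_concurrent):
        start = time.perf_counter()
        func()
        print(f'{func.__name__}: {time.perf_counter() - start:.2f}s')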

The timings for run_sequential() and run_concurrent() are quite similar (~3 seconds for a dozen 10 MB files). I am convinced the concurrent version is not actually concurrent, for multiple reasons:

  • I tried switching to a ProcessPoolExecutor / ThreadPoolExecutor, and I can see the processes/threads spawned for the duration of the function, though they are doing nothing
  • The sequential and concurrent timings are very close, even though my network interface is definitely not saturated and the CPU is not the bottleneck either
  • The time taken by the concurrent version increases linearly with the number of files.

I am sure something is missing, but I just can't wrap my head around what.

Any ideas?

NewbiZ

2 Answers


After losing some hours trying to understand how to use aioboto3 correctly, I decided to just switch to my backup solution. I ended up rolling my own naive version of multiprocessing.Pool for use within an AWS Lambda environment.

If someone stumbles across this thread in the future, here it is. It is far from perfect, but it is easy enough to swap in for multiprocessing.Pool as-is in my simple cases.

from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


class Pool:
    """Naive implementation of a process pool with mp.Pool API.

    This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
    is not mounted in an AWS Lambda environment.
    """

    def __init__(self, process_count=1):
        assert process_count >= 1
        self.process_count = process_count

    @staticmethod
    def wrap_pipe(pipe, index, func):
        def wrapper(args):
            try:
                result = func(args)
            except Exception as exc:  # pylint: disable=broad-except
                result = exc
            pipe.send((index, result))
        return wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def map(self, function, arguments):
        pending = list(enumerate(arguments))
        running = []
        finished = [None] * len(pending)
        while pending or running:
            # Fill the running queue with new jobs
            while len(running) < self.process_count:
                if not pending:
                    break
                index, args = pending.pop(0)
                pipe_parent, pipe_child = Pipe(False)
                process = Process(
                    target=Pool.wrap_pipe(pipe_child, index, function),
                    args=(args, ))
                process.start()
                running.append((index, process, pipe_parent))
            # Wait for jobs to finish
            for pipe in wait(list(map(lambda t: t[2], running))):
                index, result = pipe.recv()
                # Remove the finished job from the running list
                running = list(filter(lambda x: x[0] != index, running))
                # Add the result to the finished list
                finished[index] = result

        return finished
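
For illustration, a minimal usage sketch (the fetch_object helper, bucket and keys below are hypothetical, not part of the original code):

import boto3

def fetch_object(key):
    # Hypothetical worker: download one S3 object and return its size.
    s3 = boto3.client('s3')
    return len(s3.get_object(Bucket='some-bucket', Key=key)['Body'].read())

with Pool(process_count=4) as pool:
    sizes = pool.map(fetch_object, ['some/key/1', 'some/key/2'])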
NewbiZ
  • Thanks for posting your solution. Tried this approach, but I get this error: Can't pickle local object 'Pool.wrap_pipe.<locals>.wrapper'. Any idea how to solve this? – Daniel TZ Nov 30 '22 at 15:09
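That error shows up when multiprocessing uses the spawn start method (the default on macOS and Windows), which cannot pickle the nested wrapper function; under the fork start method used by Lambda's Linux runtime it does not occur. One possible workaround, sketched below, is to replace the nested function with a module-level callable class (assuming the mapped function itself is picklable):

class _PipeWrapper:
    """Picklable replacement for Pool.wrap_pipe's nested wrapper."""

    def __init__(self, pipe, index, func):
        self.pipe = pipe
        self.index = index
        self.func = func

    def __call__(self, args):
        try:
            result = self.func(args)
        except Exception as exc:  # pylint: disable=broad-except
            result = exc
        self.pipe.send((self.index, result))

# In Pool.map, the Process would then be created as:
#   Process(target=_PipeWrapper(pipe_child, index, function), args=(args,))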

It's 1.5 years later and aioboto3 is still not well documented or supported.

The multithreading option is good, but async I/O is an easier and clearer implementation.

I don't actually know what's wrong with your AIO code; it doesn't even run anymore, I guess because of library updates. But using aiobotocore, the code below worked. My test was with 100 images: the sequential version takes about 8 seconds on average, while the async I/O version took less than 2.

With 1000 images it was 17 seconds.

import asyncio
from aiobotocore.session import get_session

async def download_aio(s3, bucket, file_name):
    """Download a single object and read its body."""
    o = await s3.get_object(Bucket=bucket, Key=file_name)
    await o['Body'].read()

async def run_concurrent():
    tasks = []
    session = get_session()
    async with session.create_client('s3') as s3:
        for k in KEYS[:100]:
            tasks.append(asyncio.ensure_future(download_aio(s3, BUCKET, k)))
        await asyncio.gather(*tasks)
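
For completeness, a sketch of how this could be driven (BUCKET and KEYS as defined in the question; asyncio.run() requires Python 3.7+):

if __name__ == '__main__':
    asyncio.run(run_concurrent())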