-1

When I use third party aiobotocore it works up to NUM_WORKERS=500 and If I want to go up to a 1000 I get this error:

    r, w, _ = self._select(self._readers, self._writers, [], timeout)
  File ".....\lib\selectors.py", line 314, in _select
    r, w, x = select.select(r, w, w, timeout)
ValueError: too many file descriptors in select()

If there a way to execute 1000 in parallel?

Source:


import os, sys, time, json
import asyncio
from itertools import chain
from typing import List
import logging
from functools import partial
from pprint import pprint 



# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config

_NUM_WORKERS=500

async def execute_lambda( lambda_name: str, key: str, client):
    # Get json content from s3 object
    if 1:
        name=lambda_name
        response = await client.invoke(
            InvocationType='RequestResponse',
            FunctionName=name,
            LogType='Tail',
            Payload=json.dumps({
                'exec_id':key,
                })
            )
    out=[]
    async for event in response['Payload']:
        out.append(event.decode())

    #await asyncio.sleep(1)
    return out


async def submit(lambda_name: str) -> List[dict]:
    """
    Returns list of AWS Lambda outputs executed in parallel

    :param name: name of lambda function
    :return: list of lambda returns
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    session = aiobotocore.session.AioSession()
    config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
    contents = []
    #client = boto3.client('lambda', region_name='us-west-2')
    async with session.create_client('lambda', region_name='us-west-2', config=config) as client:
        worker_co = partial(execute_lambda, lambda_name)
        async with asyncpool.AsyncPool(None, _NUM_WORKERS, 'lambda_work_queue', logger, worker_co,
                                       return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
            for x in range(_NUM_WORKERS):
                contents.append(await work_pool.push(x, client))

    # retrieve results from futures
    contents = [c.result() for c in contents]
    return list(chain.from_iterable(contents))



def main(name, files):
    s = time.perf_counter()
    _loop = asyncio.get_event_loop()
    _result = _loop.run_until_complete(submit(name))
    pprint(_result)
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

Lambda function:

import time
def lambda_handler(event, context):
    time.sleep(10)
    return {'code':0, 'exec_id':event['exec_id']}

Result:

 '{"code": 0, "exec_id": 0}',
 '{"code": 0, "exec_id": 1}',
 '{"code": 0, "exec_id": 2}',
 '{"code": 0, "exec_id": 3}',
...
 '{"code": 0, "exec_id": 496}',
 '{"code": 0, "exec_id": 497}',
 '{"code": 0, "exec_id": 498}',
 '{"code": 0, "exec_id": 499}']
my_cli_script.py executed in 14.56 seconds.
John Rotenstein
  • 241,921
  • 22
  • 380
  • 470
Alex B
  • 2,165
  • 2
  • 27
  • 37

2 Answers2

4

In response to the question asked in the comments here, here's the code I use to spin up 100 lambda instances in parallel:


import boto3
import json
from concurrent.futures import ThreadPoolExecutor

# AWS credentials are exported in my env variables
# so region and account-id are fetched from there
lambda_ = boto3.client('lambda')

def invoke_lambda(payload):
    payload = {'body': json.dumps(payload)}

    response = lambda_.invoke(
        FunctionName='my-func',
        # I need to receive a response back from lambda
        # so I use sync invocation
        InvocationType='RequestResponse',
        LogType='Tail',
        Payload=json.dumps(payload)
    )

    res_payload = response.get('Payload').read()
    body = json.loads(res_payload).get('body')
    
    return body


MAX_WORKERS = 100  # how many lambdas you want to spin up concurrently

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    result = list(executor.map(invoke_lambda, data))
# data is a list of dicts, each element is a single "payload"

Two last notes:

  1. a dozen milliseconds to spawn 100 concurrent lambdas was probably an exaggeration. For some reason if I set a higher granularity in cloudwatch metrics it plots nothing so I can't say for sure how long it took exactly. To thread safely I would say within 2 seconds.
  2. this piece of code has only been run in my local environment so far. It's pretty vanilla so I don't see why it wouldn't work elsewhere (for example, another parent lambda), but as a word of warning, I haven't tested it online yet.
wtfzambo
  • 578
  • 1
  • 12
  • 21
0

found this post: python-asyncio-aiohttp-valueerror-too-many-file-descriptors-in-select-on-win

After change it started to work

# 1000 is a soft concurrency limit
_NUM_WORKERS=990  

def main(name, files):
    if sys.platform == 'win32':
        _loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(_loop)
        _result = _loop.run_until_complete(submit(name))
    else:
        _loop = asyncio.get_event_loop()
        _result = _loop.run_until_complete(submit(name))
    process = psutil.Process(os.getpid())
    print(f"{__file__}: memory[{process.memory_info().rss/1024:7,.2f}], elapsed {elapsed:0.2f} sec")

Result:

...
 '{"code": 0, "exec_id": 986}',
 '{"code": 0, "exec_id": 987}',
 '{"code": 0, "exec_id": 988}',
 '{"code": 0, "exec_id": 989}']
my_cli_script.py: memory[201,064.00], elapsed 16.53 sec

enter image description here

Alex B
  • 2,165
  • 2
  • 27
  • 37
  • 1
    Yo, I was wondering, is the aiobotocore necessary to have parallel lambda invokations? Can't asyncio alone handle this? I'm essentially trying to do exactly what you're doing, but without using external packages besides what's already built-in in AWS lambda, but I can't seem to get there. My executions are still sequential. – wtfzambo Aug 11 '21 at 16:45
  • Engineers say it's a bad design to use Lambda like this. EKS is the way to go. As to your question - i'm not sure why are you limiting- if you run this code in lambda - you can always cook third party stuff into a layer. If i's ran on EC2 i do not see why you cannot use third party libs – Alex B Aug 12 '21 at 17:44
  • 1
    no specific reason, I am currently using ECR for my dependencies, and I'm trying to keep the image size as small as I can (it's already quite large cause I'm using pytorch). Eventually I managed to achieve what I wanted using `concurrent.futures`, so no need for aiobotocore. Never used EKS, what about it? The reason I went for lambda is because I like the easy of using serverless architectures (with sls framework, for example) and pay-per-use pricing. Since the model I'm deploying will have very sparse usage, I don't want to pay for a constantly running server. – wtfzambo Aug 16 '21 at 13:43
  • with `concurrent.futures` you cannot do it using single s3client connection. – Alex B Aug 22 '21 at 16:00
  • pardon me but, what's s3 connection got to do with this? I managed to use `concurrent.futures` to spin up hundred of concurrent lambda instances using a single `client` object (`client = boto3.client('lambda'`). I'm really not sure what you meant with your sentence. – wtfzambo Aug 23 '21 at 18:08
  • but `concurrent.futures` use multiprocessing. you will not be able to use the same s3 client in multiple processes – Alex B Aug 23 '21 at 18:15
  • if you use `concurrent.futures.ThreadPoolExecutor` class it can also use threading! The multiprocessing one is only if you use `ProcessPoolExecutor`. See: https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor – wtfzambo Aug 23 '21 at 18:18
  • there's no way you can spawn 1000 threads in the same time frame as asyncpool does it – Alex B Aug 23 '21 at 18:19
  • I didn't spin up a 1000 but I did spin up a 100 in the span of a dozen milliseconds. Admittedly, I would need to do a proper comparisoon with the same numbers between the two packages to know which one performs better. In my case, I didn't need 1k lambdas at once (not yet at least) so I went with the easiest / least code to write solution. – wtfzambo Aug 23 '21 at 18:22
  • yeah sure, I'll add it as an answer to your question so that it's more readable. – wtfzambo Aug 24 '21 at 16:30