3

I have a large (1M rows) database resultset for which I want to call a REST API for each row.

The API can accept batch requests but I am not sure how to slice the rows generator so that each task processes a list of rows, say 10. I'd rather not read all rows upfront and want to stick to a generator.

Accommodating my_function to send a list in one HTTP request is easy enough, but what about asyncio.gather? Maybe one of the itertools recipes can help.

See the generic pseudo-code below to illustrate:

async def main(rows):
    async with aiohttp.ClientSession() as session:
        tasks = [my_function(row, session) for row in rows]
        return await asyncio.gather(*tasks)

rows = <generator of database rows>
results = asyncio.run(main(rows))

Note: the results are small, basically an acknowledgement value for each row.

On a side note,

  • is there a limit to the number of tasks asyncio.gather() can handle (efficiently)?
  • currently gather() loads all requests/tasks in memory, consuming 50GB (!). How can the rows and tasks be read and passed on-the-go to reduce memory usage? Is this what asyncio.BoundedSemaphore() is used for?
  • The TCP connections limit is 500, as the REST web server can accept that much (a sketch of how this plugs into the code above follows this list). If a semaphore comes into play, what should its value be, i.e. does it make sense to set the semaphore > TCP connections limit?
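
For context, this is roughly how the connection cap plugs into the pseudo-code above on the client side (a sketch of my current setup):

# Same pseudo-code as above, with the client-side connection cap added
async def main(rows):
    connector = aiohttp.TCPConnector(limit=500)  # matches the server's limit
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [my_function(row, session) for row in rows]
        return await asyncio.gather(*tasks)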

aiohttp and asyncio are great but difficult to follow - I agree with this post:

asyncio keeps changing all the time, so be wary of old Stack Overflow answers. Many of them are not up to date with the current best practices

EDIT:

I just tried using an asyncio.BoundedSemaphore(100) and memory usage is about the same (45GB) - not sure it has any benefit over the connections limit
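
For reference, the attempt looked roughly like this (a sketch reusing my_function and rows from above; all coroutines are still created upfront, which is probably why memory barely changes):

async def main(rows):
    semaphore = asyncio.BoundedSemaphore(100)

    async def limited(row, session):
        async with semaphore:  # at most 100 requests in flight
            return await my_function(row, session)

    async with aiohttp.ClientSession() as session:
        tasks = [limited(row, session) for row in rows]
        return await asyncio.gather(*tasks)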

yan-hic

2 Answers

8

Semaphore-based solutions won't help with the memory usage of a huge number of tasks because you'll still be creating all the coroutines and tasks in advance. All the coroutines will start executing, only for most of them to be immediately suspended until the semaphore lets them proceed.

Instead, you can create a fixed number of workers and feed them database rows through a queue:

async def worker(queue, session, results):
    while True:
        row = await queue.get()
        results.append(await my_function(row, session))
        # Mark the item as processed, allowing queue.join() to keep
        # track of remaining work and know when everything is done.
        queue.task_done()

async def main(rows):
    N_WORKERS = 50
    queue = asyncio.Queue(N_WORKERS)
    results = []
    async with aiohttp.ClientSession() as session:
        # create 50 workers and feed them tasks
        workers = [asyncio.create_task(worker(queue, session, results))
                   for _ in range(N_WORKERS)]
        # Feed the database rows to the workers. The fixed-capacity of the
        # queue ensures that we never hold all rows in the memory at the
        # same time. (When the queue reaches full capacity, this will block
        # until a worker dequeues an item.)
        async for row in rows:
            await queue.put(row)
        # Wait for all enqueued items to be processed.
        await queue.join()
    # The workers are now idly waiting for the next queue item and we
    # no longer need them.
    for w in workers:
        w.cancel()
    return results

Note that rows should be an async generator. If it's an ordinary generator, it will probably block the event loop and become the bottleneck. If your database doesn't support an async interface, see this answer for a way to convert a blocking generator to async by running it in a dedicated thread.
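
If that's your situation, the conversion might look roughly like this (an untested sketch; async_wrap_iter is just an illustrative name, and it should be called from inside the running event loop):

import asyncio
import threading

def async_wrap_iter(it):
    """Wrap a blocking iterator into an async iterator by driving it
    from a dedicated thread and handing items to the event loop."""
    loop = asyncio.get_event_loop()
    q = asyncio.Queue(1)
    exception = None
    _END = object()

    async def yield_queue_items():
        while True:
            next_item = await q.get()
            if next_item is _END:
                break
            yield next_item
        if exception is not None:
            # the iterator raised; re-raise in the consumer
            raise exception

    def iter_to_queue():
        nonlocal exception
        try:
            for item in it:
                # blocking happens here, in the helper thread,
                # never in the event loop
                asyncio.run_coroutine_threadsafe(q.put(item), loop).result()
        except Exception as e:
            exception = e
        finally:
            asyncio.run_coroutine_threadsafe(q.put(_END), loop).result()

    threading.Thread(target=iter_to_queue, daemon=True).start()
    return yield_queue_items()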

To batch items, you can build an intermediate list and dispatch it. Or you can use the excellent aiostream library which comes with the chunks operator that does just that:

async with aiostream.stream.chunks(rows, 10).stream() as chunks:
    async for batch in chunks:
        await queue.put(batch)  # enqueue a batch of 10 rows
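
If you'd rather not pull in a dependency, the "intermediate list" approach mentioned above might look like this inside main()'s feed loop (a sketch, with the batch size hard-coded to 10 as in the question):

# Manual batching without aiostream: accumulate rows into a list and
# enqueue it once it reaches the desired size.
batch = []
async for row in rows:
    batch.append(row)
    if len(batch) == 10:
        await queue.put(batch)
        batch = []
if batch:
    await queue.put(batch)  # don't lose the final, possibly short batch

Either way, the workers then receive lists of rows, so my_function should accept a batch rather than a single row.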
user4815162342
  • Thanks but didn't work - I added `return await asyncio.gather(*workers)` at the end and got `concurrent.futures._base.CancelledError`. Note `rows` is not async (is `read_rows()` from https://googleapis.dev/python/bigquerystorage/latest/gapic/v1/api.html) so I wrapped like so: `async for row in async_wrap_iter(rows):` – yan-hic Jun 16 '20 at 14:40
  • @YannickEinsweiler Can you please specify in what way your initial attempt didn't work? If you await the workers after canceling them, you should pass `return_exceptions=True` to `gather`. As the workers are canceled, awaiting them will raise `CancelledError`. – user4815162342 Jun 16 '20 at 22:35
  • @YannickEinsweiler Workers don't have a meaningful return value, they just do their thing until they're told not to. If you need to collect results, you can pass each worker a list to add results to. (They can all append to the same list.) I understood the requirement to be not to hold all rows in memory, but to process them in a streaming fashion. Also, if you're setting the TCPConnector limit to 500, you probably want to raise the number of workers to the same number. – user4815162342 Jun 17 '20 at 12:22
  • @YannickEinsweiler I've now amended the answer so that workers collect results into a common list. – user4815162342 Jun 17 '20 at 14:21
  • Wow - script uses 0.5 CPU and now only 2GB RAM instead of 50GB! kudo kudo Hrvoje, really – yan-hic Jun 17 '20 at 21:02
  • I still don't understand why "rows" cannot be a regular generator. A generator yields control back to the main execution flow while keeping its inner state, so it doesn't block the loop from executing tasks. Also, in the first versions of asyncio, coroutines were based on generators (but this is not relevant right now, just small trivia) – aiven Jun 23 '21 at 11:38
  • @aiven A generator only yields control back when it has a value to provide. But while it's _waiting_ for a value, it blocks. (For example, `for line in sys.stdin:` will block while waiting for the next line on the standard input.) An async generator effectively has two modes of suspension: one to the event loop, while it waits for the value to arrive, and the other to the "caller", when it yields a value. – user4815162342 Jun 23 '21 at 11:52
0

Many thanks to @user4815162342 for pointing in the right direction.

Here is a full working example, implementing batching, a connections limit and queueing, provided you start with an async generator. UPDATE: if you don't start with an async generator, see the previous answer for a sync-to-async converter.

import asyncio
import json

import aiohttp
import aiostream.stream

TCP_CONNECTIONS = 400

# as per the comment above, match workers with connections so that each worker feeds one connection
N_WORKERS = 400
BATCH_SIZE = 10


async def my_function(batch, session):
    # POST one batch of rows in a single request (my_url defined elsewhere)
    async with session.post(my_url,
                            json=json.dumps(batch, default=str)) as response:
        return await response.json()


async def worker(queue, session, results):
    while True:
        batch = await queue.get()
        results.append(await my_function(batch, session))
        queue.task_done()


async def main(rows):
    results = []  # better here than global
    queue = asyncio.Queue(N_WORKERS)

    async with aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(limit=TCP_CONNECTIONS)) as session:

        workers = [asyncio.create_task(worker(queue, session, results))
                   for _ in range(N_WORKERS)]

        async with aiostream.stream.chunks(rows, BATCH_SIZE).stream() as chunks:
            async for batch in chunks:
                await queue.put(batch)

        await queue.join()

    for w in workers:
        w.cancel()

    return results


results = asyncio.run(main(rows))

If you don't need queueing and all tasks can be held in memory, you can instead use:

from itertools import chain, islice


def chunks(iterator, n):
    return (chain([first], islice(iterator, 0, n - 1))
            for first in iterator)


async def main(rows):
    async with aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(limit=TCP_CONNECTIONS)) as session:

        batches = [my_function(list(batch), session)
                   for batch in chunks(rows, BATCH_SIZE)]
        # gather while the session is still open
        return await asyncio.gather(*batches)


results = asyncio.run(main(rows))
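
For clarity, the chunks() helper slices a plain (sync) iterator lazily; each batch must be fully consumed before advancing to the next one, which the list(batch) call above takes care of. A quick illustration of its behaviour:

# chunks() and the itertools imports as defined above
for batch in chunks(iter(range(7)), 3):
    print(list(batch))   # [0, 1, 2], then [3, 4, 5], then [6]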
yan-hic
  • Note that the linked answer was now fixed to avoid accumulating everything in the queue when the processing happens to be slower than the generation. – user4815162342 Jun 17 '20 at 14:23
  • Also, you probably want `N_WORKERS` and `TCP_CONNECTIONS` to be the same number. The nice thing about these workers is that they are not threads, they're coroutines, smallish Python objects which it's totally fine to have as many of as you need. – user4815162342 Jun 17 '20 at 14:27
  • Oh cool ! I'll try in a few + update answer. In the meantime, I found a nice workaround, namely dumping the rows into file and then use `aiofiles` in `main()` – yan-hic Jun 17 '20 at 16:11
  • And since you don't need the return value of the worker (as it's meaningless), the `gather` line achieves nothing and can likely be removed. `await queue.join()` is where we actually wait for all the results to arrive and where the workers do their jobs. – user4815162342 Jun 17 '20 at 16:26