I have a large (1M row) db resultset for which I want to call a REST API for each row.
The API can accept batch requests, but I am not sure how to slice the rows
generator so that each task processes a list of rows, say 10. I'd rather not read all rows upfront and want to stick to a generator.
Accommodating my_function to send a list in one HTTP request is easy enough, but what about asyncio.gather? Maybe one of the itertools functions can help.
See the generic pseudo-code below to illustrate:
import asyncio
import aiohttp

async def main(rows):
    async with aiohttp.ClientSession() as session:
        # one task (and one HTTP request) per row
        tasks = [my_function(row, session) for row in rows]
        return await asyncio.gather(*tasks)

rows = <generator of database rows>
results = asyncio.run(main(rows))
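For the slicing part, something like the islice-based chunking below is what I had in mind; my_function_batch is a made-up variant of my_function that would send the whole list in one request (I believe newer Python versions also ship itertools.batched for this):

import asyncio
from itertools import islice

import aiohttp

def batched(rows, size=10):
    # lazily yield lists of `size` rows without materializing the whole resultset
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch

async def main(rows):
    async with aiohttp.ClientSession() as session:
        # still builds one task per batch upfront (100k tasks for 1M rows at size 10)
        tasks = [my_function_batch(batch, session) for batch in batched(rows)]
        return await asyncio.gather(*tasks)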
Note: the results are small, basically an acknowledgement value for each row.
On a side note:

- Is there a limit to the number of tasks asyncio.gather() can handle (efficiently)?
- Currently gather() loads all requests/tasks in memory, consuming 50 GB (!). How can the rows and tasks be read and passed on the fly to reduce memory usage? Is this what asyncio.BoundedSemaphore() is used for? (A sketch of the pattern I mean follows this list.)
- The TCP connection limit is 500, as the REST web server can accept that many. If a semaphore comes into play, what should its value be, i.e. does it make sense to set the semaphore higher than the TCP connection limit?
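This is roughly the semaphore pattern I have in mind (wrapped_call is just a name I made up, and 500 mirrors the TCP limit; my understanding is that the semaphore caps in-flight requests, but every task object still gets created upfront):

import asyncio
import aiohttp

async def wrapped_call(row, session, sem):
    # the semaphore limits how many requests are in flight at once,
    # but all tasks are still created upfront in main()
    async with sem:
        return await my_function(row, session)

async def main(rows):
    sem = asyncio.BoundedSemaphore(500)
    # the connector independently caps concurrent TCP connections
    connector = aiohttp.TCPConnector(limit=500)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [wrapped_call(row, session, sem) for row in rows]
        return await asyncio.gather(*tasks)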
aiohttp and asyncio are great but difficult to follow - I agree with this post:

"asyncio keeps changing all the time, so be wary of old Stack Overflow answers. Many of them are not up to date with the current best practices."
EDIT:
I just tried using an asyncio.BoundedSemaphore(100) and memory usage is about the same (45 GB) - not sure it has any benefit over the connection limit.
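The direction I am now considering (no idea whether it is idiomatic) is a fixed pool of worker coroutines pulling rows straight off the generator, so only n_workers coroutines ever exist at once and the 1M tasks are never built upfront. A rough sketch (worker and n_workers are my own names; 500 just mirrors the connection limit):

import asyncio
import aiohttp

async def worker(row_iter, session, results):
    # each worker pulls the next row from the shared generator until it is
    # exhausted; results arrive in completion order, not row order
    for row in row_iter:
        results.append(await my_function(row, session))

async def main(rows, n_workers=500):
    results = []
    connector = aiohttp.TCPConnector(limit=500)
    async with aiohttp.ClientSession(connector=connector) as session:
        row_iter = iter(rows)
        workers = [worker(row_iter, session, results) for _ in range(n_workers)]
        await asyncio.gather(*workers)
    return results

Since the results are just acknowledgement values, losing the original row order seems acceptable, but I am not sure this is the right approach.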