
I have been looking for a Python equivalent to JavaScript's await Promise.all() functionality, which led me to asyncio.gather(). After reading a few explanations and following a few examples, I haven't managed to get anything working asynchronously.

The task is straightforward: extract values remotely from multiple files in S3, then collect the results when all are finished. I have done this in JS, and it takes a little over a second to read from 12 files.

The code is written for FastAPI, and a simplified form of it is below. The reason I know this is not working asynchronously is that the more files in S3 it reads from, the longer it takes.

I have seen documentation for this kind of thing, but as it is not working for me, I am not sure whether I am doing something wrong or whether it just won't work in my use case. I am worried that streaming from a remote file using rasterio just doesn't work here.

How can I change the code below so that it calls the functions concurrently and collects all the responses when they are all completed? I haven't used this feature in Python before, so I just need a little more clarification.

import asyncio
import datetime
import logging

import rasterio
from fastapi import FastAPI
from rasterio.windows import Window

app = FastAPI()
logger = logging.getLogger(__name__)

async def read_from_file(s3_path):
    # The important thing to note here is that it
    #  is streaming from a file in S3 given an S3 path
    with rasterio.open(s3_path) as src:
        values = src.read(1, window=Window(1, 2, 1, 1))
        return values[0][0]

@app.get("/get-all")
async def get_all():
    start_time = datetime.datetime.now()
    # example paths
    s3_paths = [
        "s3:file-1",
        "s3:file-2",
        "s3:file-3",
        "s3:file-4",
        "s3:file-5",
        "s3:file-6",
    ]

    values = await asyncio.gather(
        *(read_from_file(path) for path in s3_paths)
    )

    end_time = datetime.datetime.now()
    logger.info(f"duration: {end_time-start_time}")
  • To take advantage of async flows, the function can't block the thread. It looks like `rasterio.open`/`read` are not async functions. This means the function doesn't release control back to the event loop while doing the slow work of reading. – Mark Oct 04 '22 at 13:45
  • ah ok, thanks very much. I will go down the multithreading route then. – sobmortin354 Oct 04 '22 at 14:49
  • See https://github.com/aio-libs/aiobotocore for an async compatible s3 client. – MatsLindh Oct 04 '22 at 19:16
  • No need to write the multi-threaded code yourself though - you can just make the rasterio calls run in separate threads by using the asyncio loop's `run_in_executor` method: it will do the trick for you with almost no need to change your code. (Although I don't think this works well with a `with` block: just resort to calling `.open` and `.close` in flat code.) – jsbueno Oct 04 '22 at 22:03

1 Answer


Python asyncio has a mechanism to run non-async code, like the calls to the rasterio lib, in other threads, so that the async loop is not blocked.

Try this code:

import asyncio
from functools import partial

import rasterio
from rasterio.windows import Window

async def read_from_file(s3_path):
    # The important thing to note here is that it
    #  is streaming from a file in S3 given an S3 path
    loop = asyncio.get_running_loop()
    # Run the blocking open() in a worker thread so the event loop stays free
    src = await loop.run_in_executor(None, rasterio.open, s3_path)
    try:
        # partial() is needed because run_in_executor does not forward keyword arguments
        values = await loop.run_in_executor(
            None, partial(src.read, 1, window=Window(1, 2, 1, 1))
        )
    finally:
        src.close()  # might be interesting to parallelize this as well
    return values[0][0]

If it needs to be faster, you can create a custom executor: the default one only uses a number of threads derived from the CPU count, I think, and might slow things down when the bottleneck is network latency - somewhere around 20 threads might be interesting. (This executor should be either a global resource, or passed as a parameter to your read_from_file, and is a plain concurrent.futures.ThreadPoolExecutor: https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor.) A sketch of this is below.
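For illustration, a minimal sketch of what that could look like - the module-level S3_EXECUTOR name and the 20-worker figure are just assumptions following the guess above, not tuned values:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

import rasterio
from rasterio.windows import Window

# Shared executor sized for network-bound work instead of the default,
# which is sized relative to the CPU count. 20 workers is a guess, not a tuned value.
S3_EXECUTOR = ThreadPoolExecutor(max_workers=20)

async def read_from_file(s3_path):
    loop = asyncio.get_running_loop()
    # Same pattern as above, but passing the custom executor instead of None
    src = await loop.run_in_executor(S3_EXECUTOR, rasterio.open, s3_path)
    try:
        values = await loop.run_in_executor(
            S3_EXECUTOR, partial(src.read, 1, window=Window(1, 2, 1, 1))
        )
    finally:
        src.close()
    return values[0][0]

The endpoint from the question can then fan out over the paths with asyncio.gather exactly as before.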

As for run_in_executor itself, see https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

jsbueno