
I have the following code, which reads data from a database (read_db) and writes the data to a parquet file (data.to_parquet). Both I/O operations take a while to run.

import logging

def main():
    id = 1  # start at 1, matching the logs below
    while id < 1000:
        logging.info(f'reading - id: {id}')
        data = read_db(id)  # read_db is defined elsewhere; returns a dataframe

        logging.info(f'saving - id: {id}')
        data.to_parquet(f'{id}.parquet')
        logging.info(f'saved - id: {id}')

        id += 1

It's slow, so I want read_db(n+1) and to_parquet(n) to run concurrently. I still need each step to finish in id order, though: read_db(n+1) needs to run after read_db(n), and data.to_parquet(n+1) needs to run after data.to_parquet(n). Here is the asynchronous version:

import asyncio
import logging
from functools import partial, wraps

def async_wrap(f):
    # Turn a blocking function into a coroutine that runs it in an executor thread.
    @wraps(f)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_running_loop()
        p = partial(f, *args, **kwargs)
        return await loop.run_in_executor(executor, p)
    return run

async def main():
    read_db_async = async_wrap(read_db)
    id = 1
    while id < 1000:
        logging.info(f'reading - id: {id}')
        data = await read_db_async(id)  # returns a dataframe

        logging.info(f'saving - id: {id}')
        to_parquet_async = async_wrap(data.to_parquet)
        await to_parquet_async(f'{id}.parquet')
        logging.info(f'saved - id: {id}')

        id += 1

asyncio.run(main())
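For reference, the `async_wrap` helper only offloads a blocking call to an executor thread. On Python 3.9 and later, the standard library's `asyncio.to_thread` does the same job without a custom wrapper. A minimal sketch, where `slow_square` is a hypothetical stand-in for a blocking call like `read_db`:

```python
import asyncio
import time


def slow_square(x):
    # Hypothetical stand-in for a blocking call such as read_db.
    time.sleep(0.1)
    return x * x


async def main():
    # asyncio.to_thread runs the blocking function in the default thread
    # pool, much like async_wrap does via loop.run_in_executor.
    return await asyncio.to_thread(slow_square, 7)


result = asyncio.run(main())
print(result)
```

On its own this does not change the ordering: awaiting each wrapped call one at a time still runs them one after another.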

I expected to see some out-of-order logs:

reading - id: 1
saving - id: 1      (saving 1 and reading 2 run in parallel)
reading - id: 2
saved - id: 1
saving - id: 2
reading - id: 3
saved - id: 2
.....

But why are the actual logs the same as with the synchronous code?

reading - id: 1
saving - id: 1
saved - id: 1
reading - id: 2
saving - id: 2
saved - id: 2
reading - id: 3
.....
ca9163d9
  • Does this answer your question? [How to run tasks concurrently in asyncio?](https://stackoverflow.com/questions/54156503/how-to-run-tasks-concurrently-in-asyncio) – mkrieger1 Jan 27 '21 at 00:19
  • No, the answer runs all tasks in parallel, which is what I need to avoid. I just need some steps to run in parallel. – ca9163d9 Jan 27 '21 at 00:55
  • You can run as many tasks in parallel as you like. It doesn't have to be all of them. – mkrieger1 Jan 27 '21 at 01:41
  • There's only one coroutine running in your solution, the one that started with `main()`. The `await data.to_parquet(f'{id}.parquet')` means the current coroutine will _sleep_ until `to_parquet` finishes, so it won't start the next iteration before that. Check [this question](https://stackoverflow.com/questions/50757497/simplest-async-await-example-possible-in-python) for a basic example – naicolas Jan 27 '21 at 02:00
  • EDIT: I said "There's only one coroutine running" but that's not exactly accurate, since async calls create new coroutines. The thing is you are waiting for those new coroutines to finish to resume the original one. – naicolas Jan 27 '21 at 02:13
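The point made in these comments can be checked with a small timing experiment. This sketch uses a hypothetical `blocking_io` function that just sleeps, in place of `read_db`/`to_parquet`: awaiting two executor calls one after the other takes roughly the sum of their durations, while `asyncio.gather` lets them overlap.

```python
import asyncio
import time


def blocking_io(seconds):
    # Hypothetical stand-in for read_db / to_parquet: just sleeps.
    time.sleep(seconds)


async def sequential():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Each await suspends this coroutine until that call finishes,
    # so the second call cannot start before the first one ends.
    await loop.run_in_executor(None, blocking_io, 0.2)
    await loop.run_in_executor(None, blocking_io, 0.2)
    return time.perf_counter() - start


async def concurrent():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # gather waits on both futures at once; they run in separate threads.
    await asyncio.gather(
        loop.run_in_executor(None, blocking_io, 0.2),
        loop.run_in_executor(None, blocking_io, 0.2),
    )
    return time.perf_counter() - start


seq_time = asyncio.run(sequential())
con_time = asyncio.run(concurrent())
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```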

1 Answer


You could make read_db(n+1) and to_parquet(n) run concurrently by using asyncio.gather or an equivalent:

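async def main():
    read_db_async = async_wrap(read_db)  # async_wrap as defined in the question
    prev_to_parquet = asyncio.sleep(0)  # no-op

    for id in range(1, 1000):
        # Read this id while the previous id's write is still running;
        # gather returns once both have finished.
        data, _ = await asyncio.gather(read_db_async(id), prev_to_parquet)
        to_parquet_async = async_wrap(data.to_parquet)
        # Create the write coroutine without awaiting it; it starts running
        # at the next gather, alongside the next read.
        prev_to_parquet = to_parquet_async(f'{id}.parquet')

    # Wait for the last write.
    await prev_to_parquet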
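To see the interleaving without a database, here is a self-contained sketch of the same pattern. `read_db` and `write` are hypothetical stand-ins that sleep instead of doing real I/O (`write` plays the role of `data.to_parquet`), `async_wrap` is simplified to use the default executor only, and `events` records what happened in order:

```python
import asyncio
import logging
import time
from functools import partial, wraps

logging.basicConfig(level=logging.INFO, format="%(message)s")
events = []  # (action, item_id) tuples in the order they happened


def async_wrap(f):
    # Simplified version of the question's helper (default executor only).
    @wraps(f)
    async def run(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(f, *args, **kwargs))
    return run


def read_db(item_id):
    # Hypothetical stand-in for the real database read.
    events.append(("reading", item_id))
    logging.info(f"reading - id: {item_id}")
    time.sleep(0.05)
    return f"data-{item_id}"


def write(data, item_id):
    # Hypothetical stand-in for data.to_parquet(f'{item_id}.parquet').
    events.append(("saving", item_id))
    logging.info(f"saving - id: {item_id}")
    time.sleep(0.05)
    events.append(("saved", item_id))
    logging.info(f"saved - id: {item_id}")


async def main(n):
    read_db_async = async_wrap(read_db)
    write_async = async_wrap(write)
    prev_write = asyncio.sleep(0)  # no-op for the first iteration
    for item_id in range(1, n + 1):
        # Read item_id while the write of item_id - 1 is still running.
        data, _ = await asyncio.gather(read_db_async(item_id), prev_write)
        # Only create the write coroutine here; it starts at the next
        # gather (or at the final await below).
        prev_write = write_async(data, item_id)
    await prev_write


asyncio.run(main(3))
```

Running it should log `reading - id: 2` before `saved - id: 1`, similar to the output the question expected, while each id is still read before it is saved and writes never overlap.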
user4815162342
  • I also need `read_db(n+1)` to run after `read_db(n)`, and `data.to_parquet(n+1)` to run after `data.to_parquet(n)`. – ca9163d9 Jan 27 '21 at 14:18
  • Very nice. Actually, multiple `read_db(..)` calls can run in parallel; reading takes most of the time. However, I cannot have more than 3 `read_db()` calls at once, because each one uses a lot of memory and there will not be enough memory if too much data is read into dataframes. – ca9163d9 Jan 27 '21 at 14:44
  • @ca9163d9 Sure, you can use the same principle to parallelize any number of operations. You can even use the previous version of the answer and add a `Semaphore(3)` to prevent too many parallel reads. I'll leave this version because it faithfully answers the question as asked (after the edit), and is simple enough to serve as a basis for more sophisticated use cases. – user4815162342 Jan 27 '21 at 15:00
  • I created a new question https://stackoverflow.com/questions/65922160/read-in-parallel-and-write-sequentially. – ca9163d9 Jan 27 '21 at 15:23
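Following the `Semaphore(3)` suggestion in the comments, one possible sketch keeps at most three dataframes in memory while still writing strictly in id order. A producer acquires the semaphore before starting each read, in id order, and the consumer releases it only after the corresponding write finishes, so a fourth read cannot start until the first frame has been saved. Acquiring in the producer (rather than inside each read task) keeps the acquisition order deterministic, so the consumer never waits on a read that is stuck behind later ids. `read_db`, `write`, and the `alive` counter are hypothetical stand-ins for illustration, not the asker's real code.

```python
import asyncio
import threading
import time
from functools import partial, wraps

lock = threading.Lock()
alive = 0      # frames read but not yet written
max_alive = 0  # highest value of `alive` seen
written = []   # ids in the order they were written


def async_wrap(f):
    # Simplified version of the question's helper (default executor only).
    @wraps(f)
    async def run(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(f, *args, **kwargs))
    return run


def read_db(item_id):
    # Hypothetical stand-in: pretend to load a large dataframe.
    global alive, max_alive
    with lock:
        alive += 1
        max_alive = max(max_alive, alive)
    time.sleep(0.05)
    return f"data-{item_id}"


def write(data, item_id):
    # Hypothetical stand-in for data.to_parquet(...); the frame is freed after.
    global alive
    time.sleep(0.01)
    with lock:
        written.append(item_id)
        alive -= 1


async def main(n, max_in_memory=3):
    read_db_async = async_wrap(read_db)
    write_async = async_wrap(write)
    sem = asyncio.Semaphore(max_in_memory)
    reads = asyncio.Queue()

    async def producer():
        for item_id in range(1, n + 1):
            # Wait until fewer than max_in_memory frames are alive.
            await sem.acquire()
            task = asyncio.create_task(read_db_async(item_id))
            reads.put_nowait((item_id, task))

    async def consumer():
        for _ in range(n):
            item_id, task = await reads.get()
            data = await task
            await write_async(data, item_id)  # one write at a time, in id order
            sem.release()  # frame saved: let the producer start another read

    await asyncio.gather(producer(), consumer())


asyncio.run(main(6))
print(written, max_alive)
```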