
I'm trying to use run_in_executor and have some questions. Here is the code (basically a copy-paste from the docs):

import asyncio
import concurrent.futures


def cpu_bound(val):
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    print(f'Start task: {val}')
    sum(i * i for i in range(10 ** 7))
    print(f'End task: {val}')


async def async_task(val):
    print(f'Start async task: {val}')
    while True:
        print(f'Tick: {val}')
        await asyncio.sleep(1)


async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    for i in range(5):
        loop.create_task(async_task(i))

    # 1. Run in the default loop's executor:
    # for i in range(10):
    #     loop.run_in_executor(
    #         None, cpu_bound, i)
    # print('default thread pool')

    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
    #     for i in range(10):
    #         loop.run_in_executor(
    #             pool, cpu_bound, i)
    #     print('custom thread pool')

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as pool:
        for i in range(10):
            loop.run_in_executor(
                pool, cpu_bound, i)
        print('custom process pool')

    while True:
        await asyncio.sleep(1)


asyncio.run(main())

Case 1: run_in_executor with executor=None: the async_tasks execute at the same time as the cpu_bound tasks.

In the other cases the async_tasks only execute after the cpu_bound tasks are done. I thought that when we use ProcessPoolExecutor the tasks shouldn't block the loop. Where am I wrong?


1 Answer


In the other cases the async_tasks only execute after the cpu_bound tasks are done. I thought that when we use ProcessPoolExecutor the tasks shouldn't block the loop. Where am I wrong?

The problem is that `with XXXPoolExecutor() as pool:` shuts down the pool at the end of the `with` block. Pool shutdown waits for the pending tasks to finish, which blocks the event loop and is incompatible with asyncio. Since your first variant doesn't involve a `with` statement, it doesn't have this issue.

The solution is simply to remove the `with` statement and create the pool once (for example at top level or in main()), and just use it in the function. If you want to, you can explicitly shut down the pool by calling `pool.shutdown()` after `asyncio.run()` has completed.
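For example, the process-pool variant might look roughly like this (a sketch only; it reuses your cpu_bound, picks an arbitrary worker count, and keeps the loop alive with a sleep instead of your tick tasks):

import asyncio
import concurrent.futures


def cpu_bound(val):
    print(f'Start task: {val}')
    sum(i * i for i in range(10 ** 7))
    print(f'End task: {val}')


# The pool is created once at top level, outside any with block,
# so nothing ever blocks the event loop waiting for a pool shutdown.
pool = concurrent.futures.ProcessPoolExecutor(max_workers=10)


async def main():
    loop = asyncio.get_running_loop()
    for i in range(10):
        loop.run_in_executor(pool, cpu_bound, i)
    # Keep the event loop alive long enough for the workers to finish.
    await asyncio.sleep(10)


if __name__ == '__main__':
    asyncio.run(main())
    # Optional: shut the pool down explicitly after the event loop is done.
    pool.shutdown()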

Also note that you never await the futures returned by loop.run_in_executor. This is an error and asyncio will probably warn you about it; you should collect the returned futures in a list and await them with something like `results = await asyncio.gather(*tasks)`. This will not only collect the results, but also make sure that exceptions raised in the functions running in the executor get correctly propagated to your code rather than dropped.
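Inside main() that could look something like this (a sketch that assumes the top-level pool from the snippet above; since cpu_bound doesn't return anything, the gathered results are just a list of None values, but exceptions still propagate):

async def main():
    loop = asyncio.get_running_loop()
    # Each call returns an awaitable asyncio future wrapping the
    # concurrent.futures.Future from the process pool.
    tasks = [loop.run_in_executor(pool, cpu_bound, i) for i in range(10)]
    # Await them all; exceptions raised in the worker processes are
    # re-raised here instead of being silently dropped.
    results = await asyncio.gather(*tasks)
    print(results)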

user4815162342
  • I'm using asyncio to run requests.get() in a loop. Is it ok not to wait for the results of the loop.run_in_executor? – r2b2 Apr 20 '21 at 00:56
  • @r2b2 This answer argues that you should wait for them *somewhere*, so you collect the exceptions if nothing else. Not awaiting is probably not a fatal flaw, see if you get any warnings from asyncio. Also, if you really don't care about the result or exceptions, perhaps you don't need `run_in_executor()` in the first place, you could just call `executor.submit()`. The whole point of `run_in_executor()` is to convert a `concurrent.futures.Future` (returned by `executor.submit`) to an awaitable `asyncio.Future`. If you don't need that functionality, you can use the executor directly. – user4815162342 Apr 20 '21 at 06:48