
I am experimenting with multi-threaded execution using asyncio.
My understanding is that the event loop manages threads and executes functions on them.
If that is the case, it should not be possible to start a new function while the main thread is still busy.
However, when I run the following code, a new function starts executing while the main thread is sleeping. Could you explain why?

ref: https://stackoverflow.com/a/60747799

import asyncio
from concurrent.futures.thread import ThreadPoolExecutor
from time import sleep
import logging

logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s %(thread)s %(funcName)s %(message)s"
)

def long_task(t):
    """Simulate long IO bound task."""
    logging.info("2. t: %s", t)
    sleep(t)
    logging.info("5. t: %s", t)
    return t ** 2

async def main():
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=2)
    inputs = range(1, 5)
    logging.info("1.")
    futures = [loop.run_in_executor(executor, long_task, i) for i in inputs]
    logging.info("3.")
    sleep(3)
    logging.info("4.")
    results = await asyncio.gather(*futures)
    logging.info("6.")

if __name__ == "__main__":
    asyncio.run(main())

expected output

2022-02-08 22:59:08,896 139673219430208 __init__ Using selector: EpollSelector
2022-02-08 22:59:08,896 139673219430208 main 1.
2022-02-08 22:59:08,897 139673194632960 long_task 2. t: 1
2022-02-08 22:59:08,897 139673186240256 long_task 2. t: 2
2022-02-08 22:59:08,897 139673219430208 main 3.
2022-02-08 22:59:09,898 139673194632960 long_task 5. t: 1
2022-02-08 22:59:10,898 139673186240256 long_task 5. t: 2
2022-02-08 22:59:13,400 139673219430208 main 4.
2022-02-08 22:59:09,898 139673194632960 long_task 2. t: 3
2022-02-08 22:59:10,899 139673186240256 long_task 2. t: 4
2022-02-08 22:59:12,902 139673194632960 long_task 5. t: 3
2022-02-08 22:59:14,903 139673186240256 long_task 5. t: 4
2022-02-08 22:59:14,903 139673219430208 main 6.

actual output

2022-02-08 22:59:08,896 139673219430208 __init__ Using selector: EpollSelector
2022-02-08 22:59:08,896 139673219430208 main 1.
2022-02-08 22:59:08,897 139673194632960 long_task 2. t: 1
2022-02-08 22:59:08,897 139673186240256 long_task 2. t: 2
2022-02-08 22:59:08,897 139673219430208 main 3.
2022-02-08 22:59:09,898 139673194632960 long_task 5. t: 1
2022-02-08 22:59:09,898 139673194632960 long_task 2. t: 3
2022-02-08 22:59:10,898 139673186240256 long_task 5. t: 2
2022-02-08 22:59:10,899 139673186240256 long_task 2. t: 4
2022-02-08 22:59:12,902 139673194632960 long_task 5. t: 3
2022-02-08 22:59:13,400 139673219430208 main 4.
2022-02-08 22:59:14,903 139673186240256 long_task 5. t: 4
2022-02-08 22:59:14,903 139673219430208 main 6.

Between 3 and 4, sleep(3) is being executed in the main thread. I understand that the completion of long_task(1) and long_task(2), which were started earlier in the thread pool, is logged during this time, but why does the next task start running during this time? If the event loop is in the main thread, then sleep(3) should prevent any new function from starting.

user20533

1 Answer


When using run_in_executor in asyncio, is the event loop executed in the main thread?

Yes, it is - but run_in_executor submits the callables to an executor, allowing them to run without assistance from the event loop.
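One way to see this split is to compare the thread that runs a coroutine with the thread that runs a function handed to run_in_executor. This is a minimal sketch; the helper name blocking_work is made up for illustration, and it assumes the loop is started with asyncio.run(), which drives the loop in the calling thread.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def blocking_work():
    # Executed by one of the pool's worker threads.
    return threading.get_ident()


async def main():
    loop = asyncio.get_running_loop()
    # The coroutine runs on whichever thread called asyncio.run().
    loop_thread = threading.get_ident()
    with ThreadPoolExecutor(max_workers=2) as executor:
        worker_thread = await loop.run_in_executor(executor, blocking_work)
    return loop_thread, worker_thread


if __name__ == "__main__":
    loop_thread, worker_thread = asyncio.run(main())
    print("loop runs on main thread:", loop_thread == threading.main_thread().ident)
    print("worker is a different thread:", worker_thread != loop_thread)
```

When run as a script, both lines should print True: the loop lives in the main thread, while the submitted function runs elsewhere.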

Between 3 and 4, sleep(3) is being executed in the main thread. I understand that the completion of long_task(1) and long_task(2), which were started earlier in the thread pool, is logged during this time, but why does the next task start running during this time? If the event loop is in the main thread, then sleep(3) should prevent any new function from starting.

ThreadPoolExecutor(max_workers=2) creates a thread pool that can scale to up to two workers. run_in_executor is a wrapper around Executor.submit that ensures that the final result is propagated to asyncio. Its implementation could look like this (the actual code is a bit more complex because it handles cancellation and other concerns, but this is the gist):

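class EventLoop:
    # ...
    def run_in_executor(self, executor, f, *args):
        async_future = self.create_future()
        # Enqueue f(*args) for a pool thread; this returns immediately.
        handle = executor.submit(f, *args)

        def when_done(_):
            # Usually runs in the worker thread, so hand the outcome back to the loop thread.
            exc = handle.exception()
            if exc is not None:
                self.call_soon_threadsafe(async_future.set_exception, exc)
            else:
                self.call_soon_threadsafe(async_future.set_result, handle.result())

        handle.add_done_callback(when_done)
        return async_future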
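The bridging step in that sketch is also available publicly as asyncio.wrap_future(), which turns a concurrent.futures.Future into an awaitable asyncio future. As far as I know from the CPython source, run_in_executor does essentially this with the result of executor.submit(). The sketch below (the functions square and fail are illustrative only) shows that both routes give the same result and that an exception raised in the worker resurfaces at the await.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x ** 2


def fail():
    raise ValueError("boom")


async def main():
    loop = asyncio.get_running_loop()
    error = None
    with ThreadPoolExecutor(max_workers=2) as executor:
        # "Manual" route: submit to the pool, then wrap the
        # concurrent.futures.Future so asyncio can await it.
        manual = await asyncio.wrap_future(executor.submit(square, 3))
        # The built-in helper gives the same result.
        builtin = await loop.run_in_executor(executor, square, 3)
        # An exception raised in the worker is re-raised at the await point.
        try:
            await asyncio.wrap_future(executor.submit(fail))
        except ValueError as exc:
            error = str(exc)
    return manual, builtin, error


if __name__ == "__main__":
    print(asyncio.run(main()))  # (9, 9, 'boom')
```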

The call to submit pushes the callable and its arguments into a thread-safe queue. The pool's workers run in an infinite loop that consumes that queue, exiting only when the executor is shut down.
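To make the queue-plus-workers picture concrete, here is a deliberately simplified toy pool. It is not the real ThreadPoolExecutor source; the class name TinyPool, its results list, and the sentinel-based shutdown are my own choices for illustration, and it does no error handling (an exception in a task would kill that worker).

```python
import queue
import threading


class TinyPool:
    """Toy thread pool: a shared FIFO queue drained by a fixed set of workers."""

    _STOP = object()  # sentinel that tells one worker to exit

    def __init__(self, max_workers):
        self._tasks = queue.Queue()  # unbounded and thread-safe
        self._lock = threading.Lock()
        self.results = []
        self._workers = [
            threading.Thread(target=self._worker, daemon=True)
            for _ in range(max_workers)
        ]
        for worker in self._workers:
            worker.start()

    def _worker(self):
        # Loop forever: take the next task, run it, repeat until a sentinel arrives.
        while True:
            item = self._tasks.get()
            if item is self._STOP:
                return
            fn, args = item
            value = fn(*args)
            with self._lock:
                self.results.append(value)

    def submit(self, fn, *args):
        # Never blocks, because the queue has no size limit.
        self._tasks.put((fn, args))

    def shutdown(self):
        # Sentinels go in after all tasks, so FIFO order drains the tasks first.
        for _ in self._workers:
            self._tasks.put(self._STOP)
        for worker in self._workers:
            worker.join()


if __name__ == "__main__":
    pool = TinyPool(max_workers=2)
    for i in range(1, 5):  # four tasks, only two workers
        pool.submit(pow, i, 2)
    pool.shutdown()
    print(sorted(pool.results))  # [1, 4, 9, 16]
```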

If you submit more tasks than there are workers in the pool, the additional tasks will still be placed in the queue, waiting for their turn to be processed. (The queue is an unbounded channel, so Executor.submit() never blocks.) Once a worker is done with a task, it will request the next task off the queue, which is why your extra tasks get executed. It doesn't matter that the main thread is stuck in time.sleep() at that point - the functions were submitted to the executor prior to that, and are sitting in the queue, so the workers can get to them just fine.
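A scaled-down version of the question's timeline, using a bare ThreadPoolExecutor without asyncio, shows the queued tasks starting while the main thread is blocked. The scale factor and the started/woke_up bookkeeping are my own additions, and timing-based checks like this can be thrown off on a heavily loaded machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def long_task(i, delay, started):
    # Record when task i actually begins on a worker thread, then block.
    started[i] = time.monotonic()
    time.sleep(delay)
    return i ** 2


def run_demo(scale=0.2):
    started = {}
    with ThreadPoolExecutor(max_workers=2) as executor:
        # submit() returns at once; tasks 3 and 4 wait in the queue for a free worker.
        futures = [executor.submit(long_task, i, i * scale, started) for i in range(1, 5)]
        # Block the main thread, mirroring sleep(3) in the question.
        time.sleep(3 * scale)
        woke_up = time.monotonic()
        results = [f.result() for f in futures]
    return started, woke_up, results


if __name__ == "__main__":
    started, woke_up, results = run_demo()
    for i in sorted(started):
        print(f"task {i} started before the main thread woke up: {started[i] < woke_up}")
    print(results)
```

With two workers, task 3 starts when task 1 finishes (after about 1 × scale) and task 4 when task 2 finishes (about 2 × scale), both before the main thread wakes at 3 × scale, which is the same ordering as in the question's actual output.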


Finally, in normal asyncio code, an async function must never call time.sleep(); it must await asyncio.sleep() instead. (I'm aware that you did it intentionally to block the thread running the event loop, but it is something that beginners are often unaware of, so it needs to be pointed out.)
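The difference is easy to measure with a small "heartbeat" coroutine that only makes progress when the event loop gets control. This is a sketch under my own assumptions (the names heartbeat and measure, the 0.05 s tick, and the 0.3 s sleep are arbitrary): blocking with time.sleep() freezes the loop so no ticks happen, while awaiting asyncio.sleep() lets the heartbeat keep running.

```python
import asyncio
import time


async def heartbeat(ticks):
    # Appends a timestamp each time the event loop gets to run this coroutine.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def measure(blocking):
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    before = len(ticks)
    if blocking:
        time.sleep(0.3)  # freezes the whole event loop
    else:
        await asyncio.sleep(0.3)  # yields control back to the loop
    during = len(ticks) - before
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return during


if __name__ == "__main__":
    print("ticks while time.sleep() blocks:", asyncio.run(measure(blocking=True)))
    print("ticks while awaiting asyncio.sleep():", asyncio.run(measure(blocking=False)))
```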

user4815162342