92

I'm using aiohttp to build an API server that sends TCP requests off to a separate server. The module that sends the TCP requests is synchronous and a black box for my purposes. So my problem is that these requests are blocking the entire API. I need a way to wrap the module requests in an asynchronous coroutine that won't block the rest of the API.

So, just using sleep as a simple example, is there any way to somehow wrap time-consuming synchronous code in a non-blocking coroutine, something like this:

async def sleep_async(delay):
    # After calling sleep, loop should be released until sleep is done
    yield sleep(delay)
    return 'I slept asynchronously'
Zac Delventhal
    You always block on I/O. With cooperative multitasking you can't get the desired behaviour, because a blocked coroutine only returns control (yields) after the request has finished. – Dmitry Shilyaev Apr 05 '17 at 21:03
    aiohttp is good for http. For non http TCP, asyncio is enough. – Udi Apr 05 '17 at 22:13

6 Answers

101

Eventually I found an answer in this thread. The method I was looking for is run_in_executor. This allows a synchronous function to be run asynchronously without blocking an event loop.

In the sleep example I posted above, it might look like this:

import asyncio
from time import sleep

async def sleep_async(loop, delay):
    # None uses the default executor (ThreadPoolExecutor)
    await loop.run_in_executor(None, sleep, delay)
    return 'I slept asynchronously'

Also see the following answer -> How do we call a normal function where a coroutine is expected?
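
A minimal way to drive the coroutine above (a sketch, assuming Python 3.7+ where asyncio.run and get_running_loop are available; gather shows that two blocking sleeps no longer serialize):

```python
import asyncio
from time import sleep

async def sleep_async(loop, delay):
    # run the blocking sleep in the default ThreadPoolExecutor
    await loop.run_in_executor(None, sleep, delay)
    return 'I slept asynchronously'

async def main():
    loop = asyncio.get_running_loop()
    # both blocking sleeps run concurrently in worker threads
    return await asyncio.gather(sleep_async(loop, 0.2),
                                sleep_async(loop, 0.2))

print(asyncio.run(main()))
```

Both calls finish in roughly 0.2 s total rather than 0.4 s, since each sleep occupies a worker thread instead of the event loop.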

Nick Garvey
Zac Delventhal
    `ProcessPoolExecutor` has a high cost because it launches an entire new python interpreter. It is used when you have a CPU-intensive task that needs to use multiple processors. Consider using `ThreadPoolExecutor` instead, which uses threading. – Oleg Jun 11 '17 at 01:38
    Thank you for the additional info. Although the original example used a process pool, `ThreadPoolExecutor` is what I ended up using after a little more research. Still seems a little janky, but so far it's all holding together. – Zac Delventhal Jun 14 '17 at 03:34
    To get the event loop, one can do `loop = asyncio.get_event_loop()` –  Apr 05 '20 at 23:16
43

You can use a decorator to wrap the sync version into an async version.

import asyncio
import time
from functools import wraps, partial


def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run

@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'
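
A self-contained sketch of how the decorator behaves in practice (the timing check is an illustration, not part of the original answer): three wrapped calls overlap in the default executor, so the total elapsed time stays near one delay, not three.

```python
import asyncio
import time
from functools import wraps, partial

def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_running_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run

@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'

async def main():
    start = time.monotonic()
    # three blocking sleeps run concurrently in the default executor
    results = await asyncio.gather(*(sleep_async(0.2) for _ in range(3)))
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
print(results)
print(f'{elapsed:.1f}s')  # near 0.2s, not 0.6s
```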

Outdated: aioify is in maintenance mode

or use the aioify library

% pip install aioify

then

from aioify import aioify

@aioify
def sleep_async(delay):
    time.sleep(delay)  # blocking work, run in a thread by aioify
rominf
ospider
    good advice to use `aioify`, it makes it so easy now to write async functions and modules :) – WBAR Feb 26 '19 at 23:46
  • Even when using `aioify`, the function may still block the event loop if it is a long-running blocking operation. In that case, instead of concurrency we would need parallelism. Unless there was a way to yield back to the loop within the long-running operation. – CMCDragonkai Sep 30 '20 at 01:45
17

From Python 3.9 the cleanest way to do this is the asyncio.to_thread method, which is basically a shortcut for run_in_executor but also preserves all the contextvars.

Also, keep the GIL in mind, since to_thread runs the function in a thread. You can still use it for CPU-bound tasks with something like numpy, which releases the GIL. From the docs:

Note Due to the GIL, asyncio.to_thread() can typically only be used to make IO-bound functions non-blocking. However, for extension modules that release the GIL or alternative Python implementations that don’t have one, asyncio.to_thread() can also be used for CPU-bound functions.
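
A minimal sketch of the to_thread approach (assumes Python 3.9+; blocking_io is a stand-in for the blocking TCP call from the question):

```python
import asyncio
import time

def blocking_io(delay):
    time.sleep(delay)  # stands in for the blocking TCP request
    return 'done'

async def main():
    # to_thread runs the call in the default executor and,
    # unlike run_in_executor, propagates contextvars (Python 3.9+)
    return await asyncio.to_thread(blocking_io, 0.1)

print(asyncio.run(main()))
```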
Alexey Trofimov
8

Maybe someone will need my solution to this problem. I wrote my own library to solve this, which allows you to make any function asynchronous using a decorator.

To install the library, run this command:

$ pip install awaits

To make any of your functions asynchronous, just add the @awaitable decorator to it, like this:

import time
import asyncio
from awaits.awaitable import awaitable

@awaitable
def sum(a, b):
  # heavy load simulation
  time.sleep(10)
  return a + b

Now you can check that your function really is an asynchronous coroutine:

print(asyncio.run(sum(2, 2)))

"Under the hood" your function will be executed in the thread pool. This thread pool will not be recreated every time your function is called. A thread pool is created once and accepts new tasks via a queue. This will make your program run faster than using other solutions, because the creation of additional threads is an additional overhead.

Evgeniy Blinov
8

This decorator is useful for this case: it runs your blocking function in another thread.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Optional

class to_async:

    def __init__(self, *, executor: Optional[ThreadPoolExecutor] = None):
        self.executor = executor

    def __call__(self, blocking):
        @wraps(blocking)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_event_loop()
            # lazily create a pool if the caller did not supply one
            if not self.executor:
                self.executor = ThreadPoolExecutor()
            func = partial(blocking, *args, **kwargs)
            return await loop.run_in_executor(self.executor, func)

        return wrapper

@to_async(executor=None)
def sync(*args, **kwargs):
    print(args, kwargs)
   
asyncio.run(sync("hello", "world", result=True))

Mustafa Quraish
Sabuhi Shukurov
1

Not sure if it's too late, but you can also use a decorator to run your function in a thread. Note, though, that the wrapped function still blocks its thread non-cooperatively, unlike native async code, which blocks cooperatively.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

def wrap(func):
    pool = ThreadPoolExecutor()
    @wraps(func)
    async def run(*args, **kwargs):
        future = pool.submit(func, *args, **kwargs)
        # bridge the concurrent.futures.Future to asyncio and wait for it
        return await asyncio.wrap_future(future)
    return run
hpca01