70

So I'm locked to a python 3.6.2 interpreter that follows my desktop application.

What I want is to call an async function from a synchronized method or function.

When calling the python function from the desktop application it has to be a normal function which can not be awaited.

From the desktop application I am able to send a list of urls, and what I want is to send back response from every url in an async matter.

here is my try I've marked the SyntaxError which I don't know how to bypass.

import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()
timeout = 10

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        -> SyntaxError: newfeature = await main(urls_and_coords)
        self.pyoutput(newfeature)
        
    def close(self):
       pass 

async def main(urls):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession(loop=loop) as session:
        feature = loop.run_until_complete(fetch_all(session, urls, loop))
        return feature
        
async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results
    

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            newFeature = fmeobjects.FMEFeature()
            response_data = await response
            newFeature.setAttribute('response', response_data)
            newFeature.setAttribute('_xmin',url[1])
            newFeature.setAttribute('_xmax',url[2])
            newFeature.setAttribute('_ymin',url[3])
            newFeature.setAttribute('_ymax',url[4])
            return newFeature

I have tried making these changes:

import fme
import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(main(loop, urls_and_coords))
        #feature.setAttribute('result',result)
        self.pyoutput(feature)
        
    def close(self):
       pass 

async def main(loop, urls):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await fetch_all(session, urls, loop)

        
async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results
    

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            #newFeature = fmeobjects.FMEFeature()
            response = await response
            #newFeature.setAttribute('response', response_data)
            #newFeature.setAttribute('_xmin',url[1])
            #newFeature.setAttribute('_xmax',url[2])
            #newFeature.setAttribute('_ymin',url[3])
            #newFeature.setAttribute('_ymax',url[4])
            return response, url[1], url[2], url[3], url[4]


        

but now I end up with this error:

Python Exception <TypeError>: object ClientResponse can't be used in 'await' 
expression
Traceback (most recent call last):
  File "<string>", line 20, in input
  File "asyncio\base_events.py", line 467, in run_until_complete
  File "<string>", line 29, in main
  File "<string>", line 33, in fetch_all
  File "<string>", line 41, in fetch
TypeError: object ClientResponse can't be used in 'await' expression
Carl Manaster
  • 39,912
  • 17
  • 102
  • 155
Paal Pedersen
  • 1,070
  • 1
  • 10
  • 13

4 Answers4

68

@deceze answer is probably the best you can do in Python 3.6. But in Python 3.7, you could directly use asyncio.run in the following way:

newfeature = asyncio.run(main(urls))

It will properly create, handle, and close an event_loop.

Neuron
  • 5,141
  • 5
  • 38
  • 59
Francis Colas
  • 3,459
  • 2
  • 26
  • 31
  • 6
    But what if the code is already running in an `asyncio.run` call? If you use `asyncio.run` inside a function that is invoked with `asyncio.run` then you get `RuntimeError: asyncio.run() cannot be called from a running event loop` – birgersp Oct 31 '21 at 12:11
  • 4
    @birgersp If you are already inside an event_loop you can simply call it via `result = await main(urls)`. – mgutsche Nov 08 '21 at 15:57
  • 9
    @mgutsche That doesn't solve the problem in the original question, you cannot use await unless the function is declared "async def" which may not be possible to do, e.g. for magic methods or interfaces that can't be changed. – Hannes Landeholm Oct 20 '22 at 10:22
  • 1
    Either `asyncio.get_event_loop` or `asyncio.get_running_loop` can be used to check for (and retrieve) any running loop; if there is one, the coroutine can be scheduled by calling `asyncio.ensure_future`. – Berislav Lopac Mar 02 '23 at 14:38
33

You would use an event loop to execute the asynchronous function to completion:

newfeature = asyncio.get_event_loop().run_until_complete(main(urls_and_coords))

(This technique is already used inside main. And I'm not sure why, since main is async you could/should use await fetch_all(...) there.)

deceze
  • 510,633
  • 85
  • 743
  • 889
  • 3
    But then I probably need to rewrite main, since it already has an event_loop? – Paal Pedersen Aug 09 '18 at 08:38
  • 1
    Interesting point, I'm not actually sure off the top of my head whether that would cause any issues. But as I wrote, it makes little sense to use `run_until_complete` inside an `async` function to begin with, you should simply `await` it. – deceze Aug 09 '18 at 08:41
  • 1
    It works for me. Note that in the case you have not event_loop and so you have the error : "RuntimeError: There is no current event loop in thread 'Thread-n'", you can add `asyncio.set_event_loop(asyncio.new_event_loop())` in your function to set an event loop. – 1ronmat Apr 17 '20 at 09:17
17

There are also some libraries that exist to handle this and always do the right thing. One example is asgiref.sync described here which has methods async_to_sync and sync_to_async for performing these conversions:

from asgiref.sync import async_to_sync

@async_to_sync
async def print_data():
    print(await get_data())

print_data()  # Can be called synchronously

More info from the docs for asgiref.sync:

AsyncToSync lets a synchronous subthread stop and wait while the async function is called on the main thread's event loop, and then control is returned to the thread when the async function is finished.

SyncToAsync lets async code call a synchronous function, which is run in a threadpool and control returned to the async coroutine when the synchronous function completes.

There are also other similar projects like koil

Matthew D. Scholefield
  • 2,977
  • 3
  • 31
  • 42
  • 1
    How come this answer has so little upvotes? Thanks, it's great for writing packages with `aiohttp` and for beginners taking their first steps with asynchronous features in Python. – Soren V. Raben Mar 30 '22 at 11:19
  • 8
    ...because adding fragile third-party dependencies to do something Python's standard library already trivially does out-of-the-box is a bad idea – *always*. Just directly call `asyncio.run()` or `asyncio.get_event_loop().run_until_complete()`. In either case, it's a trivial one-liner. The `asyncio` module exists for a reason. `` – Cecil Curry Jul 06 '22 at 04:40
  • 2
    This not a simple thing that is handled by the standard library in any way I see described here. As commented around, functions can be nested in other threads, other runloops, and calling hierarchies of both sync and async code, and there may or may not be a runloop already for the current thread. A function that handles and documents all this well is needed. – fuzzyTew Mar 26 '23 at 11:52
2

I was able to get this working in pure python 3.10 using the built-in asyncio.run_coroutine_threadsafe.

This is new to me, so there are probably some caveats, e.g. since the async method is not actually awaited, the process could (will) exit before the callback completes (unless you do something to ensure it doesn't).

For a reference on where this might occur in the wild, see the bleak BLE library class BleakClient callback method disconnected_callback. Then, in the callback try to emit using the async version of socket.io client, AsyncClient.

Concise problem/solution:

import asyncio
from typing import Callable

Callback = Callable[[int], None]


class SomeSystem:
    """Some library you don't control that is mostly async, but provides a callback that
    is _not_ async."""

    def __init__(self, callback: Callback):
        self._callback = callback

    async def do_something(self):
        """do some work and then call the non-async callback"""
        await asyncio.sleep(1.0)
        self._callback(1)
        await asyncio.sleep(1.0)
        self._callback(2)


async def some_async_method(value: int):
    """some long-running operation normally called by async code"""
    await asyncio.sleep(0.1)
    print(f"long-running: {value}")


async def main():
    """main is async and started as normal with asyncio.run"""
    print("BEGIN main")

    loop = asyncio.get_running_loop()

    def cb(value: int) -> None:
        """This method _cannot_ be async, due to the underlying implementation of SomeSystem."""
        # some_async_method(value)  # RuntimeWarning: coroutine 'some_async_method' was never awaited
        asyncio.run_coroutine_threadsafe(some_async_method(value), loop)  # okay

    system = SomeSystem(cb)
    await system.do_something()

    # maybe ensure the last call to async method is awaited? Without this call, the final callback
    # won't be handled, since it's never being awaited. If anyone knows how to properly wait
    # for this, let me know in the comments!
    await asyncio.sleep(1.0)

    print("END main")


if __name__ == "__main__":
    asyncio.run(main())

Output

BEGIN main
long-running: 1
long-running: 2
END main
cod3monk3y
  • 9,508
  • 6
  • 39
  • 54