0

TL;DR: Q: How to lock protect a shared resource from a sync function in py39?

Sorry if this was already answered - I just couldn't find it easily. I'm struggling how to correctly await a lock when accessing a shared resourced. My API is accessing a backend API which spends forever logging in, so I'm caching the auth token, and renewing whenever it expires:

async def _login():
    global connection_lock
    while True:
        async with connection_lock:
            # return a new connection or the one which has been cached in the meantime
            # or raise whichever error received while attempting to login


class BackendSystem:
    @property
    def client(self):
        if self._client is None or cache().get('headers') is None:
            self._client = loop.run_until_complete(_login())
        return self._client

Now the issue is that in certain cases several requests (likely in parallel) to flask will be issued to flask in parallel, causing this error:

  File "/usr/lib/python3.6/asyncio/base_events.py", line 471, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
    raise RuntimeError('This event loop is already running')

And I did find suggestions to install nest-asyncio however I'm not sure that is truly what I need TBH. If I understand correctly then the loops isn't nested (ie. run from within each other, but I could very well be wrong), but rather I'm trying to use the same loop which is already running - and possibly this part is illegal?

It just strikes me as incredible that it should be this hard to do the very basic lock protection of a shared resource from a function which isn't async, and which I don't have any intent to convert into being async.

1 Answers1

0

I created a Reproducible Example (it helps a lot !) :

import asyncio


connection_lock = asyncio.Lock()
loop = asyncio.get_event_loop()


class Client:
    def __init__(self, username: str):
        self.username = username

    def __str__(self):
        return f"Client({self.username!r})"


async def _login(username) -> Client:
    global connection_lock
    while True:
        print(f"{username} will lock")
        async with connection_lock:
            print(f"{username} got the lock")
            await asyncio.sleep(5)  # sleep a bit
            print(f"{username} has finished")
            return Client(username)


class BackendSystem:
    def __init__(self, username: str):
        self._client = None
        self.username = username

    @property
    def client(self):
        if self._client is None:
            self._client = loop.run_until_complete(_login(self.username))
        return self._client


def main1():
    def do_something(username: str):
        print(BackendSystem(username).client)

    for username in ["Steffen", "Lenormju"]:
        do_something(username)


def main2():
    async def do_something(username: str):
        print(BackendSystem(username).client)

    future = asyncio.gather(
        do_something("Steffen"), do_something("Lenormju")
    )
    results = loop.run_until_complete(future)
    return results


if __name__ == '__main__':
    print("main1 ================")
    main1()
    print("main2 ================")
    main2()

The output is :

main1 ================
Steffen will lock
Steffen got the lock
Steffen has finished
Client('Steffen')
Lenormju will lock
Lenormju got the lock
Lenormju has finished
Client('Lenormju')
main2 ================
Lenormju will lock
Lenormju got the lock
Steffen will lock

Traceback (most recent call last):
  File "C:/PycharmProjects/stack_overflow/68159604.py", line 62, in <module>
    main2()
  File "C:/PycharmProjects/stack_overflow/68159604.py", line 54, in main2
    results = loop.run_until_complete(future)
  File "C:\Program Files\Python3.6.8\lib\asyncio\base_events.py", line 484, in run_until_complete
    return future.result()
  File "C:/PycharmProjects/stack_overflow/68159604.py", line 49, in do_something
    print(BackendSystem(username).client)
  File "C:/PycharmProjects/stack_overflow/68159604.py", line 35, in client
    self._client = loop.run_until_complete(_login(self.username))
  File "C:\Program Files\Python3.6.8\lib\asyncio\base_events.py", line 471, in run_until_complete
    self.run_forever()
  File "C:\Program Files\Python3.6.8\lib\asyncio\base_events.py", line 425, in run_forever
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

The problem occurs when there are concurrent requests, the main1 shows that sequential works fine. It confirms that you encounter it when the code is run by Flask (already in an event loop).

A more minimal example is :

import asyncio


loop = asyncio.get_event_loop()


if __name__ == '__main__':
    async def lock_5_seconds():
        await asyncio.sleep(5)

    def do_synchronously_something_async():
        loop.run_until_complete(lock_5_seconds())

    async def do_something_async():
        do_synchronously_something_async()

    loop.run_until_complete(
        asyncio.gather(
            do_something_async(),
            do_something_async()
        )
    )

Looking at the asyncio.loop.run_until_complete documentation :

Run until the future [...] has completed.

As explained in this answer :

Event loop's method such as run_forever or run_until_complete — are just a ways to start event loop in general.

You must not call them several times if you never stopped it. Flask starts it once for you, you should not start it again yourself.

But I guess (re)starting the event loop was not what you meant.
Your actual problem is that you want to synchronously call an async function. Indeed, async functions get scheduled to run on the event loop. You can't just call them directly.

But this question's answers tell you to just call loop.run_until_complete, which does not work in your case because you already have an event loop running.

Here is a discussion about a similar case on Reddit : Calling async functions from synchronous functions. And one on StackOverflow from someone using FastAPI : Python: Call asynchronous code from synchronous method when there is already an event loop running.

The conclusion is that there is NO way to do exactly what you want.
I think you should change your design : the call to your client property is currently synchronous, although it has to call asynchronous (slowwww) code (the _login function).

You did not say what the rest of your code does, but if it's a wrapper around a remote API I recommend something like that :

async def login():
    ...

class BackendSystem():
    async def ensure_is_logged():
        ...

    async def call_this_api_method():
        await self.ensure_is_logged()
        ...

and embrace asynchronous code.

Or just don't make your login function async. Mixing both is a recipe for headaches.

Lenormju
  • 4,078
  • 2
  • 8
  • 22
  • I can make it work by simply using threading.lock - what’s the downside of that approach? – Steffen Schumacher Jun 28 '21 at 17:41
  • Why are your `_login` function and `lock` async in the first place ? If you have no reason to use async, just don't do it. But a [`threading.Lock`](https://docs.python.org/3/library/threading.html#threading.Lock) may cause deadlocks to your application, depending on whether Flask is configured to run single-threaded or not. Locking is dangerous, async is complicated, I recommend you make sure of your **actual** constraints and choose the trade-off based on that and your skills. – Lenormju Jun 29 '21 at 05:55
  • 1
    The locks are due to parallel API requests, causing race conditions on which thread is renewing the token and manipulating the cache. I've done threading for ~20 years, so I'm comfortable with it and using the with lock syntax its less dangerous - for some reason I just got the impression that the async stuff was going to replace it, but now I realise that the two are not targeting the same problems. – Steffen Schumacher Jun 29 '21 at 10:09