1

I have an asyncio event loop which runs coroutines from various parts of my system and I would like to run some checks each time a new coroutine is added for execution, in the same way you would use middleware functions with HTTP handlers.

Is there a way to achieve this functionality?

  • I also checked some `metaclasses` tricks and added another solution, it is probably more suitable for you since you can wrap many function at once. – Artiom Kozyrev Oct 09 '22 at 17:21

1 Answers1

1

I do not know any "out of the box" way to do it, on the other hand you can implement your own decorators which do the stuff.

import asyncio
from functools import wraps
from time import monotonic, sleep


def some_wrapper(f):
    @wraps(f)
    async def inner(*args, **kwargs):
        start = monotonic()
        if asyncio.iscoroutinefunction(f):
            res = await f(*args, **kwargs)
        else:
            res = f(*args, **kwargs)
        duration = monotonic() - start
        return res, duration
    return inner


@some_wrapper
async def plus_async(x, y):
    await asyncio.sleep(1.0)
    return x + y


@some_wrapper
def minus_sync(x, y):
    sleep(1)  # attention ! it blocks the whole thread !
    return x - y


async def async_main():
    print(*await plus_async(5, 5))
    print(await asyncio.gather(plus_async(4, 4), plus_async(2, 2)))
    print(await asyncio.create_task(plus_async(1, 1)))
    print(await minus_sync(9, 9))


if __name__ == '__main__':
    asyncio.run(async_main())

You can also decorate many functions at once, the code was inspired by the answer, it uses metaclass concept:

import asyncio
from functools import wraps
from time import monotonic, sleep


class DecorateAsyncio(type):

    def __new__(cls, name, bases, local):
        for attr in local:
            value = local[attr]
            if callable(value):
                local[attr] = DecorateAsyncio._some_wrapper(value)

        return type.__new__(cls, name, bases, local)

    @staticmethod
    def _some_wrapper(f):
        @wraps(f)
        # we need self here to exclude errors related to number of arguments
        # since we gonna decorate instance level methods
        async def inner(self, *args, **kwargs):
            start = monotonic()
            if asyncio.iscoroutinefunction(f):
                res = await f(*args, **kwargs)
            else:
                res = f(*args, **kwargs)
            duration = monotonic() - start
            return res, duration

        return inner


class SomeClass(metaclass=DecorateAsyncio):
    async def plus_async(x, y):
        await asyncio.sleep(1.0)
        return x + y

    def minus_sync(x, y):
        sleep(1)  # attention ! it blocks the whole thread !
        return x - y


async def async_main():
    s = SomeClass()
    print(await s.plus_async(5, 5))
    print(await asyncio.gather(s.plus_async(4, 4), s.plus_async(2, 2)))
    print(await asyncio.create_task(s.plus_async(1, 1)))
    print(await s.minus_sync(9, 9))


if __name__ == '__main__':
    asyncio.run(async_main())

Artiom Kozyrev
  • 3,526
  • 2
  • 13
  • 31