
In threading, we have something called a "thread context" (thread-local storage), in which we can save some data (state) for access within a specific thread. In asyncio, I need to save some state on the current execution path, so that all subsequent coroutines can access it. What is the solution? Note: I know each coroutine function is instantiated per execution path in asyncio, but for some reason I cannot save the state in function properties. (Although that method is not very good anyway.)
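For reference, the threading-side mechanism I mean is `threading.local`; a minimal sketch (the helper names here are just illustrative):

```python
import threading

# Thread-local storage: each thread sees its own copy of `value`.
_local = threading.local()

def set_state(value):
    _local.value = value

def get_state():
    # Attributes are per-thread; unset in this thread means None.
    return getattr(_local, 'value', None)

def worker(n):
    set_state(n)
    assert get_state() == n  # each thread sees only its own value

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

I am looking for the asyncio equivalent of this, scoped to an execution path rather than a thread.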

dano
Kamyar
  • [This python-ideas thread](https://mail.python.org/pipermail/python-ideas/2015-April/033151.html) and [this asyncio bug report](https://github.com/python/asyncio/issues/165) seem relevant. – dano Jun 03 '15 at 15:41
  • `curio` seems to have it https://github.com/dabeaz/curio/pull/85 – Dima Tisnek Feb 02 '17 at 11:33

3 Answers


As of Python 3.7 you can use [`contextvars.ContextVar`](https://docs.python.org/3/library/contextvars.html).

In the example below, I declared `request_id`, set its value in `some_outer_coroutine`, and then accessed it in `some_inner_coroutine`.

import asyncio
import contextvars

# declare context var
request_id = contextvars.ContextVar('Id of request.')


async def some_inner_coroutine():
    # get value
    print('Processed inner coroutine of request: {}'.format(request_id.get()))


async def some_outer_coroutine(req_id):
    # set value
    request_id.set(req_id)

    await some_inner_coroutine()

    # get value
    print('Processed outer coroutine of request: {}'.format(request_id.get()))


async def main():
    tasks = []
    for req_id in range(1, 5):
        tasks.append(asyncio.create_task(some_outer_coroutine(req_id)))

    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())

Output:

Processed inner coroutine of request: 1
Processed outer coroutine of request: 1
Processed inner coroutine of request: 2
Processed outer coroutine of request: 2
Processed inner coroutine of request: 3
Processed outer coroutine of request: 3
Processed inner coroutine of request: 4
Processed outer coroutine of request: 4
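Note that each task created with `asyncio.create_task()` runs in a *copy* of the current context, so a `set()` inside a task does not leak back to its creator, while awaiting a coroutine directly shares the caller's context. A small sketch (the variable name here is illustrative, not from the example above):

```python
import asyncio
import contextvars

var = contextvars.ContextVar('var', default='outer')

async def child():
    var.set('inner')  # only affects the context the coroutine runs in

async def main():
    await asyncio.create_task(child())
    print(var.get())  # 'outer' -- the task ran in a copy of the context

    await child()     # awaited directly: shares the current context
    print(var.get())  # 'inner'

asyncio.run(main())
```

This isolation is exactly why each request in the example above keeps its own `request_id`.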

There's also https://github.com/azazel75/metapensiero.asyncio.tasklocal, but be aware that tasks are often created internally by libraries, and by asyncio itself via `ensure_future(a_coroutine)`, and there is no reliable way to track these new tasks and initialize their locals (perhaps from those of the task they were created from). (A "hack" would be to install a function with `loop.set_task_factory()` that does the job, hoping that all code uses `loop.create_task()` to create its tasks, which is not always true...)

Another issue is that if some of your code is executed inside a Future callback, the `Task.current_task()` function, which both libraries use to select the right copy of the locals, will always return `None`...


I personally found the contextvars API too low-level. Google has developed a small wrapper around it in https://github.com/google/etils that provides a nicer API:

tl;dr: annotate dataclass fields with `edc.ContextVar[T]` to make the field context-dependent. It supports all `dataclasses.field` features (like `default_factory`), so each thread/asyncio task has its own version:

import dataclasses
import threading

from etils import edc

@edc.dataclass
@dataclasses.dataclass
class Context:
  thread_id: edc.ContextVar[int] = dataclasses.field(default_factory=threading.get_native_id)

  # Local stack: each thread will use its own instance of the stack
  stack: edc.ContextVar[list[str]] = dataclasses.field(default_factory=list)

# Global context object
context = Context(thread_id=0)

Example of usage:

import concurrent.futures
import time

def worker():
  # Inside each thread, the worker uses its own context
  assert context.thread_id != 0
  context.stack.append(1)
  time.sleep(1)
  assert len(context.stack) == 1  # Other workers do not modify the local stack

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
  for _ in range(10):
    executor.submit(worker)

This works with both asyncio and threads.

See the documentation for more details.

Conchylicultor