2

As for now, I've found a lot of examples on how contextvars module behaves with asyncio, but none on how one behaves with threads (asyncio.get_event_loop().run_in_executor, threading.Thread, and so on).

My question is, how can I pass context to a separate thread? Below you can see a code snippet that does not work (python 3.9.8).

import typing
import asyncio
import contextvars
import concurrent.futures


class CustomThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    def submit(
        self,
        function: typing.Callable,
        *args,
        **kwargs
    ) -> concurrent.futures.Future:
        context = contextvars.copy_context()
        return super().submit(
            context.run,
            functools.partial(function, *args, **kwargs)
        )


def function():
    print(var.get())


async def main():
    await asyncio.get_event_loop().run_in_executor(None, function)


if __name__ == '__main__':
    var = contextvars.ContextVar('variable')
    var.set('Message.')

    asyncio.get_event_loop().set_default_executor(CustomThreadPoolExecutor)
    asyncio.run(main())
Vlad Vladovich
  • 113
  • 1
  • 6

1 Answers1

4

You can use wrapper function that takes copy_context.items(), set them and call your function. functools.partial will help you to create wrapped function for passing to run_in_executor. This is working test for my decorators:

def test_run_in_thread_pool_executor():
    def init(func, ctx_vars, *args, **kwargs):
        for var, value in ctx_vars:
            var.set(value)
        return func(*args, **kwargs)

    @async_add_headers('streaming')
    async def wrapper(f):
        loop = asyncio.get_event_loop()
        ctx = contextvars.copy_context()
        executor = futures.ThreadPoolExecutor(max_workers=5)
        return await loop.run_in_executor(executor, functools.partial(init, f, ctx.items()))

    @add_headers('client')
    def foo():
        assert caller_context_var.get() == 'streaming'

    async def main_test():
        await wrapper(foo)

    asyncio.run(main_test())

Here add_headers and async_add_headers change some contextvars in order of calling functions. caller_context_var.get() would be equal to 'client' without init function.

Unfortunately it works only for ThreadPoolExecutor and doesn't for ProcessPoolExecutor because Context objects are not picklable. Check relative PEP 567 section. There are also example with executor:

executor = ThreadPoolExecutor()
current_context = contextvars.copy_context()

executor.submit(current_context.run, some_function)