1

I have two threads (main thread and some background thread), and both have their own asyncio event loop.

Now consider I'm in the background thread and I want to execute something (func_for_main_thread) in the main thread. Doing that async, would be this:

main_thread_loop.call_soon_threadsafe(func_for_main_thread)

However, how can I do that synced/blocking, i.e. wait until func_for_main_thread executed?

Related is this question, which asks the same question for Qt, and describes the same functionality of Apple GCD, which is basically:

dispatch_async(dispatch_get_main_queue(), ^{ /* do sth */ });

vs:

dispatch_sync(dispatch_get_main_queue(), ^{ /* do sth */ });
Albert
  • 65,406
  • 61
  • 242
  • 386

1 Answers1

0

If I understood what you want correctly, nothing stops you from passing Future to main thread to set it done once func_for_main_thread done. In background thread you can await for this future.

In other words:

import asyncio
from functools import partial


async def called_threadsafe(loop, func):
    current_loop = asyncio.get_event_loop()
    fut = asyncio.Future()

    def call_and_set():
        try:
            res = func()
        except Exception as exc:
            f = partial(fut.set_exception, exc)
            current_loop.call_soon_threadsafe(f)
        else:
            f = partial(fut.set_result, res)
            current_loop.call_soon_threadsafe(f)
    loop.call_soon_threadsafe(call_and_set)  # submit to execute in other thread

    return await fut  # in current thread await other thread executed func and set future

Full code that demonstrates how it'll work:

import asyncio
from functools import partial
import threading
import time


async def called_threadsafe(loop, func):
    current_loop = asyncio.get_event_loop()
    fut = asyncio.Future()

    def call_and_set():
        try:
            res = func()
        except Exception as exc:
            f = partial(fut.set_exception, exc)
            current_loop.call_soon_threadsafe(f)
        else:
            f = partial(fut.set_result, res)
            current_loop.call_soon_threadsafe(f)
    loop.call_soon_threadsafe(call_and_set)

    return await fut


# helpers:
_l = threading.Lock()

def info(*args):
    with _l:
        print(*args, threading.get_ident(), flush=True)


def start_bg_loop():
    bg_loop = asyncio.new_event_loop()

    def startup():
        asyncio.set_event_loop(bg_loop)
        bg_loop.run_forever()

    t = threading.Thread(target=startup)
    t.daemon = True
    t.start()

    return bg_loop


# main part:
def func_for_main_thread():
    info('executed in fg thread')
    time.sleep(0.05)
    return 'got result in bg thread'


async def bg_main(fg_loop):
    info('bg_main started')
    await asyncio.sleep(0.1)

    res = await called_threadsafe(fg_loop, func_for_main_thread)
    info(res)

    info('bg_main finished')


async def fg_main(bg_loop):
    info('fg_main started')
    await asyncio.sleep(1)
    info('fg_main finished')


fg_loop = asyncio.get_event_loop()
bg_loop = start_bg_loop()

asyncio.run_coroutine_threadsafe(bg_main(fg_loop), bg_loop)
fg_loop.run_until_complete(fg_main(bg_loop))

Output:

fg_main started 2252
bg_main started 5568
executed in fg thread 2252
got result in bg thread 5568
bg_main finished 5568
fg_main finished 2252
Mikhail Gerasimov
  • 36,989
  • 16
  • 116
  • 159