I am trying to understand Python asyncio's call_soon_threadsafe API, but I am stuck. In the example code below, if my simple coroutine wants to return something, how should I get the returned value on the caller side?

import time
import asyncio as aio
import uvloop

from threading import Thread

aio.set_event_loop_policy(uvloop.EventLoopPolicy())

async def simple(a, fut:aio.Future):
  await aio.sleep(a)
  return fut.set_result(a)

def delegator(loop):
  aio.set_event_loop(loop)
  loop.run_forever()

loop_exec = aio.new_event_loop()

t = Thread(target=delegator, args=(loop_exec,))
t.start()


if __name__ == '__main__':
  start_time = time.time()

  fut = loop_exec.create_future() # tried to get back returned value by future
  handle = loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, fut))
  res = aio.wait_for(fut, 10)

  print('Time consumed: {}s'.format(time.time() - start_time))
  print('>>>>>>>>>>', res)

# Output
Time consumed: 3.2901763916015625e-05s
>>>>>>>>>> <generator object wait_for at 0x110bb9b48>

As you can see, I tried to get the returned value back by passing a future into the coroutine, which runs in a different thread, but I still don't know how to get it properly.

Basically, two questions:

  1. With the above example code, how can I get the returned value back on the caller side?
  2. What is the actual use case for call_soon_threadsafe? run_coroutine_threadsafe seems more convenient, and it appears to cover almost all the cases I can imagine for this kind of cross-thread coroutine interaction.
lnshi
  • To answer question 1, you'd have to use a [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) instead of `loop_exec.create_future` and replace `aio.wait_for(fut, 10)` with `fut.result()`. This is basically what `run_coroutine_threadsafe` does. – Vincent Aug 01 '19 at 19:15

1 Answer

With the above example code, how can I get back the returned value from the caller side?

Since the event loop runs outside the main thread, you need to use a thread-aware synchronization device. For example:

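import threading

async def simple(a, event):
    await aio.sleep(a)
    # Store the result on the event object before signalling completion.
    event.simple_result = a
    event.set()

done = threading.Event()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
# wait() returns False on timeout, in which case simple_result is not set yet.
done.wait(10)
res = done.simple_result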

Or, you can synchronize using a concurrent.futures.Future, which is like a one-shot event with an object payload. (Note that you can't use an asyncio future, because it's not thread-safe.)

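import concurrent.futures

async def simple(a, fut):
    await aio.sleep(a)
    fut.set_result(a)

done = concurrent.futures.Future()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
# Blocks the calling thread for up to 10 seconds, then raises concurrent.futures.TimeoutError.
res = done.result(10)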

Though creating a "naked" concurrent.futures.Future like in the above example works, it's discouraged by the docs. However, as Vincent pointed out in the comments, this is what run_coroutine_threadsafe does for you:

async def simple(a):
    await aio.sleep(a)
    return a

# run_coroutine_threadsafe needs the target loop as its second argument.
fut = aio.run_coroutine_threadsafe(simple(3), loop_exec)
res = fut.result(10)
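
For completeness, here is a minimal self-contained sketch of the same approach that can be run as a script. It assumes the stock asyncio event loop rather than uvloop, and the helper name start_background_loop is made up for this illustration; it is not part of asyncio.

```python
import asyncio
import threading

async def simple(a):
    await asyncio.sleep(a)
    return a

def start_background_loop():
    """Hypothetical helper: run a fresh event loop in a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread

loop, thread = start_background_loop()

# Returns a concurrent.futures.Future that is safe to wait on from this thread.
fut = asyncio.run_coroutine_threadsafe(simple(0.1), loop)
res = fut.result(timeout=10)
print(res)

# Stop the loop from the outside, then release its resources.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

Note that the loop is stopped with call_soon_threadsafe as well, since loop.stop is not meant to be called directly from another thread.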

What's the actual use case for this call_soon_threadsafe?

The simplest answer is that call_soon_threadsafe is a lower-level API which you use when you just want to tell the event loop to do or start doing something. call_soon_threadsafe is the building block used to implement functions like run_coroutine_threadsafe, but also many others. As to why you'd want to use that plumbing function yourself...
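
To make the building-block relationship concrete, here is a simplified sketch of how something like run_coroutine_threadsafe could be layered on top of call_soon_threadsafe. The function submit_coroutine is a hypothetical name for this illustration, and this is not the actual standard-library implementation: it ignores details such as propagating cancellation from the returned future back to the task.

```python
import asyncio
import concurrent.futures
import threading

def submit_coroutine(coro, loop):
    """Hypothetical, simplified stand-in for asyncio.run_coroutine_threadsafe."""
    result = concurrent.futures.Future()

    def copy_outcome(task):
        # Runs in the loop thread once the task has finished.
        if task.cancelled():
            result.cancel()
        elif task.exception() is not None:
            result.set_exception(task.exception())
        else:
            result.set_result(task.result())

    def start():
        # Runs in the loop thread, where creating a task is safe.
        task = loop.create_task(coro)
        task.add_done_callback(copy_outcome)

    # The only cross-thread call: hand `start` over to the event loop.
    loop.call_soon_threadsafe(start)
    return result

async def double(x):
    await asyncio.sleep(0.05)
    return 2 * x

loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

value = submit_coroutine(double(21), loop).result(timeout=5)
print(value)

loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

The point of the sketch is that every interaction with the loop from the calling thread goes through a single call_soon_threadsafe call; everything else happens inside the loop thread.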

Sometimes you want to execute an ordinary function, and not a coroutine. Sometimes your function is fire-and-forget and you don't care about its return value. (Or maybe the function will ultimately notify you of its completion through some side channel.) In those cases call_soon_threadsafe is the correct tool for the job, because it's more lightweight, as it doesn't attempt to create an additional concurrent.futures.Future and attach it to the executed code. Examples:

  • loop.call_soon_threadsafe(loop.stop) to tell the event loop to stop running
  • loop.call_soon_threadsafe(queue.put_nowait, some_item) to add something to an unbounded asyncio queue
  • loop.call_soon_threadsafe(asyncio.create_task, coroutinefn()) to submit a coroutine to the event loop without waiting for it to finish
  • loop.call_soon_threadsafe(some_future.set_result, value) to set the result of an asyncio future from a different thread
  • the low-level code in this answer
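
As a rough illustration of two of the bullets above, the following sketch feeds an asyncio queue and resolves an asyncio future from a worker thread. The names consumer, producer, received and ready are invented for this example, and it assumes the stock asyncio event loop.

```python
import asyncio
import threading

received = []

async def consumer(queue):
    # Runs inside the event loop; stops at the None sentinel.
    while True:
        item = await queue.get()
        if item is None:
            break
        received.append(item)

async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()       # unbounded, so put_nowait cannot fail
    ready = loop.create_future()  # asyncio future, owned by the loop

    def producer():
        # Runs in a worker thread: never touch the queue or future directly,
        # always go through call_soon_threadsafe.
        for item in ("a", "b", "c"):
            loop.call_soon_threadsafe(queue.put_nowait, item)
        loop.call_soon_threadsafe(queue.put_nowait, None)
        loop.call_soon_threadsafe(ready.set_result, "producer done")

    threading.Thread(target=producer).start()
    await consumer(queue)
    return await ready

status = asyncio.run(main())
print(received, status)
```

None of these calls needs a return value on the producer side, which is why the lighter call_soon_threadsafe is enough here.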
user4815162342
  • Thank you very much for this awesome, detailed explanation. – lnshi Aug 02 '19 at 01:31
  • The `concurrent.futures.Future` documentation [says](https://docs.python.org/3.10/library/concurrent.futures.html#concurrent.futures.Future) "`Future` instances are created by `Executor.submit()` and should not be created directly except for testing." – Maxpm Jul 11 '23 at 18:27
  • @Maxpm Thanks, I've now updated the answer to softly warn against actually doing that. I would have removed the `concurrent.futures.Future` example altogether, but it's still a nice way of segueing into `run_coroutine_threadsafe`. – user4815162342 Jul 11 '23 at 18:39