15

We have a Python codebase that uses asyncio and coroutines (async methods and awaits). What I'd like to do is call one of these methods from a C++ class that has been pulled into Python (using pybind11).

Let's say there is this code:

class Foo:
  async def bar(self, a, b, c):
    # some stuff
    return c * a

Assuming that the code is being invoked from Python and there is an event loop handling this, at some point the code drops into C++ land, where this bar method needs to be invoked. How does one await its result in C++?

Nim
  • After re-reading your comments on the deleted answer, I am curious what your call site (the place you want to put `await` in) actually looks like. Is it an `async def` which you want to implement in C++? – user4815162342 Feb 09 '19 at 14:55
  • @user4815162342 - that is correct, in python land - there are `async def` methods which at points have `await ..` for other async ops. So now - instead of there being an `async` python method, I have a C++ function and in there want to achieve the same effect (well, something similar) – Nim Feb 13 '19 at 09:30

4 Answers

10

It is possible to implement a Python coroutine in C++, but it takes some work. You need to do what the interpreter (in static languages, the compiler) normally does for you and transform your async function into a state machine. Consider a very simple coroutine:

async def coro():
    x = foo()
    y = await bar()
    baz(x, y)
    return 42

Invoking coro() doesn't run any of its code, but it produces an awaitable object which can be started and then resumed multiple times. (But you don't normally see these operations because they are transparently performed by the event loop.) The awaitable can respond in two different ways: by 1) suspending, or by 2) indicating that it is done.
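To make the start/resume protocol concrete, here is a minimal sketch that drives a coroutine by hand, without any event loop. The names suspend_once and log are made up for this illustration; types.coroutine is used only to get a bare yield that the coroutine can await.

```python
import types

log = []

@types.coroutine
def suspend_once():
    # A bare yield in a types.coroutine generator suspends the awaiting coroutine.
    yield "suspended"

async def coro():
    log.append("started")
    await suspend_once()
    log.append("resumed")
    return 42

c = coro()                 # creates the coroutine object; no body code has run yet
ran_on_call = list(log)    # snapshot taken before the coroutine is started

first = c.send(None)       # start it: runs until the first suspension
try:
    c.send(None)           # resume it: this time it runs to completion
except StopIteration as stop:
    result = stop.value    # the return value travels inside StopIteration
```

Here send(None) plays the role of the event loop: the first call returns the suspended value, and the second call ends with StopIteration carrying the return value.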

Inside a coroutine, await implements suspension. If a coroutine were implemented with a generator, y = await bar() would desugar to:

# pseudo-code for y = await bar()

_bar_iter = bar().__await__()
while True:
    try:
        _suspend_val = next(_bar_iter)
    except StopIteration as _stop:
        y = _stop.value
        break
    yield _suspend_val

In other words, await suspends (yields) as long as the awaited object does. The awaited object signals that it's done by raising StopIteration, and by smuggling the return value inside its value attribute. If yield-in-a-loop sounds like yield from, you're exactly right, and that is why await is often described in terms of yield from. However, in C++ we don't have yield (yet), so we have to integrate the above into the state machine.
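As a sanity check, the desugared loop can be made runnable in Python and compared with yield from. This is only a sketch: by_hand and with_yield_from are illustrative names, and bar() is a stand-in that suspends once via asyncio.sleep(0).

```python
import asyncio
import types

async def bar():
    await asyncio.sleep(0)  # suspend once so there is something to propagate
    return 10

@types.coroutine
def by_hand():
    # Runnable form of the loop for: y = await bar()
    bar_iter = bar().__await__()
    while True:
        try:
            suspend_val = next(bar_iter)
        except StopIteration as stop:
            y = stop.value
            break
        yield suspend_val       # pass the suspension on to whoever drives us
    return y + 1

@types.coroutine
def with_yield_from():
    # The same thing, letting yield from run the loop
    y = yield from bar().__await__()
    return y + 1

async def main():
    return await by_hand(), await with_yield_from()

results = asyncio.run(main())
```

Both variants should produce the same value, which is the sense in which await behaves like yield from. Unlike the real await, the hand-written loop does not forward values or exceptions sent in from the outside.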

To implement async def from scratch, we need to have a type that satisfies the following constraints:

  • doesn't do much when constructed; typically it will just store the arguments it received;
  • has an __await__ method that returns an iterator, which can just be self;
  • has an __iter__ method that returns an iterator, which can again be self;
  • has a __next__ method whose invocation implements one step of the state machine, with return meaning suspension and raising StopIteration meaning finishing.

The above coroutine's state machine in __next__ will consist of three states:

  1. the initial one, when it invokes the foo() sync function
  2. the next state, in which it keeps awaiting the bar() coroutine for as long as it suspends (propagating the suspends to the caller). Once bar() returns a value, we can immediately proceed to calling baz() and returning the value via the StopIteration exception.
  3. the final state which simply raises an exception informing the caller that the coroutine is spent.

So the async def coro() definition shown above can be thought of as syntactic sugar for the following:

class coro:
    def __init__(self):
        self._state = 0

    def __iter__(self):
        return self

    def __await__(self):
        return self

    def __next__(self):
        if self._state == 0:
            self._x = foo()
            self._bar_iter = bar().__await__()
            self._state = 1

        if self._state == 1:
            try:
                suspend_val = next(self._bar_iter)
                # propagate the suspended value to the caller
                # don't change _state, we will return here for
                # as long as bar() keeps suspending
                return suspend_val
            except StopIteration as stop:
                # we got our value
                y = stop.value
            # since we got the value, immediately proceed to
            # invoking `baz`
            baz(self._x, y)
            self._state = 2
            # tell the caller that we're done and inform
            # it of the return value
            raise StopIteration(42)

        # the final state only serves to disable accidental
        # resumption of a finished coroutine
        raise RuntimeError("cannot reuse already awaited coroutine")

We can test that our "coroutine" works using real asyncio:

>>> class coro:
... (definition from above)
...
>>> def foo():
...     print('foo')
...     return 20
... 
>>> async def bar():
...     print('bar')
...     return 10
... 
>>> def baz(x, y):
...     print(x, y)
... 
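>>> import asyncio
>>> async def main():
...     # asyncio.run() may reject a non-coroutine awaitable, so await it here
...     return await coro()
... 
>>> asyncio.run(main())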
foo
bar
20 10
42

The remaining part is to write the coro class in Python/C or in pybind11.
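Before porting to C++, it may help to prototype the same shape in Python against the Foo.bar method from the question. The sketch below is an assumption-laden illustration, not the final binding: AwaitBar is a hypothetical name, the doubling of the result merely stands in for whatever the C++ code would do with the value, and asyncio.sleep(0) is added to bar so that there is a suspension to propagate.

```python
import asyncio

class Foo:
    async def bar(self, a, b, c):
        await asyncio.sleep(0)   # force one suspension for the demonstration
        return c * a

class AwaitBar:
    """Hand-written awaitable: awaits foo.bar(a, b, c), then doubles the result."""

    def __init__(self, foo, a, b, c):
        self._args = (foo, a, b, c)   # construction only stores the arguments
        self._inner = None
        self._done = False
        self.suspensions = 0

    def __await__(self):
        return self

    def __iter__(self):
        return self

    def __next__(self):
        if self._done:
            raise RuntimeError("cannot reuse already awaited coroutine")
        if self._inner is None:
            foo, a, b, c = self._args
            self._inner = foo.bar(a, b, c).__await__()
        try:
            suspend_val = next(self._inner)
        except StopIteration as stop:
            self._done = True
            raise StopIteration(stop.value * 2)   # finish with our own result
        self.suspensions += 1
        return suspend_val                        # propagate the suspension

async def main():
    aw = AwaitBar(Foo(), 3, 0, 7)
    return await aw, aw.suspensions

result, suspensions = asyncio.run(main())
```

Each method here maps onto one method of the eventual C++ type. One limitation worth keeping in mind: the object has no throw or close method, so an exception or cancellation injected by the event loop is raised at the outer await rather than inside bar.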

user4815162342
  • This is a great treatment of the problem, I will try this out and revert! Thank you. – Nim Feb 14 '19 at 11:24
  • @Nim Thank you. Some additional details are also present in this [older answer](https://stackoverflow.com/a/51115745/1600898), although the code there depends too much on asyncio, which for your use case should not be necessary (though it can still be done if needed). I think this answer captures the core idea better. – user4815162342 Feb 14 '19 at 12:17
5

This isn't pybind11, but you can call an async function directly from C. You simply add a callback to the task using add_done_callback. I assume pybind11 allows you to call Python functions, so the steps would be the same:

https://github.com/MarkReedZ/mrhttp/blob/master/src/mrhttp/internals/protocol.c

result = protocol_callPageHandler(self, r->func, request);

Now the result of calling an async function is a coroutine object. Just like in Python, you need to pass it to create_task to schedule it:

PyObject *task;
if(!(task = PyObject_CallFunctionObjArgs(self->create_task, result, NULL))) return NULL;

And then you need to add a callback using add_done_callback:

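PyObject *add_done_callback = PyObject_GetAttrString(task, "add_done_callback");
if(!add_done_callback) return NULL;
PyObject *ret = PyObject_CallFunctionObjArgs(add_done_callback, self->task_done, NULL);
Py_DECREF(add_done_callback); Py_XDECREF(ret);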

self->task_done is a C function exposed to Python, which will be called when the task is done.
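For comparison, the same three steps look like this in pure Python. It is only a sketch of the sequence the C calls perform: page_handler, task_done and results are illustrative names, and task_done stands in for the C callback.

```python
import asyncio

results = []

async def page_handler(request):
    await asyncio.sleep(0)
    return f"handled {request}"

def task_done(task):
    # Called by the event loop once the task has finished.
    results.append(task.result())

async def main():
    loop = asyncio.get_running_loop()
    coro = page_handler("GET /")        # step 1: calling yields a coroutine object
    task = loop.create_task(coro)       # step 2: schedule it on the loop
    task.add_done_callback(task_done)   # step 3: register the completion callback
    await task
    await asyncio.sleep(0)              # one more loop pass so the callback has run

asyncio.run(main())
```

The calling code never awaits anything itself; it gets the result later, inside the callback, which is what makes this pattern usable from plain C.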

MarkReedZ
0

Single shot callbacks

First, get a basic callback working from pybind11.

Next, get an asyncio Future proof of concept working.

Now that you have these two core mechanisms figured out, combine them!

The key "trick" is to call a "setCallback" pybind function from Python and pass your future object's set_result function to it. You will then have a callback from C++ feeding into an asyncio Future!

If you use multi-threading in your C++, however, you might run into an issue with this... The future value could be set, but the asyncio event loop may not respond. To resolve that, I recommend adding another callback to "wake" the loop. See: https://stackoverflow.com/a/76724344/3220983
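An alternative to a separate wake callback, shown here as a sketch rather than as part of the design above, is to hand the result over with loop.call_soon_threadsafe, which both schedules set_result on the loop thread and wakes the loop. A Python thread stands in for the C++ thread; worker and the message text are made up for the illustration.

```python
import asyncio
import threading

def worker(loop, future):
    # Stand-in for the C++ thread: never touch the future directly from here,
    # ask the loop to call set_result on its own thread instead.
    loop.call_soon_threadsafe(future.set_result, "Hello from a worker thread!")

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    thread = threading.Thread(target=worker, args=(loop, future))
    thread.start()
    message = await future   # resumes once the loop has run set_result
    thread.join()
    return message

message = asyncio.run(main())
```

With this approach the callback passed to C++ would be a small Python function wrapping call_soon_threadsafe, so the C++ side still just calls a std::function.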

UNTESTED EXAMPLE SNIPPETS

C++

#include <chrono>
#include <functional>
#include <string>
#include <thread>

#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/functional.h>
namespace py = pybind11;

typedef std::function<void(const std::string &)> MessageHandler;
typedef std::function<void()> WakeHandler;

class MyHelper
{
public:
    MyHelper(){}
    ~MyHelper(){}

    void setMessageHandler( const MessageHandler handler )
    { messageHandler_ = handler; }

    void setWakeHandler( const WakeHandler handler )
    { wakeHandler_ = handler; }

    void test()
    {   
        std::thread t([this](){
            std::this_thread::sleep_for( std::chrono::seconds( 3 ) );
            if( messageHandler_ ) messageHandler_( "Hello from C++!" );
            if( wakeHandler_ ) wakeHandler_();
        });
        t.detach();    
    }

    static void pybindInit( py::module &m )
    {
        py::class_<MyHelper>( m, "MyHelper" )
            .def( py::init<>() )
            .def( "setMessageHandler", &MyHelper::setMessageHandler,
                  py::call_guard<py::gil_scoped_release>() )
            .def( "setWakeHandler"   , &MyHelper::setWakeHandler,
                  py::call_guard<py::gil_scoped_release>() )
            .def( "test"             , &MyHelper::test,
                  py::call_guard<py::gil_scoped_release>() )
        ;
    }

private:
    MessageHandler messageHandler_;
    WakeHandler wakeHandler_;
};

Python:

import asyncio

from MyHelper import MyHelper  # assumes the extension module is also named MyHelper

async def main( helper ):
    global event_loop
    event_loop = asyncio.get_running_loop()
    helper.test()
    await asyncio.gather( get_help( helper ), other_coro() )
    print( "Success!" )

def wake():
    asyncio.run_coroutine_threadsafe( asyncio.sleep( 0 ), event_loop )

async def get_help( helper ):        
    future = event_loop.create_future()
    helper.setMessageHandler( future.set_result )
    helper.setWakeHandler( wake )
    print( await future )

async def other_coro():
    for i in range(5):
        await asyncio.sleep( 1 )
        print( "some other python work...." )

if __name__ == "__main__": asyncio.run( main( MyHelper() ) )

Repetitive callbacks

Ok... so that may not "be it", depending on what you need exactly.... A Future's set_result can only be fired once. :( Also, you may want to "cancel" it from C++ or return an exception...

If your goal is to sometimes call into C++ "for help" and get a result back asynchronously, what I described / demoed ought to work. But if you want C++ to repeatedly send "events" or async "messages" into your Python script whenever it wishes, you'll need to do a bit more work on both sides to expand upon this simple design. In a nutshell, from the Python side you'll want to keep creating new future objects and passing them to C++ for each callback. You'll also want to be sure on the C++ side to use each of those callbacks only once, and optionally to block from that side until a new one is assigned and therefore "ready" on the Python end.
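If juggling one future per message becomes awkward, a different technique may be simpler: an asyncio.Queue fed through loop.call_soon_threadsafe. This swaps the one-future-per-callback scheme for a queue, so treat it as an alternative sketch; producer, the event strings and the None sentinel are all assumptions, and a Python thread again stands in for C++.

```python
import asyncio
import threading

def producer(loop, queue, count):
    # Stand-in for C++ pushing events whenever it wishes.
    for i in range(count):
        loop.call_soon_threadsafe(queue.put_nowait, f"event {i}")
    loop.call_soon_threadsafe(queue.put_nowait, None)   # sentinel: no more events

async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    thread = threading.Thread(target=producer, args=(loop, queue, 3))
    thread.start()
    received = []
    while (msg := await queue.get()) is not None:
        received.append(msg)
    thread.join()
    return received

events = asyncio.run(main())
```

The C++ side then needs only a single callback that can be fired any number of times, and the Python side consumes messages at its own pace.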

BuvinJ
-2

For things like this, if I don't want to delve too deep into the CPython API, I just write my stuff in Python and call that using pybind11's Python interface.

An example: https://github.com/RobotLocomotion/drake/blob/a7700d3/bindings/pydrake/init.py#L44 https://github.com/RobotLocomotion/drake/blob/a7700d3/bindings/pydrake/pydrake_pybind.h#L359

Applied to this use case, perhaps you can do:

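# cpp_helpers.py
import asyncio

def await_(obj):
    # `await` is only legal inside `async def`, so run the coroutine to
    # completion instead (blocks; needs no loop running in this thread)
    return asyncio.run(obj)

// C++
py::object py_await = py::module::import("cpp_helpers").attr("await_");
auto result = py::cast<MyResult>(py_await(py_obj));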

However, this will very likely be less performant than the above solutions.
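When an event loop is already running, as in the question, a blocking helper can only be used from a thread other than the loop's. One hedged way to write such a helper is with asyncio.run_coroutine_threadsafe; blocking_await is a hypothetical name, the timeout is arbitrary, and run_in_executor merely simulates C++ code running on another thread.

```python
import asyncio

class Foo:
    async def bar(self, a, b, c):
        await asyncio.sleep(0)
        return c * a

def blocking_await(coro, loop):
    """Hypothetical helper: block a non-loop thread until coro finishes on loop."""
    return asyncio.run_coroutine_threadsafe(coro, loop).result(timeout=5)

async def main():
    loop = asyncio.get_running_loop()
    # Run the helper on a worker thread so the loop stays free to execute bar().
    return await loop.run_in_executor(None, blocking_await, Foo().bar(6, 0, 7), loop)

result = asyncio.run(main())
```

Calling blocking_await from the loop's own thread would deadlock, because the loop could never get around to running the coroutine it is waiting for.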

eacousineau