1

Using Boost Asio, how would I implement an "event" class to resume C++20 coroutines?

Kind of like this:

// Oversimplicated, but hopefully good enough as example
struct oneshot_event {
    void raise();
    async::awaitable<void> wait();
};

Where wait() can be co_awaited and the coroutine is resumed as soon as something calls raise().

I know that Boost Asio has (or had? I found it in outdated documentation) a coro class that can yield, but that won't help me here, as there are multiple events that can be awaited. I am also not sure how all this Boost.asio and Boost.coroutine stuff works together

Tuxifan

tuxifan
  • 29
  • 5

2 Answers2

1

Hope this is what you wanted:

#include <boost/asio.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <iostream>
#include <memory>
#include <mutex>
#include <atomic>
#include <condition_variable>

namespace asio = boost::asio;
using asio::awaitable;
using asio::co_spawn;
using asio::detached;
using asio::use_awaitable;

// Struct representing an event that can be raised and waited upon
struct oneshot_event {
    asio::io_context& io_; // Reference to the IO context for async operations
    std::atomic_bool raised_ = false; // Flag indicating whether the event has been raised
    std::vector<std::shared_ptr<asio::steady_timer>> timers_; // Timers associated with the event
    std::mutex mutex_; // Mutex for thread safety
    std::condition_variable cv_; // Condition variable for synchronization
    
    // Constructor that takes an IO context reference
    oneshot_event(asio::io_context& io) : io_(io) {}

    // Raise the event, allowing waiting coroutines to resume
    void raise() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!raised_) {
            raised_ = true;
            for (auto& timer : timers_)
                timer->cancel();
            cv_.notify_all();
        }
    }
    
    // Wait for the event to be raised
    awaitable<void> wait() {
            std::unique_lock<std::mutex> lock(mutex_);
            if (!raised_) {
                // Create a new shared_ptr to a timer and add it to the vector
                auto timer = std::make_shared<asio::steady_timer>(io_, std::chrono::seconds(0));
                timers_.push_back(timer);
        
                // Suspend the coroutine until the timer expires or is canceled
                co_await timer->async_wait(use_awaitable);
            } else {
                // If the event is already raised, immediately resume the coroutine
                co_return;
            }
        }
};

// Coroutine that waits for an event and resumes when the event is raised
awaitable<void> myCoroutine(oneshot_event& event) {
    std::cout << "Coroutine waiting...\n";
    co_await event.wait();
    std::cout << "Coroutine resumed!\n";
}

int main() {
    asio::io_context io;

    oneshot_event event(io);
    auto coroutine = myCoroutine(event);

    co_spawn(io, std::move(coroutine), detached);

    std::cout << "Sleeping for 3 seconds...\n";
    std::this_thread::sleep_for(std::chrono::seconds(3));

    event.raise();  // Resume the coroutine

    io.run();
    return 0;
}

Output:

Sleeping for 3 seconds...
Coroutine waiting...
Coroutine resumed!
gera verbun
  • 285
  • 3
  • 6
  • A problem with the example is that all the waiting precedes even the initial resume of the coro. In order to make it realistic, you will need to make it thread-safe (or use another async operation cause the event to be raised) – sehe May 10 '23 at 22:11
1

The traditional answer would have employed an awaitable timer (gera posted an answer around that).

However, that can be error prone and clumsy, especially with multithreading (both race conditions (e.g. https://stackoverflow.com/a/22204127/85371 and more specifically https://stackoverflow.com/a/43169596/85371) and data races (e.g. gera's code isn't threadsafe).

A lot more flexible is the experimental channels support: https://www.boost.org/doc/libs/master/doc/html/boost_asio/overview/channels.html

This will allow you to either use multiple channels for multiple events, or support many events over a single channel. Channels exist in thread-safe variant (asio::concurrent_channel) out of the box.

Channel Demo

As per the comment request, here's a sample based on channels:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <iostream>
#include <memory>

namespace asio = boost::asio;
using boost::system::error_code;
using Channel = asio::experimental::concurrent_channel<void(error_code, std::size_t)>;
using CoChannel = asio::use_awaitable_t<>::as_default_on_t<Channel>;

asio::awaitable<void> myCoroutine(CoChannel& event) {
    try {
        std::cout << "Coroutine: waiting..." << std::endl;
        while (true) {
            auto evt = co_await event.async_receive();
            std::cout << "Coroutine: resumed! (" << evt << ")" << std::endl;
        }
    } catch (boost::system::system_error const& e) {
        std::cout << "Coroutine: " << e.code().message() << std::endl;
    }
    std::cout << "Coroutine: done" << std::endl;
}

int main() {
    asio::thread_pool io;

    CoChannel event(io);
    co_spawn(io, myCoroutine(event), asio::detached);

    for (auto i : {1, 2, 3}) {
        std::cout << "Main: Sleeping for " << i << " seconds..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(i));

        event.try_send(error_code{}, i);
        ; // Resume the coroutine
    }

    event.close();
    io.join();
    std::cout << "Main: done" << std::endl;
}

Testing locally (for visible timing):

enter image description here

sehe
  • 374,641
  • 47
  • 450
  • 633
  • While this link may answer the question, it is better to include the essential parts of the answer here and provide the link for reference. Link-only answers can become invalid if the linked page changes. - [From Review](/review/low-quality-posts/34367658) – Mimouni May 11 '23 at 15:36
  • @Mimouni Hahaha. I referred to the existing answer which ironically severely lacks any explanations. I opted instead to add only explanations. But, sure, this is one in a thousand of my answers without a live demo. Mea culpa – sehe May 11 '23 at 17:58
  • Thanks!!! This should suit my needs. How long to experimental features like this usually take until they are declared stable? – tuxifan May 11 '23 at 21:09
  • That varies. Lately the more modern features (e.g. actually requiring c++11 like `asio::as_tuple`) seem to have been graduated rather quickly, as in 1 or 2 Asio release cycles. This is my subjective experience only. – sehe May 11 '23 at 22:42