11

Is it possible to perform an asynchronous (read: non-blocking) wait on a condition variable in boost::asio? If it isn't directly supported, any hints on implementing it would be appreciated.

I could implement a timer and fire a wakeup event every few ms, but this approach is vastly inferior. I find it hard to believe that condition variable synchronization is not implemented or documented.

Hassan Syed
  • What do you want to do? Is the latest version of [async_read_until](http://www.boost.org/doc/libs/1_47_0/doc/html/boost_asio/reference/async_read_until.html) maybe what you're looking for? Non-blocking waiting is normally a task for [boost thread](http://www.boost.org/doc/libs/1_47_0/doc/html/thread.html) ... boost thread in combination with boost asio should work ... – jenseb Jul 21 '11 at 13:14
  • I have an alternative implementation in mind; I have outlined it in another question here: http://stackoverflow.com/questions/6776779/boost-asio-multi-io-service-rpc-framework-design-rfc This might give you more insight into what I want to achieve. – Hassan Syed Jul 21 '11 at 13:31

4 Answers

7

If I understand the intent correctly, you want to launch an event handler in the context of the asio thread pool when some condition variable is signaled? I think it would be sufficient to wait on the condition variable at the beginning of the handler and have the handler io_service::post() itself back into the pool at the end, something of this sort:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

boost::asio::io_service io;
boost::mutex mx;
boost::condition_variable cv;

// Blocks one pool thread until the condition variable is signaled,
// then re-posts itself so that the wait starts again.
void handler()
{
    boost::unique_lock<boost::mutex> lk(mx);
    cv.wait(lk);
    std::cout << "handler awakened\n";
    io.post(handler);
}

// Signals the condition variable once per second.
void buzzer()
{
    for(;;)
    {
        boost::this_thread::sleep(boost::posix_time::seconds(1));
        boost::lock_guard<boost::mutex> lk(mx);
        cv.notify_all();
    }
}

int main()
{
    io.post(handler);
    boost::thread bt(buzzer);
    io.run();
}
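One caveat with a bare cv.wait(lk): condition variables may wake spuriously, and a notification sent before the wait begins is lost. A common remedy is to pair the condition variable with a flag and wait with a predicate. The following is only a minimal sketch of that pattern; it uses the std:: equivalents so that it stands alone (boost::condition_variable offers the same predicate overload), and the names signal_state, wait_for_signal, send_signal and demo_signal_roundtrip are made up for illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper type: a condition variable plus the flag it guards.
struct signal_state {
    std::mutex mx;
    std::condition_variable cv;
    bool ready = false;
};

// Blocks until 'ready' is set. The predicate protects against spurious
// wakeups and against a signal that was sent before the wait started.
inline void wait_for_signal(signal_state& s) {
    std::unique_lock<std::mutex> lk(s.mx);
    s.cv.wait(lk, [&s] { return s.ready; });
    s.ready = false; // consume the signal
}

// Sets the flag under the mutex, then wakes all waiters.
inline void send_signal(signal_state& s) {
    {
        std::lock_guard<std::mutex> lk(s.mx);
        s.ready = true;
    }
    s.cv.notify_all();
}

// One waiter, one signal; returns true once the waiter has been woken.
inline bool demo_signal_roundtrip() {
    signal_state s;
    bool woke = false;
    std::thread waiter([&] { wait_for_signal(s); woke = true; });
    send_signal(s);
    waiter.join();
    return woke;
}
```

The waiting thread is still blocked while it waits, exactly as in the answer's handler; the predicate only makes the wakeup reliable.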
Cubbi
  • But the thread that waits will get blocked. Isn't there a way of not blocking a thread, and registering a completion handler instead? I am currently considering an alternative mechanism here: http://stackoverflow.com/questions/6776779/boost-asio-multi-io-service-rpc-framework-design-rfc – Hassan Syed Jul 21 '11 at 13:27
  • @Hassan Syed : a condition variable is a concept that involves a blocked thread. Perhaps you are looking for async signals instead? boost.asio just added support for signal handlers in 1.47.0: http://www.boost.org/doc/libs/1_47_0/doc/html/boost_asio/history.html – Cubbi Jul 21 '11 at 13:35
  • As far as I can tell, these are signals that are emitted by the operating system. It is shown that you can register for these signals, but it is the OS that will emit them. – Hassan Syed Jul 21 '11 at 13:49
  • Your answer is correct. I was operating under the assumption that `io_service::run()` is a blocking call for the callee, and that asio takes care of synchronization innately somehow. I'm glad this assumption isn't true. – Hassan Syed Jul 22 '11 at 15:49
  • The `io_service::post` link in the answer is broken. Was `io_service::post` removed? It doesn't appear in the current asio doc's reference section. – Praxeolitic Jan 06 '17 at 04:56
0

I can suggest a solution based on boost::asio::deadline_timer, which works fine for me. It acts as a kind of async event in the boost::asio environment. One very important point: the 'handler' must be serialised through the same 'strand_' as 'cancel', because using a 'boost::asio::deadline_timer' object from multiple threads is not thread safe.

class async_event
{
public:
    async_event(
        boost::asio::io_service& io_service,
        boost::asio::strand<boost::asio::io_context::executor_type>& strand)
            : strand_(strand)
            , deadline_timer_(io_service, boost::posix_time::ptime(boost::posix_time::pos_infin))
    {}

    // 'handler' must be serialised through the same 'strand_' as 'cancel' or 'cancel_one'
    //  because using 'boost::asio::deadline_timer' from multiple threads is not thread safe
    template<class WaitHandler>
    void async_wait(WaitHandler&& handler) {
        // Forward the handler so that rvalue handlers are moved, not copied
        deadline_timer_.async_wait(std::forward<WaitHandler>(handler));
    }
    void async_notify_one() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_one_serialized, this));
    }
    void async_notify_all() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_all_serialized, this));
    }
private:
    void async_notify_one_serialized() {
        deadline_timer_.cancel_one();
    }
    void async_notify_all_serialized() {
        deadline_timer_.cancel();
    }
    boost::asio::strand<boost::asio::io_context::executor_type>& strand_;
    boost::asio::deadline_timer deadline_timer_;
};
0

Unfortunately, Boost ASIO doesn't have an async_wait_for_condvar() method.

In most cases, you also won't need it. Programming the ASIO way usually means that you use strands, not mutexes or condition variables, to protect shared resources. Except for rare cases, which usually revolve around correct construction or destruction order at startup and exit, you won't need mutexes or condition variables at all.

When modifying a shared resource, the classic, partially synchronous threaded way is as follows:

  • Lock the mutex protecting the resource
  • Update whatever needs to be updated
  • Signal a condition variable, if further processing by a waiting thread is required
  • Unlock the mutex

The fully asynchronous ASIO way, by contrast, is:

  • Generate a message that contains everything needed to update the resource
  • Post a call to an update handler with that message to the resource's strand
  • If further processing is needed, let that update handler create further message(s) and post them to the appropriate resources' strands
  • If jobs can be executed on fully private data, post them directly to the io-context instead

Here is an example of a class some_shared_resource that receives a string state and triggers some further processing depending on the state received. Please note that all processing in the private method some_shared_resource::receive_state() is fully thread-safe, as the strand serializes all calls.

Of course, the example is not complete; some_other_resource needs a send_code_red() method similar to some_shared_resource::send_state().

#include <boost/asio.hpp>
#include <memory>
#include <string>
#include <utility>

using asio_context = boost::asio::io_context;
using asio_executor_type = asio_context::executor_type;
using asio_strand = boost::asio::strand<asio_executor_type>;

class some_other_resource;
class some_shared_resource : public std::enable_shared_from_this<some_shared_resource> {
    asio_strand strand;
    std::shared_ptr<some_other_resource> other;
    std::string state;

    void receive_state(std::string&& new_state) {
        // std::move is required here: a named rvalue reference is an lvalue
        std::string oldstate = std::exchange(state, std::move(new_state));
        if(state == "red" && oldstate != "red") {
            // state transition to "red":
            other->send_code_red(true);
        } else if(state != "red" && oldstate == "red") {
            // state transition from "red":
            other->send_code_red(false);
        }
    }

public:
    some_shared_resource(asio_context& ctx, const std::shared_ptr<some_other_resource>& other)
      : strand(ctx.get_executor()), other(other) {}

    void send_state(std::string&& new_state) {
        boost::asio::post(strand, [me = weak_from_this(), new_state = std::move(new_state)]() mutable {
            if(auto self = me.lock(); self) {
                self->receive_state(std::move(new_state));
            }
        });
    }
};

As you see, always posting into ASIO's strands can be a bit tedious at first. But you can move most of that "equip a class with a strand" code into a template.

The good thing about message passing: as you are not using mutexes, you cannot deadlock yourself anymore, even in extreme situations. Also, with message passing it is often easier to achieve a high level of parallelism than with classical multithreading. On the downside, moving and copying around all these message objects is time-consuming, which can slow down your application.

A last note: using the weak pointer in the message formed by send_state() facilitates the reliable destruction of some_shared_resource objects. Otherwise, if A calls B and B calls C and C calls A (possibly only after a timeout or similar), using shared pointers instead of weak pointers in the messages would create cyclic references, which then prevent object destruction. If you are sure that you will never have cycles, and that processing messages from to-be-deleted objects doesn't pose a problem, you can of course use shared_from_this() instead of weak_from_this(). If you are sure that objects won't get deleted before ASIO has been stopped (and all worker threads have been joined back to the main thread), then you can also directly capture the this pointer instead.

Kai Petzke
  • strands are a naive mechanism for ensuring that callback handlers can be synchronized on a thread pool, however they don't allow for higher-level synchronization of sequences each of multiple asynchronous operations. – Spongman Dec 20 '22 at 02:38
  • I don't see, how strands do not allow synchronization of sequences of operations. Just let each operation post whatever operation is required next to the appropriate strand. Each strand protects and synchronizes access to one resource. – Kai Petzke Dec 21 '22 at 03:04
  • strands only ensure that one thread is executing code in that strand at any one time. it does not serialize sequences of operations. say you have a sequence of async operations A->B->C. and you start two of them: A1->B1->C1 and A2->B2->C2 on the same strand. the strand will guarantee that the callback handlers don't execute at the same time, but it does not ensure that the sequences do not overlap, for that you need some other mechanism like the "asynchronous wait" that OP was asking about. – Spongman Dec 21 '22 at 03:15
  • Do I understand you right that you want to ensure that C1 finishes before A2 starts? In that case, these operations are apparently not independent, and the design of the application should be reworked to make A, B and C truly independent operations. Strands on A, B and C will then ensure that A1 and A2 don't run concurrently, and the same for B1/B2 and C1/C2. They will also guarantee that if A1 was executed before A2, then B1 (which is posted from A1) will run before B2 (which is posted from A2), and the same for C1/C2. – Kai Petzke Dec 21 '22 at 03:57
  • What can (and likely will) happen, is that B1 runs concurrently to A2, and C1 runs in parallel to A2 and/or B2. But in a well designed async app, that's a feature, not a bug. – Kai Petzke Dec 21 '22 at 03:58
  • If you truly need that sequence A→B→C to run at most once at a time, the `boost::asio` way to implement that is to have a global queue object `Q` with its own strand, and two operations on it: `Q.push()` adds a task to the queue and, if the queue was empty before, starts running it immediately by calling operation A. `Q.push()` is called from whatever used to launch the A operation directly before. `Q.finish_current()` marks the currently running task as finished, and is usually called from C, but can also be called from A or B if the operation fails halfway through. – Kai Petzke Dec 21 '22 at 04:05
  • The strand on `Q` ensures, that `Q.push()` and `Q.finish_current()` are properly serialized with respect to each other. The queue ensures, that at most one of the operation chains posted to the queue is executing, however many async steps it has. All other operations posted keep waiting. So no condition variables, mutexes or semaphores are needed, just a simple queue and a properly used strand on it. – Kai Petzke Dec 21 '22 at 04:10
  • yeah, i understand how to do it explicitly with queues. however, an async-awaitable mutex like OP asked for would be analogous to a regular mutex in the same way that a coroutine is analogous to a function. it's an obvious gap.. – Spongman Dec 21 '22 at 05:00
  • declaring something as well-designed or not based on abstract requirements like this is rather high-minded. is it bad design to use a mutex to synchronize two non-atomic operations? possibly, but it's not an unreasonable thing to expect in an API. – Spongman Dec 21 '22 at 05:03
0

FWIW, I implemented an asynchronous mutex using the rather good continuable library:

class async_mutex
{
    cti::continuable<> tail_{cti::make_ready_continuable()};
    std::mutex mutex_;

public:
    async_mutex() = default;
    async_mutex(const async_mutex&) = delete;
    const async_mutex& operator=(const async_mutex&) = delete;

    [[nodiscard]] cti::continuable<std::shared_ptr<int>> lock()
    {
        std::shared_ptr<int> result;
        cti::continuable<> tail = cti::make_continuable<void>(
            [&result](auto&& promise) {
                result = std::shared_ptr<int>((int*)1,
                    [promise = std::move(promise)](auto) mutable {
                        promise.set_value();
                    }
                );
            }
        );

        {
            std::lock_guard _{mutex_};
            std::swap(tail, tail_);
        }
        co_await std::move(tail);
        co_return result;
    }
};

Usage, e.g.:

async_mutex mutex;

...

{
    const auto _ = co_await mutex.lock();
    // only one lock per mutex-instance
}

Spongman