I guess the essence of the problem is this: use select
on 2 channels: a channel with capacity=1 and a channel with (possibly) infinite capacity.
Implement select with asio asynchronous operation
Write an asio asynchronous operation to wait for multiple (two) things.
(asio asynchronous operation template: c++ - How to wait for a function to return with Boost:::Asio? - Stack Overflow).
state protected by a mutex:
- a
std::optional<read_result>
- a
std::vector<functor>
- a
bool
(whether there is an on-going async_read)
- a
std::optional<completion handler>
your async_wait_for_2_things
:
- Get the completion handler (a callable, can resume your coroutine) from the completion token (
yield[ec]
);
- Lock the mutex (use guard);
- if there is a pending functor from another_thread, take it out, post the completion handler;
- else if there is a pending read_result, take it out, post the completion handler;
- else if there is a an on-going async_read (the bool is true), store the completion handler (if there is already a completion handler stored, throw "can not happen");
- else (no pending functor, no pending read_result, async_read has not been started), store the completion handler (if there is already a completion handler stored, throw "can not happen"), set the bool to true (if the bool is already true, throw "can not happen"), call async_read;
- Unlock the mutex;
async_read's callback:
- Lock the mutex (use guard);
- set the bool to false (if the bool is already false, throw "can not happen");
- if there is a completion handler, take it out, post it;
- else, store read_result (if there is already a read_result stored, throw "can not happen");
- Unlock the mutex;
another_thread's code for posting functor:
- Lock the mutex (use guard);
- if there is a completion handler, take it out, post it;
- else, store functor;
- Unlock the mutex;
Implement select using asynchronous event
- async_read(use callback overload)'s lambda completion handler: stores result, notifies asynchronous_event;
- another_thread: stores functor, notifies asynchronous_event;
- do_session: asynchronously waits on asynchronous_event, loads result or functor;
- asynchronous_event's data is in a
std::pair<std::optional<read_result>, std::vector<functor>>
protected by a mutex;
Implement asynchronous event using a timer: c++ - Why does Boost.Asio not support an event-based interface? - Stack Overflow.
This is not applicable because "asynchronous event" is-not-a "asynchronous condition variable", it can not:
- release a mutex and block in asynchronous wait atomically
(a possible sequence: do_session release mutex, then functor is posted, then event is notified (cancel_one
), then do_session wait on event (timer_.async_wait(yield[ec]);
) and blocks forever)
Implement select using asynchronous latch
- async_read(use callback overload)'s lambda handler: ①stores result and resets asynchronous_latch_producer, ②notifies asynchronous_latch_consumer, waits on asynchronous_latch_producer(, ⑥wake up);
- another_thread: ①stores functor and resets asynchronous_latch_producer, ②notifies asynchronous_latch_consumer, waits on asynchronous_latch_producer(, ⑥wake up);
- do_session: waits on asynchronous_latch_consumer(, ③wake up), ④loads result or functor and resets asynchronous_latch_consumer, ⑤notifies asynchronous_latch_producer;
- asynchronous_latch_consumer and asynchronous_latch_producer's data is in a
std::pair<std::optional<read_result>, std::vector<functor>>
;
Implement asynchronous latch using a timer: c++ - Cancelling boost asio deadline timer safely - Stack Overflow. Modify that asynchronous event implementation to get asynchronous latch: in constructor and reset
, .expires_at(Timer::clock_type::time_point::max())
; in notify_all_one_shot
, .expires_at(Timer::clock_type::time_point::min())
.
This is not applicable because one of the producer might block forever.