int main()
{
    tcp::socket socket(iocp);
    acceptor.async_accept(socket, yield[ec]);
    if (ec)
        fail(ec, "accept");
    else
        boost::asio::spawn(acceptor.get_executor(), std::bind(&do_session, websocket::stream<beast::tcp_stream>(std::move(socket)), std::placeholders::_1));

    ... iocp run
}

void do_session(websocket::stream<beast::tcp_stream>& ws, net::yield_context yield)
{
    while(ws.is_open())
    {
        ws.async_read(buffer, yield[ec]);
        ... process the buffer
        ... execute posted callbacks
    }
}

void another_thread()
{
    while(isAppNotExit)
    {
        post_to_specified_coroutine(ws, []() {   ... do in coroutine same thread });
    }
}

I need to post a function from any thread so that a specific coroutine runs it; that is the "execute posted callbacks" step in the code above. However, when the function is posted, the coroutine may be suspended in async_read or async_write. Is it possible to post an event, much like incoming data, that makes the async_read or async_write call return immediately?

coollofty
  • If such a goal is difficult, then how can I write to any client's websocket from any thread? – coollofty Jan 27 '21 at 02:55
  • It's not clear what you're trying to achieve, but consider rewriting your `do_session` coroutine into a class with an internal state machine, which uses the traditional asynchronous operations with callbacks. This way the calls for asynchronous I/O operations are non-blocking. – Emile Cormier Aug 13 '22 at 16:39

1 Answer

I guess the essence of the problem is this: perform a select on two channels, one with capacity 1 (the read result) and one with (possibly) unbounded capacity (the posted functors).

Implement select with asio asynchronous operation

Write an asio asynchronous operation to wait for multiple (two) things.
(asio asynchronous operation template: c++ - How to wait for a function to return with Boost:::Asio? - Stack Overflow).

state protected by a mutex:

  • a std::optional<read_result>
  • a std::vector<functor>
  • a bool (whether there is an on-going async_read)
  • a std::optional<completion handler>

your async_wait_for_2_things:

  1. Get the completion handler (a callable, can resume your coroutine) from the completion token (yield[ec]);
  2. Lock the mutex (use guard);
  3. if there is a pending functor from another_thread, take it out, post the completion handler;
  4. else if there is a pending read_result, take it out, post the completion handler;
  5. else if there is an on-going async_read (the bool is true), store the completion handler (if there is already a completion handler stored, throw "can not happen");
  6. else (no pending functor, no pending read_result, async_read has not been started), store the completion handler (if there is already a completion handler stored, throw "can not happen"), set the bool to true (if the bool is already true, throw "can not happen"), call async_read;
  7. Unlock the mutex;

async_read's callback:

  1. Lock the mutex (use guard);
  2. set the bool to false (if the bool is already false, throw "can not happen");
  3. if there is a completion handler, take it out, post it;
  4. else, store read_result (if there is already a read_result stored, throw "can not happen");
  5. Unlock the mutex;

another_thread's code for posting functor:

  1. Lock the mutex (use guard);
  2. if there is a completion handler, take it out, post it;
  3. else, store functor;
  4. Unlock the mutex;
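
The three procedures above can be sketched without Asio to check the state transitions. This is only a model under stated assumptions, not a real Asio asynchronous operation: `manual_executor` stands in for posting to an io_context, `start_read` stands in for the call to ws.async_read, `read_result` is a plain string, and the names `selector`, `event`, `on_read_complete` and `post_functor` are hypothetical.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical stand-ins, not Boost.Asio types.
using read_result = std::string;
using functor = std::function<void()>;
using event = std::variant<read_result, std::vector<functor>>;
using completion_handler = std::function<void(event)>;

// Stand-in for posting to an io_context: queue now, run later (single-threaded model).
struct manual_executor {
    std::deque<std::function<void()>> queue;
    void post(std::function<void()> f) { queue.push_back(std::move(f)); }
    void run() {
        while (!queue.empty()) {
            auto f = std::move(queue.front());
            queue.pop_front();
            f();
        }
    }
};

class selector {
public:
    // start_read stands in for ws.async_read(...). It is called with the mutex held,
    // so it must return without calling on_read_complete inline.
    selector(manual_executor& ex, std::function<void()> start_read)
        : ex_(ex), start_read_(std::move(start_read)) {}

    void async_wait_for_2_things(completion_handler h) {
        std::lock_guard<std::mutex> guard(mutex_);
        if (!functors_.empty()) {                  // step 3: pending functors
            post_locked(std::move(h), event(std::exchange(functors_, {})));
        } else if (result_) {                      // step 4: pending read_result
            event e(std::move(*result_));
            result_.reset();
            post_locked(std::move(h), std::move(e));
        } else {                                   // steps 5 and 6: store the handler
            if (handler_) throw std::logic_error("can not happen");
            handler_ = std::move(h);
            if (!reading_) { reading_ = true; start_read_(); }
        }
    }

    void on_read_complete(read_result r) {         // async_read's callback
        std::lock_guard<std::mutex> guard(mutex_);
        if (!reading_) throw std::logic_error("can not happen");
        reading_ = false;
        if (handler_) {
            post_locked(take_handler_locked(), event(std::move(r)));
        } else {
            if (result_) throw std::logic_error("can not happen");
            result_ = std::move(r);
        }
    }

    void post_functor(functor f) {                 // called from another_thread
        std::lock_guard<std::mutex> guard(mutex_);
        if (handler_) {
            std::vector<functor> one;
            one.push_back(std::move(f));
            post_locked(take_handler_locked(), event(std::move(one)));
        } else {
            functors_.push_back(std::move(f));
        }
    }

private:
    completion_handler take_handler_locked() {
        completion_handler h = std::move(*handler_);
        handler_.reset();
        return h;
    }
    void post_locked(completion_handler h, event e) {
        ex_.post([h = std::move(h), e = std::move(e)]() mutable { h(std::move(e)); });
    }

    manual_executor& ex_;
    std::function<void()> start_read_;
    std::mutex mutex_;
    std::optional<read_result> result_;
    std::vector<functor> functors_;
    bool reading_ = false;                         // whether a read is on-going
    std::optional<completion_handler> handler_;
};
```

In this model the handler receives either the read result or the batch of pending functors, so the loop in do_session would branch on which alternative arrived. A functor posted while a read is in flight wakes the waiter without cancelling the read; the read's result is stored and handed out on the next wait.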

Implement select using asynchronous event

  1. async_read (callback overload)'s lambda completion handler: stores the result, notifies asynchronous_event;
  2. another_thread: stores functor, notifies asynchronous_event;
  3. do_session: asynchronously waits on asynchronous_event, loads result or functor;
  4. asynchronous_event's data is in a std::pair<std::optional<read_result>, std::vector<functor>> protected by a mutex;

Implement asynchronous event using a timer: c++ - Why does Boost.Asio not support an event-based interface? - Stack Overflow.

This is not applicable because an "asynchronous event" is not an "asynchronous condition variable": it cannot

  • release a mutex and start the asynchronous wait atomically

(a possible sequence: do_session releases the mutex, then the functor is posted, then the event is notified (cancel_one) while nobody is waiting, then do_session waits on the event (timer_.async_wait(yield[ec]);) and blocks forever)
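
The lost notification can be replayed deterministically with a small model. This is a sketch under assumptions, not the linked timer-based implementation: `event_model` and `replay_notify_before_wait` are hypothetical names, and the only property modelled is that a notification wakes a handler only if one is already waiting.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical model of a timer-based event: notify_one() wakes a handler only if one
// is already waiting (as cancel_one() does with a timer's pending waits).
struct event_model {
    std::vector<std::function<void()>> waiters;
    void async_wait(std::function<void()> h) { waiters.push_back(std::move(h)); }
    bool notify_one() {
        if (waiters.empty()) return false;   // nobody is waiting: the notification is lost
        auto h = std::move(waiters.front());
        waiters.erase(waiters.begin());
        h();                                 // real Asio would post this, not call it inline
        return true;
    }
};

// Replays the sequence from the text; returns whether do_session was woken.
inline bool replay_notify_before_wait() {
    event_model ev;
    std::vector<int> functors;               // stand-in for the mutex-protected data
    bool woken = false;
    bool saw_nothing = functors.empty();     // do_session checks, then releases the mutex
    functors.push_back(1);                   // another_thread stores a functor...
    ev.notify_one();                         // ...and notifies: no waiter yet
    if (saw_nothing) ev.async_wait([&woken] { woken = true; });  // do_session waits
    return woken;                            // the functor is stored but never processed
}
```

In the replay the functor sits in the container while the waiter is never woken, which is the "blocks forever" case described above.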


Implement select using asynchronous latch

  1. async_read (callback overload)'s lambda handler: ① stores the result and resets asynchronous_latch_producer, ② notifies asynchronous_latch_consumer, then waits on asynchronous_latch_producer (⑥ wakes up);
  2. another_thread: ① stores the functor and resets asynchronous_latch_producer, ② notifies asynchronous_latch_consumer, then waits on asynchronous_latch_producer (⑥ wakes up);
  3. do_session: waits on asynchronous_latch_consumer (③ wakes up), ④ loads the result or functor and resets asynchronous_latch_consumer, ⑤ notifies asynchronous_latch_producer;
  4. asynchronous_latch_consumer and asynchronous_latch_producer's data is in a std::pair<std::optional<read_result>, std::vector<functor>>;

Implement asynchronous latch using a timer: c++ - Cancelling boost asio deadline timer safely - Stack Overflow. Modify that asynchronous event implementation to get asynchronous latch: in constructor and reset, .expires_at(Timer::clock_type::time_point::max()); in notify_all_one_shot, .expires_at(Timer::clock_type::time_point::min()).

This is not applicable because one of the producers might block forever.
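
For reference, the difference between the latch and the plain event is that a latch notification stays set until reset. A minimal model of that behaviour, assuming nothing about the linked implementation beyond the expires_at(max)/expires_at(min) description above (`latch_model` is a hypothetical name):

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical model of the timer-based latch: once notified it stays open until reset().
struct latch_model {
    bool open = false;                       // models expiry == time_point::min()
    std::vector<std::function<void()>> waiters;
    void async_wait(std::function<void()> h) {
        if (open) h();                       // already notified: completes at once
        else waiters.push_back(std::move(h));
    }
    void notify_all_one_shot() {
        open = true;
        auto pending = std::move(waiters);
        waiters.clear();
        for (auto& h : pending) h();         // real Asio would post these handlers
    }
    void reset() { open = false; }           // models expiry == time_point::max()
};
```

A wait that starts after the notification still completes, so the notify-before-wait sequence no longer loses the wakeup; the model does not address the producer-side blocking mentioned above.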

jhcarl0814