1

Background

I'm new to using Boost::Asio library and am having trouble getting the behaviour I want. I am trying to implement some network communication for custom hardware solution. The communication protocol stack we are using relies heavily on Boost::Asio async methods and I don't believe it is entirely thread safe.

I have successfully implemented sending but encountered a problem when trying to setup the await for receiving. Most boost::asio examples I have found rely on socket behaviour to implement async await with socket_.async_read_some() or other similar functions. However this doesn't work for us as our hardware solution requires calling driver function directly rather than utilising sockets.

The application uses an io_service that is passed into boost::asio::generic::raw_protocol::socket as well as other classes.

Example code from protocol stack using sockets

This is the example code from the protocol stack. do_receive() is called in the constructor of RawSocketLink.

void RawSocketLink::do_receive()
{
    namespace sph = std::placeholders;
    socket_.async_receive_from(
            boost::asio::buffer(receive_buffer_), receive_endpoint_,
            std::bind(&RawSocketLink::on_read, this, sph::_1, sph::_2));
}

void RawSocketLink::on_read(const boost::system::error_code& ec, std::size_t read_bytes)
{
    if (!ec) {
        // Do something with received data...
        do_receive();
    }
}

Our previous receive code without the protocol stack

Prior to implementing the stack we had been using the threading library to create separate threads for send and recieve. The receive method is shown below. Mostly it relies on calling the receive_data() function from the hardware drivers and waiting for it to return. This is a blocking call but is required to return data.

void NetworkAdapter::Receive() {

  uint8_t temp_rx_buffer[2048];
  rc_t rc;
  socket_t *socket_ptr;
  receive_params_t rx_params;
  size_t rx_buffer_size;
  char str[100];

  socket_ptr = network_if[0];

  while (1) {
    rx_buffer_size = sizeof(temp_rx_buffer);
    // Wait until receive_data returns then process
    rc = receive_data(socket_ptr,
                     temp_rx_buffer,
                     &rx_buffer_size,
                     &rx_params,
                     WAIT_FOREVER);
    if (rc_error(rc)) {
      (void)fprintf(stderr, "Receive failed");
      continue;
    }
    
    // Do something with received packet ....
    
  }

  return;
}

Note that the socket_t pointer in this code is not the same thing as a TCP/UDP socket for Boost::Asio.

Current implement of async receive

This is my current code and where I need help. I'm not sure how to use boost::asio method to wait for receive_data to return. We are trying to replicate the behaviour of socket.async_read_from(). The NetworkAdapter has access to the io_service.

void NetworkAdapter::do_receive() {
  
  rc_t rc;
  socket_t *socket_ptr;
  receive_params_t rx_params;
  size_t rx_buffer_size;

  socket_ptr = network_if[0];

  rx_buffer_size = receive_buffer_.size();
  
  // What do I put here to await for this to return asynchronously?
  rc = receive_data(socket_ptr, receive_buffer_.data(), &rx_buffer_size, &rx_params, ATLK_WAIT_FOREVER);
  on_read(rc, rx_buffer_size, rx_params);
}

void NetworkAdapter::on_read(const rc_t &rc, std::size_t read_bytes, const receive_params_t &rx_params) {
  if (!rc) {

    // Do something with received data...

  } else {
    LOG(ERROR) << "Packet receieve failure";
  }
  do_receive();
}

Summary

How do I use boost::asio async/await functions to await a function return? In particular I want to replicate the behaviour of socket.async_receive_from() but with a function rather than a socket.


*Some function names and types have been changed due to data protection requirements.

linksassin
  • 111
  • 1
  • 6
  • Are you using coroutines? If so, what compiler/version? If not, what do you mean with `async`/`await`? Implementing `async_`-style functions with blocking API's is advanced: you would need to implement a custom service, which is not something you should be learning unless you're very experienced with Asio programming and internals. There are some shortcuts, but in short they don't gain you anything than extra code over just using threads + e.g. promises – sehe Jan 12 '21 at 01:11
  • @sehe We have an io_service that all classes have access to. io_service.run() is called at the end of the main function. The `do_receive()` function is called in the constructor of the NetworkAdapter. – linksassin Jan 12 '21 at 01:23
  • @sehe I'm not learning this by choice, we have kind of been forced into it by the protocol stack and hardware api we are using. We are using gcc cross-compiling to arm. – linksassin Jan 12 '21 at 01:34
  • The simplest way I can think of to integrate would be to push your tasks to an extra thread and post completion handlers onto the io_service from there. – sehe Jan 12 '21 at 01:55
  • @sehe As in run `do_receive()` in a separate thread and then use `post` to call `on_read()` on the io_service thread? Any chance you have an example of what that would look like? I looked at post in the boost docs but couldn't figure out how to apply it to my situation. – linksassin Jan 12 '21 at 02:17

1 Answers1

0

N4045 Library Foundations for Asynchronous Operations, Revision 2
http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n4045.pdf

On page 24 there is an example on how to implement an asio async API in terms of callback-based os API.

// the async version of your operation, implementing all kinds of async paradigm in terms of callback async paradigm
template <class CompletionToken>
auto async_my_operation(/* any parameters needed by the sync version of your operation */, CompletionToken&& token) 
{
  // if CompletionToken is a callback function object, async_my_operation returns void, the callback's signature should be void(/* return type of the sync version of your operation, */error_code)
  // if CompletionToken is boost::asio::use_future, async_my_operation returns future</* return type of the sync version of your operation */>
  // if CompletionToken is ..., ...

  // you are not inventing new async paradigms so you don't have to specialize async_completion or handler_type, you should focus on implement the os_api below
  async_completion<CompletionToken, void(/* return type of the sync version of your operation, */error_code)/* signature of callback in the callback case */> completion(token); 
  typedef handler_type_t<CompletionToken, void(error_code)> Handler; 
  unique_ptr<wait_op<Handler>> op(new wait_op<Handler>(move(completion.handler))); // async_my_operation initates your async operation and exits, so you have to store completion.handler on the heap, the completion.handler will be invoked later on a thread pool (e.g. threads blocked in IOCP if you are using os api, threads in io_context::run() if you are using asio (sockets accept an io_context during construction, so they know to use which io_context to run completion.handler))
  
  // most os api accepts a void* and a void(*)(result_t, void*) as its C callback function, this is type erasure: the void* points to (some struct that at least contains) the C++ callback function object (can be any type you want), the void(*)(result_t, void*) points to a C callback function to cast the void* to a pointer to C++ callback function object and call it
  os_api(/* arguments, at least including:*/ op.get(), &wait_callback<Handler>);

  return completion.result.get();
}

// store the handler on the heap
template <class Handler>
struct wait_op {
  Handler handler_;
  explicit wait_op(Handler  handler) : handler_(move(handler)) {}
};

// os post a message into your process's message queue, you have several threads blocking in a os api (such as IOCP) or asio api (such as io_context::run()) that continuously takes message out from the queue and then call the C callback function, the C callback function calls your C++ callback function
template <class Handler> 
void wait_callback(result_t result, void* param) 
{
  unique_ptr<wait_op<Handler>> op(static_cast<wait_op<Handler>*>(param));
  op‐>handler_(/* turn raw result into C++ classes before passing it to C++ code */, error_code{});
}

//trivial implementation, you should consult the socket object to get the io_context it uses
void os_api(/* arguments needed by your operation */, void* p_callback_data, void(*p_callback_function)(result_t, void*))
{
  std::thread([](){
    get the result, blocks
    the_io_context_of_the_socket_object.post([](){ (*p_callback_function)(result, p_callback_data); });
  }).detach();
}

boost.asio has changed from async_completion and handler_type to async_result, so the above code is outdated.

Requirements on asynchronous operations - 1.75.0 https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/reference/asynchronous_operations.html

jhcarl0814
  • 108
  • 2
  • 8
  • But the API isn't call-back based – sehe Jan 12 '21 at 13:48
  • @sehe It seems that receive_data is a blocking API, we need to donate a thread to be blocked to use it. So I used std::thread to run receive_data and call the callback, to convert the blocking API into a callback-based API. – jhcarl0814 Jan 12 '21 at 14:11
  • @sehe The technique I used is from C#. It is inside a comment of https://blog.stephencleary.com/2013/11/there-is-no-thread.html : "Now, if you're in a UI context and you want to avoid blocking your UI thread, then you can call synchronous code asynchronously by using Task.Run. Task.Run will synchronously block a thread pool thread, allowing the UI thread to treat the work asynchronously." – jhcarl0814 Jan 12 '21 at 14:17
  • I just responded to the intro "On page 24 there is an example on how to implement an asio async API ***in terms of callback-based os API*** which, a priori, isn't applicable. I see that you "forced" the issue with ad-hoc threads, which might work, though it has many downsides. It's the simplest take on [this suggestion](https://stackoverflow.com/questions/65676197/how-to-wait-for-a-function-to-return-with-boostasio?noredirect=1#comment116120495_65676197) in a way – sehe Jan 12 '21 at 14:46
  • @sehe OP wants to use async/await (maybe from C# or Python), and async/await in C++ should mean `auto r = co_await async_my_operation(arguments, boost::asio::use_awaitable);`, which does not block threads calling `io_context::run`. Using detached thread + promise is appealing (I always do that at the first) but doesn't allow `co_await` and `std::future`'s methods does block threads calling `io_context::run`. – jhcarl0814 Jan 13 '21 at 00:27
  • I know. My point of caution is about spinning up threads at each request. That's why I suggested a slightly more involved solution with a persistent thread that can service these. And the true Asio solution would embody that but with a decoupling service implementation. – sehe Jan 13 '21 at 00:32
  • 1
    @sehe Maybe we need a persistent thread and two std::list to implement that service, one std::list is for storing incoming API requests and is protected by a mutex. The persistent thread splices the first std::list's nodes into the second std::list, then processes every node of the second std::list calling `io_context::post` after blocking. OP can do that later himself if there is enough time. – jhcarl0814 Jan 13 '21 at 01:25