2

I'm trying to understand a bit better how async asio works.

I have the following code, where I'm calling async_read on a socket to read the next 10 bytes of data.

struct SocketReader {
    void do_read_body()
    {
        asio::async_read(socket_,
            asio::buffer(msg_, 10),
            [this](asio::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    //messages_to_work_on.emplace_back(msg_); // <-- I'm trying to send this msg_ instance to another io_context
                    do_read_body(); // call again
                }
                else
                {
                    socket_.close();
                }
            });
    }
    std::vector<uint8_t> msg_ = std::vector<uint8_t>(10); // sized to match the read
    asio::ip::tcp::socket socket_;
};

These reads are done inside an io_context running in its own std::thread, where I'm collecting all messages read from the socket into a queue. So far so good.

I also have another "worker" class that just executes some work based on what is available in its queue:

struct Worker
{
    asio::io_context& io_context_;
    std::deque< std::vector<uint8_t> > queue;
    Worker(asio::io_context& io_context)
        : io_context_(io_context) {
        asio::post(io_context_, [this]() {doWork();});
    }
    void doWork() {
        if (!queue.empty())
        {
            // do some work with front()
            queue.pop_front();
        }
        asio::post(io_context_, [this]() {doWork();});
    }
};

That one also executes in its own io_context, running in its own thread. So there is concurrency between the socket thread and the worker thread.

What is the correct way to post the data received from the socket to the worker class? I'm thinking I should be able to call from the socket completion handler something like:

asio::post(worker_io_context, [this]() { worker.queue.push_back(msg_); });

That way, I'm at least sure that the worker queue is not used concurrently. But I'm not sure if I'm allowed to post from one io_context to the other, and also whether I'd be creating another race condition this way. I also don't really understand where the memory for my message should live, especially "in between" the transfer from one io_context to the other. Is it required that I pass the message by value (since this->msg_ can be modified before the post handler is executed)?

Thanks!

lezebulon
  • I don't understand why it's so complicated - just call the function doWork() instead of "messages_to_work_on.emplace_back(msg_);" and you will have the expected behaviour, without "deques", without race conditions, without headaches, without two io_contexts, without two threads. – Heto Dec 14 '21 at 23:54
  • @Heto The point is that doWork is potentially doing a long-running computation, so I don't want to block the thread that is reading the socket (which is what you propose would do). Instead I want to send the data to another thread (another io_context) so it can be worked on without blocking the rest – lezebulon Dec 14 '21 at 23:58

3 Answers

2

I should be able to call from the socket completion handler, something like:

asio::post(worker_io_context, [this]() { worker.queue.push_back(msg_); });

Sure.

That way, I'm at least sure that the worker queue is not used concurrently. But I'm not sure if I'm allowed to post from one io_context to the other,

io_contexts are not magic. They're basically cooperative task queues.

and also whether I'd be creating another race condition this way.

I'm not going to sit here and pass a verdict without seeing your code (I might not want to read all of it anyways), but let me repeat: io_contexts are not magic. You can reason about them the way you already know how to, in terms of threads, tasks and resources.
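
For illustration only, here is a minimal self-contained sketch of that model (standalone Asio; all the names are mine, not from the question): two contexts, two threads, and a handler on one context posting onto the other.

#include <asio.hpp>
#include <iostream>
#include <thread>

int main() {
    asio::io_context reader_ctx, worker_ctx;

    // Work guards keep run() from returning while a queue is momentarily empty.
    auto reader_work = asio::make_work_guard(reader_ctx);
    auto worker_work = asio::make_work_guard(worker_ctx);

    // A handler running on reader_ctx may freely post onto worker_ctx:
    asio::post(reader_ctx, [&] {
        asio::post(worker_ctx, [&] {
            std::cout << "ran on the worker thread\n";
            worker_work.reset(); // let worker_ctx.run() return
        });
        reader_work.reset(); // let reader_ctx.run() return
    });

    std::thread reader([&] { reader_ctx.run(); });
    std::thread worker([&] { worker_ctx.run(); });
    reader.join();
    worker.join();
}

Nothing special happens at the boundary: post just enqueues a task on the target context, whichever thread happens to be running it.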

I also don't really understand where the memory for my message should live, especially "in between" the transfer from one io_context to the other. Is it required that I pass the message by value (since this->msg_ can be modified before the post handler is executed)?

Yes. Indeed. Something like

post(worker_io_context, [this, msg = std::move(msg_)]() mutable { worker.queue.push_back(std::move(msg)); });

(Note the mutable: without it the captured msg is const inside the lambda and the inner std::move silently degrades into a copy. Also, the move leaves msg_ empty, so give it back its size before the next async_read.)

If moving isn't cheap, there's the option of having a refcounted smart pointer (like shared_ptr). Consider making it smartpointer<T const> if you actually share ownership between threads.
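
As a sketch of that variant, reusing the names from the question and assuming the worker's queue is changed to hold std::shared_ptr<std::vector<uint8_t> const>:

// In the read completion handler: freeze the message and share it.
auto msg = std::make_shared<std::vector<uint8_t> const>(std::move(msg_));
msg_.assign(10, 0); // the move left msg_ empty; restore it before the next read

asio::post(worker_io_context, [&worker, msg] {
    worker.queue.push_back(msg); // only the refcount is touched, no copy
});

Because the pointee is const, any number of threads can read it afterwards without further synchronization.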


Shower thought: maybe you can do without the "worker" queues. Since you're moving to reactor-style asynchrony (using Asio), you might focus on queueing the tasks instead of the data. Reasons not to do that would include when you want priority queuing, load balancing/back pressure, etc. [In principle all of these can be implemented using custom executors, but I would stick to what I know before doing that.]
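
As a sketch of that "queue the tasks, not the data" idea, again with the question's names (handle_message is a hypothetical stand-in for the long-running computation):

// In the read completion handler: post the work itself, moving the data along.
asio::post(worker_io_context,
    [msg = std::move(msg_)]() mutable {
        handle_message(std::move(msg)); // runs on the worker thread
    });
msg_.assign(10, 0); // fresh buffer for the next async_read

The deque, the polling doWork() loop and the shared state all disappear; the worker context itself is the queue.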

sehe
  • Thanks! I kinda figured it out after typing: what I intended to do was possible with asio. What I'm realizing now is that the completion handlers are not "free" and they can come with lots of data attached through the closure, if the params are passed by value. I also discovered that there exist movable handlers (http://think-async.com/Asio/asio-1.11.0/doc/asio/overview/cpp2011/move_handlers.html), to potentially avoid lots of copies of the heavy handlers within asio code (not sure how much it can happen) – lezebulon Dec 15 '21 at 00:22
  • To avoid costly handlers you can put the data outside the handler. When crossing contexts/executors I'd want to unshare the data to avoid synchronization issues though. Another typical pattern is "handler-is-operation-type" where the operation type might contain a unique_ptr/shared_ptr to all the relevant state. – sehe Dec 15 '21 at 00:31
  • I really don't see how to put the data outside the handler without going back to the synchronization issue that I have in the first place. It's even worse since lots of handlers can have been created and be pending at the same time, so it's not like I can just put my data to transfer in one specific place as a global variable or something. In my case it's simple enough because the data is cheaply movable since it's all std::vectors – lezebulon Dec 15 '21 at 00:35
  • It's as simple as you want: shared_ptrs do the job just fine, e.g. Of course, shared_ptr can be counter to performant code, and is oft over-used, but it actually fits the bill nicely here. See e.g. the enormous speed-ups I achieved here https://gist.github.com/sehe/a32a59096279d5fef99c9824a6da0168#file-connection-h-L12 (context: https://chat.stackoverflow.com/transcript/116940?m=53027486#53027486 and https://chat.stackoverflow.com/transcript/message/53042186#53042186). But yeah, it all depends on the data flow and access patterns. – sehe Dec 15 '21 at 00:59
1

You don't need more than one io_context; even for multi-threaded applications you can use a single io_context. You want to make your SocketReader a shared pointer, so that every pending read keeps a reference to it and the object stays alive. I'm assuming the acceptor, socket creation and some_io_context.run() parts are done. I would do something like this:

class SocketReader 
    : public std::enable_shared_from_this<SocketReader> // we need this!
{
public:
    // Constructor
    SocketReader(asio::io_context& ctx) 
        : ctx_(ctx)
        , socket_(ctx) // assuming the socket gets connected/accepted elsewhere
    {
    }
    
    // read
    void do_read_body()
    {
        auto self(this->shared_from_this()); // we need this!
        asio::async_read(socket_,
            asio::buffer(msg_, 10),
            [this, self](asio::error_code ec, std::size_t length) // capture self so the object outlives the read
            {
                if (!ec)
                {
                    // later edit - @sehe is spot on -> it's better to move it
                    asio::post(ctx_, [self, msg = std::move(msg_)]() mutable {
                        // do the long work with msg here
                    });
                    msg_.assign(10, 0); // the move left msg_ empty; restore it
                    do_read_body();
                }
                else
                {
                    socket_.shutdown(asio::ip::tcp::socket::shutdown_send, ec);
                    socket_.close();
                }
            });
    }
    
private:
    asio::io_context& ctx_; // l.e. ctx_ must be a reference
    std::vector<uint8_t> msg_ = std::vector<uint8_t>(10);
    asio::ip::tcp::socket socket_;
};

...
// somewhere in the code
auto s_reader = std::make_shared<SocketReader>(some_io_context);
s_reader->do_read_body();
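
If you do want the read handler and the posted work to be able to run concurrently (see the comments below), the same single-context design can be serviced by two threads; note that with more than one thread you'd want to wrap the socket's handlers in an asio::strand so they stay serialized. A sketch:

asio::io_context ctx;
auto s_reader = std::make_shared<SocketReader>(ctx);
s_reader->do_read_body();

// Two threads service the same context, so a read handler and a
// posted work item can execute at the same time.
std::thread t1([&] { ctx.run(); });
std::thread t2([&] { ctx.run(); });
t1.join();
t2.join();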
Heto
  • ok so if I understand correctly: if I use 2 threads to run this io_context, then potentially both my lambda and do_read_body could be executed concurrently? – lezebulon Dec 15 '21 at 00:51
  • you don't need two threads. They are not executed "concurrently", they are executed asynchronously. It's the OS's/asio's job to take care of it. – Heto Dec 15 '21 at 00:53
  • but if I want them executed concurrently, I need 2 threads, right? That was my initial intent – lezebulon Dec 15 '21 at 00:59
  • You don't know when they will be executed. You are asking the OS to take care of this business for you: you name your conditions (the handlers) and you don't block your thread. When the conditions are met, they will be executed. – Heto Dec 15 '21 at 01:07
  • some thoughts: you could do a context_pool/thread_pool. To have two (or more) threads, each one with its own io_context: one used to create the socket, one (or more if you find it useful) to do the work. There is no need to protect your data (with a mutex/semaphore) because you are not sharing any of it between the threads. – Heto Dec 15 '21 at 01:22
0

Note: you do not need an extra io_context.
If you have to do a long-running computation you can write your own async_xyz function and use it like the other async functions. The idea is to post the work to a Boost thread pool, do the computation there, and then call the completion handler when the work is done. Here is an example using a boost::asio::thread_pool to do the time-consuming hashing of a password.

template <boost::asio::completion_token_for<void (std::string)> CompletionToken>
auto
async_hash (boost::asio::thread_pool &pool, boost::asio::io_context &io_context, std::string const &password, CompletionToken &&token)
{
  return boost::asio::async_initiate<CompletionToken, void (std::string)> (
      [&] (auto completion_handler, std::string const &passwordToHash) {
        // track outstanding work so io_context.run() does not return while we hash
        auto io_eq = boost::asio::prefer (io_context.get_executor (), boost::asio::execution::outstanding_work.tracked);
        boost::asio::post (pool, [&, io_eq = std::move (io_eq), completion_handler = std::move (completion_handler), passwordToHash] () mutable {
          auto hashedPw = pw_to_hash (passwordToHash); // the long-running computation, off the io thread
          // hop back onto the io_context to deliver the result
          boost::asio::post (io_eq, [hashedPw = std::move (hashedPw), completion_handler = std::move (completion_handler)] () mutable { completion_handler (hashedPw); });
        });
      },
      token, password);
}

Then call it:

auto hashedPw = co_await async_hash (pool, io_context, createAccountObject.password, boost::asio::use_awaitable);

It seems like you do not use the coroutine TS, so I think you have to do something like this:

async_hash (pool, io_context, createAccountObject.password, /*your lambda here*/);
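
For instance, with a plain lambda as the completion token (a hypothetical callback matching the void(std::string) signature above):

async_hash (pool, io_context, createAccountObject.password,
            [] (std::string hashedPw) {
              // runs back on the io_context once the hashing is done
            });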
Koronis Neilos
  • I think the - valid - concern is to separate the load between different thread pools. That's a very common design pattern, and the natural way to achieve it is to have different execution contexts being serviced from separate threads/thread pools. Otherwise you end up having no control over priorities (work congestion could starve IO tasks etc.) – sehe Dec 15 '21 at 00:26
  • I think there could be a scenario in which this happens, but it's hard to imagine when using the coroutine TS. I would love to see an example where this happens. Maybe if we use a lot of threads (of course on a server with lots of cores) and they do nothing besides posting to the io_context? Also we could use this "https://www.boost.org/doc/libs/master/doc/html/boost_asio/reference/io_context/io_context/overload2.html" (I never tried it) and give the context more threads? – Koronis Neilos Dec 15 '21 at 00:53
  • When using the coroutine TS this is exactly the same. It's basically just syntactic sugar. As soon as you do some heavy work (synchronously) you might hurt latencies of handlers. Coroutines, callback style, asio::spawn, it all comes down to the same thing: tasks being scheduled on execution contexts. – sehe Dec 15 '21 at 01:01
  • Re: the concurrency hint: that helps optimize a tiny bit in cases of known single-threaded access. In practice, a **much higher** gain is to be had by reducing the type-erasure that happens by default in all Asio IO objects (due to `any_io_executor`, see e.g. this example https://chat.stackoverflow.com/transcript/230461?m=51875036#51875036) – sehe Dec 15 '21 at 01:04