I am facing the following problem:

I have a RpcExecutor class. This class:

  1. sends rpc requests
  2. receives rpc responses

I keep everything asynchronous (through Boost.Asio). Every time a request is sent, a write handler is invoked that modifies a map:

async_write(...., [this, ...](error_code ec, size_t bytesTransferred) {
    // Register the promise so the read handler can fulfil it later.
    pendingRequests_->emplace(requestId, std::move(myPromise));
});

I then listen on the same socket with async_read_until, and replies arrive, possibly out of order. I also need to modify pendingRequests_ from this handler:

async_read(... [this, ...](error_code ec, size_t bytesTransferred) {
    ...
    // Fulfil the matching promise, then drop it from the map.
    pendingRequests_->at(requestId).set_value(...);
    pendingRequests_->erase(requestId);
});

My class looks like this:

class RpcExecutor {
private:
      std::unique_ptr<std::map<std::uint64_t, MyPromise>> pendingRequests_;
      ...
};

To make sure that pendingRequests_ is initialized, read and written from the same thread (I checked that this is the case), I apply the following restrictions:

  1. A single thread runs io_context::run(), and it is a different thread from the one that creates the RpcExecutor instance.
  2. I initialize the object pointed to by pendingRequests_ inside a boost::asio::post(myIoContext, ...), so it is initialized in the same thread that executes io_context::run().
  3. The async_read handler and async_write handler are executed in the same thread as io_context::run(), which is also the thread that runs the boost::asio::post handler.

All in all:

  • The boost::asio::post handler, the async_read handler and the async_write handler are executed in the same thread.
  • The RpcExecutor class instance is created in another thread.

Result

  • The async_read and async_write handlers do not see the same memory address for pendingRequests_.

Questions

  1. How should I initialize a map that I can use from my handlers' thread, given that the RpcExecutor class instance is created in another thread? Even if I initialize the pointed-to object in the same thread as io_context::run() via post, the handlers still see different addresses for the object.
  2. Do I need mutexes of any kind? What I actually want is to do all reads and writes from a single thread, even if it is not the thread that created the RpcExecutor instance, which holds the pendingRequests_ member variable.
Germán Diago
  • https://stackoverflow.com/questions/12794107/why-do-i-need-strand-per-connection-when-using-boostasio/12801042 would be helpful – sehe Dec 12 '18 at 12:47
  • Thanks. It is indeed a different case and I have no idea how to fix it yet after reading – Germán Diago Dec 12 '18 at 13:16

1 Answer

The problem was very subtle:

Somewhere in my code I was doing this:

executor_ = RpcExecutor(socket_);

The RpcExecutor temporary was created and started listening for incoming messages, all in its constructor. That temporary was then moved into the executor_ variable.

My async_read and async_write handlers capture this, and this is not the same address before and after the move, so the two handlers were showing different addresses:

  • the handler for incoming messages showed the this of the original (temporary) object, captured in the constructor.
  • the handler for outgoing messages showed the this of the moved-to object, executor_.
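The effect can be reproduced without Boost.Asio. The following is only a minimal sketch: HandlerQueue, MovableExecutor, send and handlersSeeSameObject are hypothetical names, the queue is a stand-in for the io_context, and the map stores int instead of MyPromise. Unlike the original code, the moved-from object is kept alive here so that the sketch does not touch a destroyed object.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for the io_context: handlers are queued now, run later.
using HandlerQueue = std::vector<std::function<void()>>;

// Hypothetical, simplified model of a movable RpcExecutor.
struct MovableExecutor {
    std::unique_ptr<std::map<std::uint64_t, int>> pending =
        std::make_unique<std::map<std::uint64_t, int>>();

    // Like the original constructor: registers a "read" handler capturing `this`.
    MovableExecutor(HandlerQueue& queue, const void*& seenByRead) {
        queue.push_back([this, &seenByRead] { seenByRead = this; });
    }

    // Like sending a request: registers a "write" handler capturing `this`.
    void send(HandlerQueue& queue, const void*& seenByWrite) {
        queue.push_back([this, &seenByWrite] { seenByWrite = this; });
    }
};

// Returns true if both handlers observed the same object.
bool handlersSeeSameObject() {
    HandlerQueue queue;
    const void* seenByRead = nullptr;
    const void* seenByWrite = nullptr;

    MovableExecutor original(queue, seenByRead);     // read handler captures &original
    MovableExecutor executor = std::move(original);  // the map moves, the captured pointer does not
    assert(original.pending == nullptr);             // a moved-from unique_ptr is guaranteed to be null
    executor.send(queue, seenByWrite);               // write handler captures &executor

    for (auto& handler : queue) handler();           // plays the role of io_context::run()
    return seenByRead == seenByWrite;
}
```

Calling handlersSeeSameObject() returns false: the read handler still points at the object that was moved from, whose pending pointer is now null, while the write handler points at the object that actually owns the map.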

Solution

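  1. Make the RpcExecutor non-copyable and non-movable, so that a this captured by a handler can never refer to a different object than the one in use.
  2. Call RpcExecutor::listenIncomingMessage explicitly instead of starting to listen in the constructor, which I also thought was a good idea.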
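A sketch of what these two points could look like, again without Boost.Asio. PinnedExecutor, HandlerQueue, messagesSeen and runPinnedDemo are hypothetical names; only listenIncomingMessage comes from the code described above, and holding the object through std::unique_ptr is an assumption about how the owner might replace the move-assignment.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <type_traits>
#include <vector>

// Hypothetical stand-in for the io_context: handlers are queued now, run later.
using HandlerQueue = std::vector<std::function<void()>>;

class PinnedExecutor {
public:
    // The constructor only stores dependencies; it registers no handlers.
    explicit PinnedExecutor(HandlerQueue& queue) : queue_(queue) {}

    // Non-copyable and non-movable: the address of this object never changes.
    PinnedExecutor(const PinnedExecutor&) = delete;
    PinnedExecutor& operator=(const PinnedExecutor&) = delete;
    PinnedExecutor(PinnedExecutor&&) = delete;
    PinnedExecutor& operator=(PinnedExecutor&&) = delete;

    // Explicit start: `this` is captured only once the object is in its final place.
    void listenIncomingMessage() {
        queue_.push_back([this] { ++messagesSeen_; });
    }

    int messagesSeen() const { return messagesSeen_; }

private:
    HandlerQueue& queue_;
    int messagesSeen_ = 0;
};

// The compiler now rejects code such as `executor_ = PinnedExecutor(queue);`.
static_assert(!std::is_copy_constructible<PinnedExecutor>::value, "must not be copyable");
static_assert(!std::is_move_constructible<PinnedExecutor>::value, "must not be movable");
static_assert(!std::is_move_assignable<PinnedExecutor>::value, "must not be move-assignable");

// Returns how many queued handlers reached the live object.
int runPinnedDemo() {
    HandlerQueue queue;
    // The owner holds the executor by pointer, so replacing it never moves the object itself.
    auto executor = std::make_unique<PinnedExecutor>(queue);
    executor->listenIncomingMessage();
    assert(executor->messagesSeen() == 0);   // nothing runs until the queue is drained

    for (auto& handler : queue) handler();   // plays the role of io_context::run()
    return executor->messagesSeen();
}
```

With the move operations deleted, the original mistake becomes a compile error rather than a silent change of address, and the explicit listenIncomingMessage call makes it visible where handlers start capturing this.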
Germán Diago