
I'm building a network service with boost::asio and I'm unsure about the thread safety.

io_service.run() is called only once, from a thread dedicated to the io_service work.

send_message(), on the other hand, can be called either from the handlers of the second io_service mentioned later, or from the main thread upon user interaction. That is why I'm getting nervous.

std::deque<message> out_queue;

// send_message will be called by two different threads
void send_message(MsgPtr msg){
    while (out_queue->size() >= 20){    
        Sleep(50);
    }
    io_service_.post([this, msg]() { deliver(msg);  });
}

// from my understanding, deliver will only be called by the thread which called io_service.run()
void deliver(const MsgPtr msg){
    bool write_in_progress = !out_queue.empty();
    out_queue.push_back(msg);
    if (!write_in_progress)
    {
        write();
    }
}

void write()
{
    auto self(shared_from_this());

    asio::async_write(socket_,
        asio::buffer(out_queue.front().header(),
        message::header_length),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {

        if (!ec)
        {
            asio::async_write(socket_,
                asio::buffer(out_queue.front().data(),
                out_queue.front().paddedPayload_size()),
                [this, self](asio::error_code ec, std::size_t /*length*/)
            {
                if (!ec)
                {
                    out_queue.pop_front();
                    if (!out_queue.empty())
                    {
                        write();
                    }
                }

            });
        }

    });

}

Is this scenario safe?

A similar second scenario: When the network thread receives a message, it posts them into another asio::io_service which is also run by its own dedicated thread. This io_service uses an std::unordered_map to store callback functions etc.

std::unordered_map<int, eventSink> eventSinkMap_;

//...

// called by the main thread (GUI), writes a callback function object to the map
int IOReactor::registerEventSink(std::function<void(int, std::shared_ptr<message>)> fn, QObject* window, std::string endpointId){
    util::ScopedLock lock(&sync_);

    eventSink es;
    es.id = generateRandomId();
    // ....
    std::pair<int, eventSink> eventSinkPair(es.id, es);

    eventSinkMap_.insert(eventSinkPair);

    return es.id;
}

// called by the second thread, the network service thread when a message was received
void IOReactor::onMessageReceived(std::shared_ptr<message> msg, ConPtr con)
{    
    reactor_io_service_.post([=](){ handleReceive(msg, con); });
}

// should be called only by the one thread running the reactor_io_service.run()
// read and write access to the map
void IOReactor::handleReceive(std::shared_ptr<message> msg, ConPtr con){
    util::ScopedLock lock(&sync_);
    auto es = eventSinkMap_.find(msg->requestId);
    if (es != eventSinkMap_.end())
    {
        auto fn = es->second.handler;
        auto ctx = es->second.context;
        QMetaObject::invokeMethod(ctx, "runInMainThread", Qt::QueuedConnection, Q_ARG(std::function<void(int, std::shared_ptr<msg::IMessage>)>, fn), Q_ARG(int, CallBackResult::SUCCESS), Q_ARG(std::shared_ptr<msg::IMessage>, msg));

        eventSinkMap_.erase(es);
    }
}

first of all: Do I even need to use a lock here?

Of course both methods access the map, but they never access the same elements (handleReceive cannot look up an element that has not yet been registered/inserted into the map). Is that thread-safe?

netik

1 Answer


First of all, a lot of context is missing (where is onMessageReceived invoked, and what is ConPtr?), and you have too many questions. I'll give you some specific pointers that will help you, though.

  1. You should be nervous here:

    void send_message(MsgPtr msg){
        while (out_queue->size() >= 20){    
            Sleep(50);
        }
        io_service_.post([this, msg]() { deliver(msg);  });
    }
    

    The check out_queue->size() >= 20 requires synchronization unless out_queue is thread safe.

    The call to io_service_.post is safe, because io_service is thread safe. Since you have one dedicated IO thread, this means that deliver() will run on that thread. Right now, you need synchronization there too, because deliver() modifies out_queue while send_message() reads its size from other threads.

    I strongly suggest using a proper thread-safe queue there.

  2. Q. first of all: Do I even need to use a lock here?

    Yes you need to lock to do the map lookup (otherwise you get a data race with the main thread inserting sinks).

    You do not need to lock during the invocation (in fact, that seems like a very unwise idea that could lead to performance issues or lockups). The reference remains valid due to iterator invalidation rules.

    The deletion of course requires a lock again. I'd revise the code to do the lookup and removal at once, and invoke the sink only after releasing the lock. NOTE: You will have to think about exceptions here (in your code, when there is an exception during invocation, the sink doesn't get removed (ever?)). This might be important to you.


    void handleReceive(std::shared_ptr<message> msg, ConPtr con){
        util::ScopedLock lock(&sync_);
        auto es = eventSinkMap_.find(msg->requestId);
        if (es != eventSinkMap_.end())
        {
            auto fn  = es->second.handler;
            auto ctx = es->second.context;
            eventSinkMap_.erase(es); // invalidates es
    
            lock.unlock();
            // invoke in whatever way you require
            fn(static_cast<int>(CallBackResult::SUCCESS), std::static_pointer_cast<msg::IMessage>(msg));
        }
    }
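The same "look up and remove under the lock, invoke after releasing it" idea from point 2 can be sketched with only the standard library. This is a minimal sketch, not the asker's actual classes: `SinkRegistry`, `add`, `dispatch` and the `void(int)` handler signature are hypothetical names chosen for illustration, and `std::mutex` stands in for `util::ScopedLock`/`sync_`.

```cpp
#include <functional>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the event sink map; all names are illustrative.
class SinkRegistry {
public:
    using Handler = std::function<void(int)>;

    // May be called from any thread (e.g. the GUI thread) to register a one-shot sink.
    void add(int id, Handler fn) {
        std::lock_guard<std::mutex> lock(mutex_);
        sinks_[id] = std::move(fn);
    }

    // Looks up and removes the sink while holding the lock, then invokes it
    // with the lock released. Returns false if no sink is registered for id.
    bool dispatch(int id, int result) {
        Handler fn;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto it = sinks_.find(id);
            if (it == sinks_.end())
                return false;
            fn = std::move(it->second);
            sinks_.erase(it); // the sink is gone even if fn throws later
        } // lock released here, before any user code runs
        fn(result);
        return true;
    }

private:
    std::mutex mutex_;
    std::unordered_map<int, Handler> sinks_;
};
```

Because the handler is moved out and erased before it runs, a throwing handler cannot leave a stale entry in the map, and a handler that registers a new sink does not deadlock on the mutex.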
    
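For point 1, one possible shape of a "proper thread-safe queue" is a small bounded queue guarded by a mutex and a condition variable, so that producers block while it is full instead of polling with Sleep(50). This is only a sketch under assumptions: `BoundedQueue`, `push`, `try_pop` and `size` are hypothetical names, not part of Asio, and the sketch does not show how the queue would be wired into the async_write chain (the IO thread would still have to be the only consumer).

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical bounded queue; names and interface are illustrative only.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    // Blocks the calling thread while the queue is full.
    void push(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(std::move(value));
    }

    // Non-blocking pop: returns false if the queue is empty.
    bool try_pop(T& out) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (items_.empty())
                return false;
            out = std::move(items_.front());
            items_.pop_front();
        }
        not_full_.notify_one(); // wake one producer waiting for space
        return true;
    }

    // Synchronized size query, safe to call from any thread.
    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable not_full_;
    std::deque<T> items_;
    const std::size_t capacity_;
};
```

With something like this, send_message() would call push() (blocking at capacity 20) rather than reading the deque's size without synchronization.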
sehe
  • Thank you, this was very helpful! Btw: sorry, 'onMessageReceived' is called in the connection's `asio::async_read()` handler after reading and parsing the message. – netik Apr 20 '16 at 09:54