I'm building a network service with boost::asio and I'm unsure about its thread safety.
io_service.run() is called only once, from a thread dedicated to the io_service work.
send_message(), on the other hand, can be called either by code inside the handlers of the second io_service (described later) or by the main thread upon user interaction. That is why I'm getting nervous.
std::deque<message> out_queue;
// send_message will be called by two different threads
void send_message(MsgPtr msg){
    // note: this reads out_queue outside the io_service thread
    while (out_queue.size() >= 20){
        Sleep(50);
    }
    io_service_.post([this, msg]() { deliver(msg); });
}
// from my understanding, deliver will only be called by the thread which called io_service.run()
void deliver(const MsgPtr msg){
    bool write_in_progress = !out_queue.empty();
    out_queue.push_back(msg);
    if (!write_in_progress)
    {
        write();
    }
}
void write()
{
    auto self(shared_from_this());
    asio::async_write(socket_,
        asio::buffer(out_queue.front().header(),
            message::header_length),
        [this, self](asio::error_code ec, std::size_t /*length*/)
        {
            if (!ec)
            {
                asio::async_write(socket_,
                    asio::buffer(out_queue.front().data(),
                        out_queue.front().paddedPayload_size()),
                    [this, self](asio::error_code ec, std::size_t /*length*/)
                    {
                        if (!ec)
                        {
                            out_queue.pop_front();
                            if (!out_queue.empty())
                            {
                                write();
                            }
                        }
                    });
            }
        });
}
Is this scenario safe?
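The part I am least sure about is the out_queue size check in send_message(), since it runs on the calling thread while deliver() and the write handlers modify the queue on the io_service thread. One alternative I am considering, as a minimal sketch only and using just the standard library: keep the number of in-flight messages in a std::atomic counter instead of reading the deque. PendingCounter, try_acquire() and release() are hypothetical names of mine, not part of asio.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical helper (not part of asio): counts messages that have been
// handed to send_message() but not yet fully written to the socket.
class PendingCounter {
public:
    explicit PendingCounter(std::size_t limit) : limit_(limit) {}

    // May be called from any producer thread before posting a message.
    // Returns false when the limit is reached, so the caller can wait and retry.
    bool try_acquire() {
        std::size_t current = pending_.load(std::memory_order_relaxed);
        while (current < limit_) {
            // On failure, compare_exchange_weak reloads `current` for the next check.
            if (pending_.compare_exchange_weak(current, current + 1,
                                               std::memory_order_acq_rel,
                                               std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }

    // Intended to be called on the io_service thread after pop_front().
    void release() {
        const std::size_t previous = pending_.fetch_sub(1, std::memory_order_acq_rel);
        assert(previous > 0 && "release() without matching try_acquire()");
        (void)previous;
    }

    std::size_t pending() const { return pending_.load(std::memory_order_acquire); }

private:
    const std::size_t limit_;
    std::atomic<std::size_t> pending_{0};
};
```

With this, send_message() would loop on try_acquire() (sleeping in between) instead of reading the deque, and the second write handler would call release() right after out_queue.pop_front(). The deque itself would then only ever be touched from the io_service thread.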
A similar second scenario: when the network thread receives a message, it posts it to another asio::io_service, which is also run by its own dedicated thread. This io_service uses a std::unordered_map to store callback functions etc.
std::unordered_map<int, eventSink> eventSinkMap_;
//...
// called by the main thread (GUI), writes a callback function object to the map
int IOReactor::registerEventSink(std::function<void(int, std::shared_ptr<message>)> fn, QObject* window, std::string endpointId){
    util::ScopedLock lock(&sync_);
    eventSink es;
    es.id = generateRandomId();
    // ....
    std::pair<int, eventSink> eventSinkPair(es.id, es);
    eventSinkMap_.insert(eventSinkPair);
    return es.id;
}
// called by the second thread, the network service thread when a message was received
void IOReactor::onMessageReceived(std::shared_ptr<message> msg, ConPtr con)
{
    reactor_io_service_.post([=](){ handleReceive(msg, con); });
}
// should be called only by the one thread running the reactor_io_service.run()
// read and write access to the map
void IOReactor::handleReceive(std::shared_ptr<message> msg, ConPtr con){
    util::ScopedLock lock(&sync_);
    auto es = eventSinkMap_.find(msg->requestId);
    if (es != eventSinkMap_.end())
    {
        auto fn = es->second.handler;
        auto ctx = es->second.context;
        QMetaObject::invokeMethod(ctx, "runInMainThread", Qt::QueuedConnection, Q_ARG(std::function<void(int, std::shared_ptr<msg::IMessage>)>, fn), Q_ARG(int, CallBackResult::SUCCESS), Q_ARG(std::shared_ptr<msg::IMessage>, msg));
        eventSinkMap_.erase(es);
    }
}
First of all: do I even need a lock here?
Of course both methods access the map, but they never access the same element (the receive handler cannot look up an element that has not yet been registered/inserted into the map). Is that thread-safe?
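To make the second scenario concrete, here is a reduced sketch of the locking I currently have, using only the standard library. std::mutex stands in for my util::ScopedLock, a simple counter stands in for generateRandomId(), and SinkRegistry, add() and take() are hypothetical names used only for this illustration. As far as I understand, an insert may rehash the whole std::unordered_map, which is why I suspect "different elements" might not be enough to drop the lock.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical, reduced stand-in for the eventSinkMap_ handling in IOReactor.
class SinkRegistry {
public:
    using Handler = std::function<void(int)>;

    // Called by the GUI thread: stores a callback and returns its id.
    int add(Handler fn) {
        assert(fn && "handler must be callable");
        std::lock_guard<std::mutex> lock(mutex_);
        const int id = next_id_++;  // assumption: counter instead of a random id
        sinks_.emplace(id, std::move(fn));
        return id;
    }

    // Called by the reactor thread: removes the callback for `id` and returns it.
    // Returns an empty Handler when the id is unknown.
    Handler take(int id) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = sinks_.find(id);
        if (it == sinks_.end()) {
            return Handler{};
        }
        Handler fn = std::move(it->second);
        sinks_.erase(it);
        return fn;  // invoked by the caller after the lock has been released
    }

private:
    std::mutex mutex_;
    std::unordered_map<int, Handler> sinks_;
    int next_id_ = 1;
};
```

In this sketch, find and erase happen under the same lock, and the callback is only invoked by the caller once take() has returned, so the lock is never held while user code runs.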