
I have a websocket server that writes messages to a client using

ws_.async_write(
    boost::asio::buffer( msg, msg.length()),
    boost::asio::bind_executor(
        strand_,
        std::bind(
            &cSession::on_write,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2)));

strand_ is initialized in the constructor with `strand_(make_strand(ioc))`. The asio documentation states that this should make the writes thread safe.

Running in two other threads, I have message generators that each send a message every three seconds. So every three seconds the websocket attempts to execute two async_write calls almost simultaneously. After several attempts, or sometimes on the first, an assertion fails:

Assertion failed!

Program: C:\Users\James\code\echoer\bin\WProcessServer.exe
File: C:\Users\James\code\boost\boost_1_70_0/boost/beast/websocket/detail/soft_mutex.hpp, Line 75

Expression: id_ == T::id

about which the source code says:

    // If this assert goes off it means you are attempting to
    // simultaneously initiate more than one of same asynchronous
    // operation, which is not allowed. For example, you must wait
    // for an async_read to complete before performing another
    // async_read.

So my use of strand has not made the server thread safe. Why not?

Here is the code for a complete minimal program that demos the problem:

    #include <iostream>
    #include <algorithm>
    #include <thread>

    #include <boost/beast/core.hpp>
    #include <boost/beast/websocket.hpp>
    #include <boost/asio/ip/tcp.hpp>
    #include <boost/asio/strand.hpp>
    #include <boost/asio/bind_executor.hpp>

    using tcp = boost::asio::ip::tcp;               // from <boost/asio/ip/tcp.hpp>

    class cServer;

    // Report a failure
    void
    fail(boost::system::error_code ec, char const* what)
    {
        std::cerr << what << ": " << ec.message() << "\n";
    }

    /** Send messages at regular intervals to client from own thread

     Used to test that the server is thread safe
    */

    class cMessenger
    {
    public:
        /** CTOR
            @param[in] io  io context
        */
        cMessenger(
            boost::asio::io_context& ioc,
            cServer& myServer,
            int id
        );

        /// Startup ( never returns - call in own thread )
        void Run();

        /// Schedule next message
        void Schedule();

        /// Send Message, then schedule next
        void onTimer();

    private:
        std::thread myThread;
        boost::asio::steady_timer myTimer;     /// timer controlling when messages are sent
        boost::asio::io_context& myIOC;
        cServer& myServer;
        int myID;
    };

    /// Websocket connection
    class cSession : public std::enable_shared_from_this<cSession>
    {
        /** The websocket stream ( over a TCP socket ) used to communicate with the client */
        boost::beast::websocket::stream<tcp::socket> ws_;

        /** The strand used to synchronize writes to the client
        Prevents a new write starting on the socket until previous write completes
        */
        boost::asio::strand<
            boost::asio::io_context::executor_type> strand_;

        /** Buffer storage for incoming messages from client */
        boost::beast::multi_buffer buffer_;

        cServer * myWebSocket;

    public:
        // Take ownership of the socket
        explicit
        cSession(
            boost::asio::io_context& ioc,
            tcp::socket socket,
            cServer * webSocket )
            : ws_(std::move(socket))
            , strand_(make_strand(ioc))
            , myWebSocket( webSocket )
        {
        }

        /** Start the asynchronous operation */
        void run();

        /** Handle websocket handshake completion */
        void on_accept(boost::system::error_code ec);

        /** Wait for next message from client */
        void do_read();

        /** Handle reception of message from client */
        void on_read(
            boost::system::error_code ec,
            std::size_t bytes_transferred);

        /** Write message to connection that came from elsewhere */
        void Write( const std::string& msg );

        /** Handle completion of write message from elsewhere */
        void on_write(
            boost::system::error_code ec,
            std::size_t bytes_transferred)
        {
            // Clear the buffer
            buffer_.consume(buffer_.size());
            //do_read();
        }
    };
    /// Accepts incoming connections and launches the sessions
    class cListener : public std::enable_shared_from_this<cListener>
    {
        boost::asio::io_context& ioc;        // io context
        tcp::acceptor acceptor_;
        tcp::socket socket_;
        cServer * myServer;

    public:
        cListener(
            boost::asio::io_context& ioc,
            tcp::endpoint endpoint );

        void Set( cServer* server )
        {
            myServer = server;
        }

        /// Start accepting incoming connections
        void run()
        {
            if(! acceptor_.is_open())
                return;
            do_accept();
        }

        /// wait for client connection request
        void do_accept();

        /// handle a client connection request
        void on_accept(boost::system::error_code ec);
    };

    /** A process Server */
    class cServer
    {
    public:
        /** CTOR
            @param[in] port to listen for client connections

            Runs in its own thread
            Starts listening on port for client connections
            Starts boost asio io_context
        */
        cServer(
            boost::asio::io_context& ioc,
            const std::string& port );

        /** Returns when thread ends */
        void Join();

        /** New connection to client */
        void Set( cSession * session );

        /** Client connection lost */
        void SessionClosed();

        /** Receive message from the client
            @param[in] msg
        */
        void ClientMsg( const std::string& msg );

        /** Send message to client
            @param[in] msg
        @param[in] store true if message should be stored for client reconnection, default true
        The message will be sent to client

        */
        void SendToClient(
            const std::string& msg,
            bool store = true );

        /// Get IO Context
        boost::asio::io_context& IOC()
        {
            return myIOC;
        }

    private:
        boost::asio::io_context& myIOC;
        unsigned short myPort;
        std::thread myThread;
        std::shared_ptr<cListener> myListener;
        cSession * mySession;

        void Run();
    };



    cListener::cListener(
        boost::asio::io_context& ioc_ref,
        tcp::endpoint endpoint )
        : ioc( ioc_ref )
        , acceptor_(ioc_ref)
        , socket_(ioc_ref)
    {
        boost::system::error_code ec;

        // Open the acceptor
        acceptor_.open(endpoint.protocol(), ec);
        if(ec)
        {
            fail(ec, "open");
            return;
        }

        // Allow address reuse
        acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
        if(ec)
        {
            fail(ec, "set_option");
            return;
        }

        // Bind to the server address
        acceptor_.bind(endpoint, ec);
        if(ec)
        {
            fail(ec, "bind");
            return;
        }

        // Start listening for connections
        acceptor_.listen(
            boost::asio::socket_base::max_listen_connections, ec);
        if(ec)
        {
            fail(ec, "listen");
            return;
        }
    }

    void cListener::do_accept()
    {
        acceptor_.async_accept(
            socket_,
            std::bind(
                &cListener::on_accept,
                shared_from_this(),
                std::placeholders::_1));
    }

    void cListener::on_accept(boost::system::error_code ec)
    {
        if(ec)
        {
            fail(ec, "accept");
        }
        else
        {
            // Create the session
            auto s = std::make_shared<cSession>(
                         ioc,
                         std::move(socket_),
                         myServer );

            // run the session
            s->run();
        }
    }

    void cSession::run()
    {
        // Accept the websocket handshake
        ws_.async_accept(
            boost::asio::bind_executor(
                strand_,
                std::bind(
                    &cSession::on_accept,
                    shared_from_this(),
                    std::placeholders::_1)));
    }

    void cSession::on_accept(boost::system::error_code ec)
    {
        if(ec)
            return fail(ec, "accept");

        // let websocket know connection is up and running
        myWebSocket->Set( this );

        // Wait for first message from client
        //do_read();
    }

    void cSession::do_read()
    {
        // Read a message into our buffer
        ws_.async_read(
            buffer_,
            boost::asio::bind_executor(
                strand_,
                std::bind(
                    &cSession::on_read,
                    shared_from_this(),
                    std::placeholders::_1,
                    std::placeholders::_2)));
    }

    void cSession::on_read(
        boost::system::error_code ec,
        std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        // This indicates that the session was closed
        if(ec == boost::beast::websocket::error::closed)
        {
            myWebSocket->SessionClosed();
            return;
        }

        if(ec)
        {
            myWebSocket->SessionClosed();
            return;
        }


        std::cout << "rcvd from client " << boost::beast::buffers_to_string(buffer_.data()) << "\n";

        // ???
        ws_.text(ws_.got_text());

        // wait for next message
        do_read();

    }

    void cSession::Write(
        const std::string& msg )
    {
        ws_.async_write(
            boost::asio::buffer( msg, msg.length()),
            boost::asio::bind_executor(
                strand_,
                std::bind(
                    &cSession::on_write,
                    shared_from_this(),
                    std::placeholders::_1,
                    std::placeholders::_2)));
    }

    cServer::cServer(
        boost::asio::io_context& ioc,
        const std::string& port )
        : myIOC( ioc )
        , myPort( static_cast<unsigned short>(std::atoi(port.c_str())) )
        , mySession( nullptr )
    {
        std::cout << "Server starting...";
        myThread = std::thread(
                       &cServer::Run,
                       this );
        std::cout << "Server started\n";
    }
    void cServer::Run()
    {
        // Create and launch a listening port
        myListener = std::make_shared<cListener>(
                         myIOC,
                         tcp::endpoint
        {
            boost::asio::ip::make_address("0.0.0.0"),
            myPort
        } );
        myListener->Set( this );
        myListener->run();

        // start event manager
        myIOC.run();
    }

    void cServer::Join()
    {
        myThread.join();
    }

    void cServer::Set( cSession * session )
    {
        std::cout << "New connection from client\n";

        mySession = session;

        mySession->do_read();
    }

    void cServer::SessionClosed()
    {
        std::cout << "Client connection lost\n";

        mySession = nullptr;

        // listen for a reconnection
        myListener->do_accept();
    }

    void cServer::ClientMsg( const std::string& msg )
    {
    }

    void cServer::SendToClient(
        const std::string& msg,
        bool store )
    {
        //std::cout << "SendToClient: " << msg << "\n";
        if( ! mySession )
        {

        }
        else
        {
            mySession->Write( msg );
        }
    }

    cMessenger::cMessenger(
        boost::asio::io_context& ioc,
        cServer& server,
        int id  )
        : myTimer( ioc )
        , myIOC( ioc )
        , myServer( server )
        , myID( id )
    {
        //std::cout << "Messenger starting ...";
        myThread = std::thread(
                       &cMessenger::Run,
                       this );
    }
    void cMessenger::Run()
    {
        std::cout << "Messenger is running in its own thread\n";
        Schedule();
        myIOC.run();
    }
    void cMessenger::Schedule()
    {
        myTimer.expires_after( std::chrono::seconds(3) );
        myTimer.async_wait(std::bind(&cMessenger::onTimer, this ));
    }

    void cMessenger::onTimer()
    {
        //std::cout << " cMessenger::onTimer\n";

        myServer.SendToClient(
            "Hello World from thread " + std::to_string( myID ),
            false );

        Schedule();
    }


    int main(int argc, char* argv[])
    {
        boost::asio::io_context ioc( 3 );

        cServer Server(
            ioc,
            "8080"
        );

        cMessenger Messenger1(
            ioc,
            Server,
            1 );

        cMessenger Messenger2(
            ioc,
            Server,
            2 );

        Server.Join();

    }


ravenspoint
    Not related with synchronization issue, but you have a bug in `cSession::Write` method. What is the lifetime of `msg`, when `async_write` returns immediately (so `Write` returns immediately as well) and `buffer` doesn't make a copy of msg? – rafix07 Jul 13 '19 at 13:43
  • @rafix Good point. I am aware of this issue and will fix in production code. ( For now, the test program runs fine despite this and serves to test that the server is thread safe - it isn't using strand, but works fine with mutex. ) – ravenspoint Jul 13 '19 at 13:53
  • Nothing can be tested when the program has UB. The last thing you can draw conclusions about is "thread safety" (that's notoriously hard to test empirically anyways). – sehe Jul 14 '19 at 12:05

2 Answers

4

onTimer runs on a separate thread, and invokes SendToClient (without synchronization):

void cMessenger::onTimer() {
    // std::cout << " cMessenger::onTimer\n";

    myServer.SendToClient("Hello World from thread " + std::to_string(myID), false);

SendToClient just invokes Write, still no synchronization:

void cServer::SendToClient(const std::string &msg, bool store) {
    // std::cout << "SendToClient: " << msg << "\n";
    if (!mySession) {

    } else {
        mySession->Write(msg);
    }
}

And Write literally just calls async_write without synchronization:

void cSession::Write(const std::string &msg) {
    ws_.async_write(boost::asio::buffer(msg, msg.length()),
                    boost::asio::bind_executor(strand_, std::bind(&cSession::on_write, shared_from_this(),
                                                                  std::placeholders::_1, std::placeholders::_2)));
}

Key

The protection of a strand ONLY applies to whatever is executed on it.

You NEVER explicitly execute any operation on a strand, instead only wrap completion handlers on it.

That means that the other (non-callback) operations you initiate unsynchronized can still run simultaneously.

One fix would be to post(strand_, ...) the other operations that you forgot to synchronize. Another would be to carefully execute them only from an already-synchronized context.
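
For example, here is one possible shape for the posting approach. This is a sketch only, not drop-in code: it assumes a hypothetical std::deque<std::string> myQueue member and a do_write helper added to cSession, and its on_write replaces the inline one from the question. The queue keeps each message alive until its write completes and guarantees that at most one async_write is outstanding at a time, which is exactly what the soft_mutex assertion checks for.

// Sketch only: assumes a std::deque<std::string> myQueue member (needs <deque>)
// and boost::asio::post (<boost/asio/post.hpp>).
void cSession::Write( const std::string& msg )
{
    // Hop onto the strand; everything below runs serialized with the
    // completion handlers that are bound to strand_.
    boost::asio::post(
        strand_,
        [self = shared_from_this(), msg]()
        {
            self->myQueue.push_back( msg );

            // If a write is already in flight, on_write will start this one later
            if( self->myQueue.size() > 1 )
                return;

            self->do_write();
        });
}

void cSession::do_write()
{
    // Only ever called from the strand; the front of the queue stays
    // alive until on_write runs, so the buffer remains valid.
    ws_.async_write(
        boost::asio::buffer( myQueue.front() ),
        boost::asio::bind_executor(
            strand_,
            std::bind(
                &cSession::on_write,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2)));
}

void cSession::on_write(
    boost::system::error_code ec,
    std::size_t )
{
    if( ec )
        return fail( ec, "write" );

    // Drop the message that just completed, then start the next one, if any
    myQueue.pop_front();
    if( ! myQueue.empty() )
        do_write();
}

Because both the posted lambda and the completion handlers run on strand_, no mutex is needed, and messages go out one at a time in the order they were queued.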

sehe
  • I do not understand. I thought boost::asio::bind_executor(strand_,... was supposed to enforce synchronization ( " Prevents a new write starting on the socket until previous write completes" ) You are saying it does not? What is then the point of doing the call to bind_executor? See example https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/tutorial/tuttimer5.html – ravenspoint Jul 13 '19 at 03:23
  • @ravenspoint, you need to master the basis of asio, refer to the doc as described [here](https://stackoverflow.com/a/26753331/888576) – Jean Davy Jul 13 '19 at 11:55
  • @ravenspoint Indeed it doesn't. Unless you invoke all these calls FROM the strand, because that's what the documentation is likely talking about. – sehe Jul 13 '19 at 13:11
  • From the other end: that `async_write` call in `Write` needs to happen on the same ([logical¹](https://stackoverflow.com/questions/12794107/why-do-i-need-strand-per-connection-when-using-boostasio/12801042#12801042)) strand for it to provide any kind of protection. Consider `post(strand, [self=shared_from_this(),this] { /* do the stuff that needs synchronization */; });` for inspiration. – sehe Jul 13 '19 at 13:15
  • @sehe You keep recommending post(strand,... while the boost tutorials and examples use bind_executor(strand, ... Anyway, I have decided to abandon strand and go back to using an old fashioned mutex. I lock it just before the async_write and unlock it at the end of the write completion handler. Not complicated at all. The test program has now been running perfectly for 10 mins. – ravenspoint Jul 13 '19 at 13:49
  • FYI ported mutex technique back to production code. Works well. – ravenspoint Jul 13 '19 at 14:11
  • Beware though. It's very easy to do the wrong thing with mutex in async code, as the actual operation will not happen under the mutex. In this case, superficial inspection would tell you that you're safe: the thing you're synchronizing access to is the `socket` object and the mutex protects that adequately. But... – sehe Jul 13 '19 at 14:33
  • ... I think your application might run into interleaved/reordered data (see https://stackoverflow.com/a/39123990/85371). This is why composed operations usually chain into implicit strands or use explicit ones - guaranteeing the order. – sehe Jul 13 '19 at 14:36
  • So, your mutex solution is one step up (because you no longer have straight-up undefined behaviour) but you may still see intermittent data ordering issues. – sehe Jul 13 '19 at 14:36
  • Also, re: "you keep recommending post(strand...) [...] vs. bind_executor" - they're _different tools_. One explicitly posts to the strand's executor, the other _wraps any handler to always dispatch on that strand_. The latter is required if you need control over how callbacks are invoked; `post` is the natural approach when you already have the initiative. Of course, you could be roundabout and do `post(_io_service, bind_executor(_strand, [] {/*the things*/}))`, but why would one. – sehe Jul 13 '19 at 14:42
  • (Whenever I get exasperated I try to sit back, think, ask. It's okay for things to take time: the goal is proficiency through understanding, not quick satisfaction IMO). – sehe Jul 13 '19 at 14:42
  • "application might run into interleaved/reordered data" I do not care about the message order, they are all independent. I do not want messages mixed together! Since only one async_write .... on_write can happen at a time, I do not think interleaving can occur. The mutex version is much simpler! – ravenspoint Jul 13 '19 at 14:57
  • In that case, the mutex is indeed good enough. You could also just post everything to the main service as it is already threadsafe. It seems that you can simplify lots more knowing that the messages are stateless (or rather, the protocol is) – sehe Jul 13 '19 at 15:12
  • " You could also just post everything to the main service..." Yes, I know. That is how I have always done these things before. However, my client insists on using multithreading and I cannot convince him that it is not necessary. People love multithreading and want to use it everywhere as if it was magic ( when it just makes for unnecessary complexity ). Not sure how he will react to dropping strands, but fingers crossed. – ravenspoint Jul 13 '19 at 15:22
0

strand.dispatch() the I/O calls using lambdas, like

// Dispatch the send handler
auto self(this->shared_from_this());
auto send_handler = [this, self]()
{
    // Try to send the main buffer
    TrySend();
};
_strand.dispatch(send_handler);

void TrySend()
{
    // ...
    _socket.async_write_some(
        asio::buffer(
            _send_buffer_flush.data() + _send_buffer_flush_offset,
            _send_buffer_flush.size() - _send_buffer_flush_offset),
        bind_executor(_strand, async_write_handler));
}
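
For a self-contained illustration of that pattern, here is a small sketch. It is not taken from the answer above: a steady_timer stands in for the socket, and boost::asio::io_context::strand is assumed, since that is the strand type whose dispatch() member takes a single handler. The initiating call is dispatched through the strand, and the completion handler is bound to the same strand.

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>

int main()
{
    boost::asio::io_context ioc;
    boost::asio::io_context::strand strand( ioc );

    // Stand-in for the socket: the timer's async_wait plays the role of
    // async_write_some.
    boost::asio::steady_timer timer( ioc, std::chrono::milliseconds( 100 ) );

    // Initiate the operation from the strand...
    // (boost::asio::dispatch(strand, ...) is the non-deprecated spelling)
    strand.dispatch( [&]()
    {
        std::cout << "initiating on the strand\n";

        // ...and make sure its completion handler also runs on the strand.
        timer.async_wait(
            boost::asio::bind_executor(
                strand,
                []( boost::system::error_code )
                {
                    std::cout << "completion handler on the strand\n";
                } ) );
    } );

    ioc.run();
}
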
David Brossard