
I am trying to implement double buffering for my network server when it sends to clients. The idea came from

boost::asio::async_write - ensure only one outstanding call

Unfortunately, as great as it looks, I am getting an error when I try to run it. AFAIK, I am not using any iterators myself, so I assume the failure happens inside async_write when the operation attempts to complete, but I don't know what is actually causing it.

The error occurs after the first async_write is posted.

The error is "vector iterator not dereferencable"

and comes from this source in vector's implementation

reference operator*() const
    {   // return designated object
 #if _ITERATOR_DEBUG_LEVEL == 2
    const auto _Mycont = static_cast<const _Myvec *>(this->_Getcont());
    if (_Mycont == 0
        || _Ptr == _Tptr()
        || _Ptr < _Mycont->_Myfirst
        || _Mycont->_Mylast <= _Ptr)
        {   // report error
        _DEBUG_ERROR("vector iterator not dereferencable");
        _SCL_SECURE_OUT_OF_RANGE;
        }

The call stack is:

msvcp140d.dll!00007ff9f9f40806()    Unknown
ConsoleApplication2.exe!std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > >::operator*() Line 74  C++
ConsoleApplication2.exe!boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > >::operator()() Line 532  C++
ConsoleApplication2.exe!std::_Invoker_functor::_Call<boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & __ptr64>(boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & _Obj) Line 1377   C++
ConsoleApplication2.exe!std::invoke<boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & __ptr64>(boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & _Obj) Line 1445    C++
ConsoleApplication2.exe!std::_Invoke_ret<void,boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & __ptr64>(std::_Forced<void,1> __formal, boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > > & <_Vals_0>) Line 1462  C++
ConsoleApplication2.exe!std::_Func_impl<boost::asio::detail::buffer_debug_check<std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<char> > > >,std::allocator<int>,void>::_Do_call() Line 214  C++
ConsoleApplication2.exe!std::_Func_class<void>::operator()() Line 280   C++
ConsoleApplication2.exe!boost::asio::detail::buffer_cast_helper(const boost::asio::const_buffer & b) Line 276   C++
ConsoleApplication2.exe!boost::asio::buffer_cast<void const * __ptr64>(const boost::asio::const_buffer & b) Line 435    C++
ConsoleApplication2.exe!boost::asio::buffer(const boost::asio::const_buffer & b, unsigned __int64 max_size_in_bytes) Line 751   C++
ConsoleApplication2.exe!boost::asio::detail::consuming_buffers_iterator<boost::asio::const_buffer,std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > >::consuming_buffers_iterator<boost::asio::const_buffer,std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > >(bool at_end, const boost::asio::const_buffer & first, std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > begin_remainder, std::_Vector_const_iterator<std::_Vector_val<std::_Simple_types<boost::asio::const_buffer> > > end_remainder, unsigned __int64 max_size) Line 62    C++
ConsoleApplication2.exe!boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > >::begin() Line 210  C++
ConsoleApplication2.exe!boost::asio::detail::buffer_sequence_adapter<boost::asio::const_buffer,boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > > >::validate(const boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > > & buffer_sequence) Line 145  C++
ConsoleApplication2.exe!boost::asio::detail::win_iocp_socket_send_op<boost::asio::detail::consuming_buffers<boost::asio::const_buffer,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> > >,boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::vector<boost::asio::const_buffer,std::allocator<boost::asio::const_buffer> >,boost::asio::detail::transfer_all_t,void <lambda>(const boost::system::error_code &, unsigned __int64) > >::do_complete(boost::asio::detail::win_iocp_io_service * owner, boost::asio::detail::win_iocp_operation * base, const boost::system::error_code & result_ec, unsigned __int64 bytes_transferred) Line 74   C++
ConsoleApplication2.exe!boost::asio::detail::win_iocp_operation::complete(boost::asio::detail::win_iocp_io_service & owner, const boost::system::error_code & ec, unsigned __int64 bytes_transferred) Line 47   C++
ConsoleApplication2.exe!boost::asio::detail::win_iocp_io_service::do_one(bool block, boost::system::error_code & ec) Line 406   C++
ConsoleApplication2.exe!boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec) Line 164  C++
ConsoleApplication2.exe!boost::asio::io_service::run() Line 59  C++
ConsoleApplication2.exe!ConnectionManager::IoServiceThreadProc() Line 83    C++
[External Code] 

My minimal compilable example, which is a network server that just tries to send a 'beep#' on a timer, is:

Connection.h

#pragma once

#include <boost/asio.hpp>

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>

//--------------------------------------------------------------------
class ConnectionManager;

//--------------------------------------------------------------------
class Connection : public std::enable_shared_from_this<Connection>
{
public:

    typedef std::shared_ptr<Connection> SharedPtr;

    // Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
    static Connection::SharedPtr Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket);

    Connection(const Connection &) = delete;
    Connection(Connection &&) = delete;
    Connection & operator = (const Connection &) = delete;
    Connection & operator = (Connection &&) = delete;
    ~Connection();

    // We have to defer the start until we are fully constructed because we call shared_from_this()
    void Start();
    void Stop();

    void Send(const std::vector<char> & data);

private:

    ConnectionManager *                                     m_owner;
    boost::asio::ip::tcp::socket                            m_socket;
    std::atomic<bool>                                       m_stopped;
    boost::asio::streambuf                                  m_receiveBuffer;
    mutable std::mutex                                      m_sendMutex;
    std::vector<boost::asio::const_buffer>                  m_sendBuffers[2];    // Double buffer
    int                                                     m_activeSendBufferIndex;
    bool                                                    m_sending;

    std::vector<char>                                       m_allReadData;

    Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket);

    void DoReceive();
    void DoSend();
};

//--------------------------------------------------------------------

Connection.cpp

#include "Connection.h"
#include "ConnectionManager.h"
#include "Logger.h"

#include <boost/bind.hpp>

#include <algorithm>
#include <cstdio>

//--------------------------------------------------------------------
Connection::SharedPtr Connection::Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket)
{
    return Connection::SharedPtr(new Connection(connectionManager, std::move(socket)));
}

//--------------------------------------------------------------------
Connection::Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket)
    :
    m_owner                             (connectionManager)
  , m_socket                            (std::move(socket))
  , m_stopped                           (false)
  , m_receiveBuffer                     ()
  , m_sendMutex                         ()
  , m_sendBuffers                       ()
  , m_activeSendBufferIndex             (0)
  , m_sending                           (false)
  , m_allReadData                       ()
{
}

//--------------------------------------------------------------------
Connection::~Connection()
{
    // Boost uses RAII, so we don't have anything to do. Let their destructors take care of business
}

//--------------------------------------------------------------------
void Connection::Start()
{
    DoReceive();
}

//--------------------------------------------------------------------
void Connection::Stop()
{
    // The entire connection class is only kept alive, because it is a shared pointer and always has a ref count
    // as a consequence of the outstanding async receive call that gets posted every time we receive.
    // Once we stop posting another receive in the receive handler and once our owner releases any references to
    // us, we will get destroyed.
    m_stopped = true;
    m_owner->OnConnectionClosed(shared_from_this());
}

//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> & data)
{
    std::lock_guard<std::mutex> lock(m_sendMutex);

    // Append to the inactive buffer
    m_sendBuffers[m_activeSendBufferIndex ^ 1].push_back(boost::asio::buffer(data));

    // Kick off a write if none is outstanding
    DoSend();
}

//--------------------------------------------------------------------
void Connection::DoSend()
{
    // Check if there is an async send in progress
    // An empty active buffer indicates there is no outstanding send
    if (m_sendBuffers[m_activeSendBufferIndex].empty())
    {
        m_activeSendBufferIndex ^= 1;

        boost::asio::async_write(m_socket, m_sendBuffers[m_activeSendBufferIndex],
            [this](const boost::system::error_code & errorCode, size_t bytesTransferred)
            {
                std::lock_guard<std::mutex> lock(m_sendMutex);

                m_sendBuffers[m_activeSendBufferIndex].clear();

                if (errorCode)
                {
                    printf("An error occurred while attempting to send data to a connection. Error Code: %d", errorCode.value());

                    // An error occurred
                    // We do not stop or close on sends, but instead let the receive error out and then close
                    return;
                }

                // Check if there is more to send that has been queued up on the inactive buffer,
                // while we were sending what was on the active buffer
                if (!m_sendBuffers[m_activeSendBufferIndex ^ 1].empty())
                {
                    DoSend();
                }
            });
    }
}

//--------------------------------------------------------------------
void Connection::DoReceive()
{
    auto self(shared_from_this());

    boost::asio::async_read_until(m_socket, m_receiveBuffer, '#',
        [self](const boost::system::error_code & errorCode, size_t bytesRead)
        {
            if (errorCode)
            {
                printf("An error occurred while attempting to receive data from connection. Error Code: %d", errorCode.value());

                // Notify our masters that we are ready to be destroyed
                self->m_owner->OnConnectionClosed(self);

                // An error occurred
                return;
            }

            // Grab the read data
            std::istream stream(&self->m_receiveBuffer);
            std::string data;
            std::getline(stream, data, '#');
            data += "#";

            printf("Received data from client: %s", data.c_str());

            // Issue the next receive
            if (!self->m_stopped)
            {
                self->DoReceive();
            }
        });
}

//--------------------------------------------------------------------

ConnectionManager.h

#pragma once

#include "Connection.h"

// Boost Includes
#include <boost/asio.hpp>

// Standard Includes
#include <thread>
#include <vector>

//--------------------------------------------------------------------
class ConnectionManager
{
public:

    ConnectionManager(unsigned port, size_t numThreads);
    ConnectionManager(const ConnectionManager &) = delete;
    ConnectionManager(ConnectionManager &&) = delete;
    ConnectionManager & operator = (const ConnectionManager &) = delete;
    ConnectionManager & operator = (ConnectionManager &&) = delete;
    ~ConnectionManager();

    void Start();
    void Stop();

    void OnConnectionClosed(Connection::SharedPtr connection);

protected:

    boost::asio::io_service            m_io_service;
    boost::asio::ip::tcp::acceptor     m_acceptor;
    boost::asio::ip::tcp::socket       m_listenSocket;
    std::vector<std::thread>           m_threads;

    mutable std::mutex                 m_connectionsMutex;
    std::vector<Connection::SharedPtr> m_connections;

    boost::asio::deadline_timer        m_timer;

    void IoServiceThreadProc();

    void DoAccept();
    void DoTimer();
};

//--------------------------------------------------------------------

ConnectionManager.cpp

#include "ConnectionManager.h"

#include "Logger.h"

#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

#include <algorithm>
#include <cstdio>
#include <system_error>

//------------------------------------------------------------------------------
ConnectionManager::ConnectionManager(unsigned port, size_t numThreads)
    :
    m_io_service  ()
  , m_acceptor    (m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
  , m_listenSocket(m_io_service)
  , m_threads     (numThreads)
  , m_timer       (m_io_service)
{
}

//------------------------------------------------------------------------------
ConnectionManager::~ConnectionManager()
{
    Stop();
}

//------------------------------------------------------------------------------
void ConnectionManager::Start()
{
    if (m_io_service.stopped())
    {
        m_io_service.reset();
    }

    DoAccept();

    for (auto & thread : m_threads)
    {
        if (!thread.joinable())
        {
            // Move-assign: swap() cannot bind a temporary in standard C++
            thread = std::thread(&ConnectionManager::IoServiceThreadProc, this);
        }
    }

    DoTimer();
}

//------------------------------------------------------------------------------
void ConnectionManager::Stop()
{
    {
        std::lock_guard<std::mutex> lock(m_connectionsMutex);
        m_connections.clear();
    }

    // TODO - Will the stopping of the io_service be enough to kill all the connections and ultimately have them get destroyed?
    //        Because remember they have outstanding ref count to their shared_ptr in the async handlers
    m_io_service.stop();

    for (auto & thread : m_threads)
    {
        if (thread.joinable())
        {
            thread.join();
        }
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::IoServiceThreadProc()
{
    try
    {
        // Log that we are starting the io_service thread
        {
            printf("io_service socket thread starting.");
        }

        // Run the asynchronous callbacks from the socket on this thread
        // Until the io_service is stopped from another thread
        m_io_service.run();
    }
    catch (std::system_error & e)
    {
       printf("System error caught in io_service socket thread. Error Code: %d", e.code().value());
    }
    catch (std::exception & e)
    {
        printf("Standard exception caught in io_service socket thread. Exception: %s", e.what());
    }
    catch (...)
    {
        printf("Unhandled exception caught in io_service socket thread.");
    }

    {
        printf("io_service socket thread exiting.");
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::DoAccept()
{
    m_acceptor.async_accept(m_listenSocket,
        [this](const boost::system::error_code errorCode)
        {
            if (errorCode)
            {
                printf("An error occurred while attempting to accept connections. Error Code: %d", errorCode.value());
                return;
            }

            // Create the connection from the connected socket
            std::lock_guard<std::mutex> lock(m_connectionsMutex);
            Connection::SharedPtr connection = Connection::Create(this, m_listenSocket);
            m_connections.push_back(connection);
            connection->Start();

            DoAccept();
        });
}

//------------------------------------------------------------------------------
void ConnectionManager::OnConnectionClosed(Connection::SharedPtr connection)
{
    std::lock_guard<std::mutex> lock(m_connectionsMutex);

    auto itConnection = std::find(m_connections.begin(), m_connections.end(), connection);
    if (itConnection != m_connections.end())
    {
        m_connections.erase(itConnection);
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::DoTimer()
{
    if (!m_io_service.stopped())
    {
        // Send messages every 30 seconds
        m_timer.expires_from_now(boost::posix_time::seconds(30));
        m_timer.async_wait(
            [this](const boost::system::error_code & errorCode)
            {
                std::lock_guard<std::mutex> lock(m_connectionsMutex);
                for (auto connection : m_connections)
                {
                    connection->Send(std::vector<char>{'b', 'e', 'e', 'p', '#'});
                }

                DoTimer();
            });
    }
}

main.cpp

#include "ConnectionManager.h"

#include <chrono>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>

int main()
{
    ConnectionManager connectionManager(4000, 2);
    connectionManager.Start();

    std::this_thread::sleep_for(std::chrono::minutes(5));

    connectionManager.Stop();

    return 0;
}

I've got two vectors of buffers: one is active and one is inactive. The inactive one is appended to while there is an outstanding async write posted. The handler for the async write then clears the active buffer, which should have been sent. It all looks OK to me. I spent all day yesterday looking at it.

Does anyone else have any idea what I did wrong? I really am quite clueless about how to use these buffers properly.

Christopher Pisz

1 Answer


The problem may be in this line:

connection->Send(std::vector<char>{'b', 'e', 'e', 'p', '#'});

This creates a temporary vector and passes it to Send by const reference. Inside Send you call boost::asio::buffer(data), but the buffer object it returns holds only a pointer to the data and the size of the data; it does not copy the bytes. The temporary vector is destroyed as soon as the call to Send returns, so the buffers you handed to async_write in DoSend are dangling by the time the asynchronous operation touches them.
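To make the "pointer and size only" point concrete, here is a minimal sketch that does not use Boost at all. `ByteView`, `MakeView`, `PointsInto` and `DemonstrateAliasing` are hypothetical names invented for this illustration; `ByteView` only imitates the non-owning behaviour of `boost::asio::const_buffer` and is not its real definition.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for boost::asio::const_buffer: it records where the
// bytes live and how many there are, but it owns nothing.
struct ByteView {
    const char* data;
    std::size_t size;
};

// Conceptually what boost::asio::buffer(v) does: no bytes are copied.
inline ByteView MakeView(const std::vector<char>& v) {
    return ByteView{v.data(), v.size()};
}

// True when the view refers to exactly the owner's current storage.
inline bool PointsInto(const ByteView& view, const std::vector<char>& owner) {
    return view.data == owner.data() && view.size == owner.size();
}

// A write made through the owner is visible through the view, which is only
// possible because the view aliases the owner instead of copying it.
inline void DemonstrateAliasing() {
    std::vector<char> owner{'b', 'e', 'e', 'p', '#'};
    const ByteView view = MakeView(owner);
    owner[0] = 'j';
    assert(PointsInto(view, owner));
    assert(view.data[0] == 'j');
}
```

Because the view merely aliases the vector, it is only usable while the vector is alive. With a temporary passed to Send, that lifetime ends when Send returns, which is well before the write completes.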

rafix07
  • Well this would imply that the vector containing "beep" _is_ the buffer and that I would have to ensure that it was untouched until the handler is called. What then is the point of the std::vector buffers if they do not contain a copy of the data? I thought that was the entire point of implementing the double buffer with them. What then would my code look like? Now I am even more confused, although excited to solve this problem. – Christopher Pisz Dec 22 '17 at 16:51
  • I only pointed out where the problem may be; I don't have a complete solution. You could make a copy of the vector and send that, so you are sure the data still exists, or send the data synchronously. – rafix07 Dec 22 '17 at 16:58
  • Boost Asio's buffer concept _never_ copies data. It's in the very [definition](http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/overview/core/buffers.html). This is because C++ tries to be pay-for-what-you-need (if you need the copy, you can manage it yourself). – sehe Dec 22 '17 at 21:19
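
As for what the code could look like: one possible shape, sketched here without Boost so that it compiles on its own, is a double buffer that owns the bytes instead of storing views of them. `OwningSendQueue`, `Queue`, `BeginWrite`, `EndWrite` and `ExampleUsage` are hypothetical names, not Asio APIs. The sketch is not thread-safe by itself and assumes the caller holds a lock such as the question's m_sendMutex around every call.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical helper, not part of Boost.Asio: a double buffer that owns the
// bytes it will send, so the caller's vector may die right after Queue().
class OwningSendQueue {
public:
    // Takes the payload by value and moves it into the inactive slot.
    void Queue(std::vector<char> data) {
        m_slots[m_active ^ 1].push_back(std::move(data));
    }

    // If no write is in flight and data is pending, flips the slots and
    // returns the batch to hand to the write call; otherwise returns nullptr.
    const std::vector<std::vector<char>>* BeginWrite() {
        if (m_writing || m_slots[m_active ^ 1].empty()) {
            return nullptr;
        }
        m_active ^= 1;
        m_writing = true;
        return &m_slots[m_active];
    }

    // Call from the completion handler: only now is it safe to free the bytes.
    void EndWrite() {
        m_slots[m_active].clear();
        m_writing = false;
    }

private:
    std::vector<std::vector<char>> m_slots[2];
    int m_active = 0;
    bool m_writing = false;
};

// Usage sketch: the temporary is gone before the "write" finishes, yet the
// queued bytes are still intact because the queue owns its own copy.
inline void ExampleUsage() {
    OwningSendQueue queue;
    queue.Queue(std::vector<char>{'b', 'e', 'e', 'p', '#'});
    const auto* batch = queue.BeginWrite();
    assert(batch != nullptr && (*batch)[0][0] == 'b');
    queue.EndWrite();
}
```

In the Connection class, the batch returned by `BeginWrite` would be turned into a `std::vector<boost::asio::const_buffer>` by calling `boost::asio::buffer` on each element before passing it to `async_write`, and `EndWrite` would be called in the completion handler. The element vectors in the active slot are not touched while a write is in flight, so those views stay valid until the handler runs.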