1

I am trying to send a struct over TCP socket. The code below is crashing in netcom_server() function while constructing boost::archive::text_iarchive object with error below. Please see, if you could find any other flaws as well. I am running two different instances of this code - one that runs the netcom_server() function and the other that runs netcom_client().

Connection Accepted :
Success, bytes receieved 32
terminate called after throwing an instance of 'boost::archive::archive_exception'
what(): input stream error

Aborted (core dumped)

// One record that is sent over the socket. Usable both as `elem` and, via the
// typedef, as `element`.
typedef struct elem{ 
        bool bit;
        int  count;
        uint64_t  hashSum[2];
        uint64_t  idSum;
    
        // Boost.Serialization hook: streams every field through the archive in
        // a fixed order. `version` is not used.
        template <typename Archive>
        void serialize(Archive &ar, const unsigned int version){
                ar & bit;
                ar & count;
                ar & hashSum[0];ar & hashSum[1];
                ar & idSum;
        }
    
}element;
// Tagged error-info type carrying a std::string; not referenced by the code shown here.
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info;
// Server side: accepts one TCP connection on `portn`, then tries to receive one
// serialized element per slot of a local scratch bloom_filter and stores each
// one by index. `bloom_db` and `ec` are not used in this function.
void netcom_server(int portn, bloom_filter& bloom_db){
    io_service io_service;
    element recv_elem;
    std::string bufdata;
    boost::asio::mutable_buffer rcv_buf;
    boost::system::error_code ec;
    bloom_filter recvd_bloomdb(4,"rcvbloom_db",4096);
    unsigned int i;
    ip::tcp::socket server_socket(io_service);
    tcp::acceptor acceptor_server(io_service,tcp::endpoint(tcp::v4(), portn));
    acceptor_server.accept(server_socket);
    cout<<"Connection Accepted : "<<endl;

    for(i=0; i<recvd_bloomdb.size(); ++i){

        // NOTE(review): the receive window is sizeof(element) bytes, i.e. the
        // in-memory size of the struct, not the length of the text archive the
        // client produced. The reported run received 32 bytes and the stream
        // content ended at "22 serialization::archive 17 0 0".
        bufdata.resize(sizeof(element));
        rcv_buf  = boost::asio::buffer(bufdata,sizeof(element));
        getData(server_socket,rcv_buf);
        try{   
            std::istringstream is(bufdata);
            // This constructor is where the reported "input stream error" is thrown.
            boost::archive::text_iarchive in_archive {is};
            in_archive >> recv_elem;
        }
        catch(std::exception &e){
            std::cout << e.what();
        }
        // Runs even when deserialization failed, so recv_elem may hold stale
        // or uninitialized data at this point.
        recvd_bloomdb.indexins_elem(recv_elem,i);
    }

    cout<<"Received "<<i<<" elements at server"<<endl<<std::flush;
    recvd_bloomdb.show_elemvec();
    // Never returns: idles forever after the single connection was handled.
    while(1) sleep(5);
 }

// Client side: connects to `serverip`:`portn` and sends one message per element
// of `bloom_db`. `ec` is not used in this function.
void netcom_client(int portn, string serverip, bloom_filter& bloom_db){
    io_service io_service;
    ip::tcp::socket client_socket(io_service);
    boost::asio::const_buffer snd_buf;
    boost::system::error_code ec;
    element snd_elem;
    string bufdata;
    // NOTE(review): one stream and one archive are shared by all iterations, so
    // os.str() below returns everything written so far, and the archive is
    // still open (not yet destroyed) when the text is taken.
    std::stringstream os;
    boost::archive::text_oarchive out_archive {os};
    unsigned int i;

    client_socket.connect(tcp::endpoint(address::from_string(serverip),portn));

    for(i=0; i< bloom_db.size(); ++i){

        ::set_elemvec(snd_elem,bloom_db.getelem(i));
        out_archive << snd_elem;
        bufdata = os.str();
        // NOTE(review): the send window is capped at sizeof(element) bytes of
        // the archive text, which is unrelated to the text's actual length.
        snd_buf = boost::asio::buffer(bufdata, sizeof(element));
        bloom_db.show_elem(i);
        sendData(client_socket,snd_buf);
    }
    cout<<"Sent "<<i<<" elements from client"<<endl<<std::flush;

}

// Writes the whole of `data` to `socket` and prints the resulting error message
// and byte count to std::cout. Errors are reported only through that output;
// nothing is returned or thrown from here.
void sendData(tcp::socket& socket, boost::asio::const_buffer& data)
{
    boost::system::error_code ec;
    std::size_t bytes_transferred =boost::asio::write(socket,data,ec);
    // Extra hint for the case where nothing was written and the error is would_block.
    if (bytes_transferred == 0 && ec == boost::asio::error::would_block)
    {
      std::cout << " Could not send any more" << std::endl;
    }
    std::cout << ec.message() << ", bytes sent "<<bytes_transferred<<std::endl;


}

// Reads from `socket` into `data` and prints the resulting error message and
// byte count to std::cout. Errors are reported only through that output;
// nothing is returned or thrown from here.
void getData(tcp::socket& socket, boost::asio::mutable_buffer& data)
{
    boost::system::error_code ec;
    std::size_t bytes_transferred = boost::asio::read(socket, data,ec);
    // Extra hint for the case where nothing was read and the error is would_block.
    if (bytes_transferred == 0 && ec == boost::asio::error::would_block)
    {
      std::cout << "No data available" << std::endl;
    }
    std::cout << ec.message() << ", bytes receieved "<<bytes_transferred<<std::endl;
    
}

Adding streambuf data from gdb

(gdb) p is._M_streambuf
$4 = (std::basic_streambuf<char, std::char_traits<char> > *) 0x7ffd1e567050
(gdb) p *is._M_streambuf
$5 = {
  _vptr.basic_streambuf = 0x7f3774a37618 <vtable for std::__cxx11::basic_stringbuf<char, std::char_traits<char>, std::allocator<char> >+16>, 
  _M_in_beg = 0x557e228c4600 "22 serialization::archive 17 0 0", 
  _M_in_cur = 0x557e228c4620 "", 
  _M_in_end = 0x557e228c4620 "", 
  _M_out_beg = 0x0, 
  _M_out_cur = 0x0, 
  _M_out_end = 0x0, 
  _M_buf_locale = {
    static none = 0, 
    static ctype = 1, 
    static numeric = 2, 
    static collate = 4, 
    static time = 8, 
    static monetary = 16, 
    static messages = 32, 
    static all = 63, 
    _M_impl = 0x557e228c6c40, 
    static _S_classic = 0x7f3774a3ec20 <(anonymous namespace)::c_locale_impl>, 
    static _S_global = 0x7f3774a3ec20 <(anonymous namespace)::c_locale_impl>, 
    static _S_categories = 0x7f3774a30760 <__gnu_cxx::category_names>, 
    static _S_once = 2, 
    static _S_twinned_facets = 0x7f3774a31fe0 <std::locale::_S_twinned_facets>
  }
}
Community
  • 1
  • 1
ultimate cause
  • 2,264
  • 4
  • 27
  • 44
  • 1
    So did you try catching the exception and printing its message? – sehe Dec 16 '19 at 17:06
  • With boost::exception I did a bad job, and it was rather crashing in exception handling code itself. Then I switched to std::exception and it did not crash then. Rather it just printed out what was already there - "input stream error" – ultimate cause Dec 17 '19 at 00:18
  • Darn somehow I missed that the exception message _was already there_. Sorry about that – sehe Dec 17 '19 at 11:01
  • @sehe np, I was into thinking if boost::exception could give even more detailed info. This one is troubling for sometime now and I am looking for some expert eyes. With C++ generic programming, it is very difficult to visualize code and find out the problem yourself unless one is already expert in that area. – ultimate cause Dec 17 '19 at 11:12
  • I don't get what the relation in this line is? `boost::asio::buffer(bufdata, sizeof(element));` , like how is `bufdata` related to `sizeof(element)`? They seem completely unrelated, maybe `bufdata.size()` would be a better fit – PeterT Dec 17 '19 at 11:35
  • @PeterT I wanted the length of the buffer to be that size, because I am reading that struct from the socket. Nevertheless, when I made it just *boost::asio::buffer(bufdata)* it still failed the same way. – ultimate cause Dec 17 '19 at 11:38

1 Answers1

2

Several issues.

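  1. your buffer isn't appropriately sized. Like the commenter said, the sizeof(element) has nothing to do with the serialized archive format (which is going to be much longer: in the fixed code below, the text archive for a single element is 49 bytes including the delimiter, versus sizeof(element) == 32).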

  2. Changing it to just buffer(bufdata) doesn't work because it would just be a zero-length buffer (std::string default constructs to an empty string). Consider resizing to appropriate length or using boost::asio::dynamic_buffer (if your Boost version is recent enough)

  3. The lifetimes of the string stream and out_archive were too wide, causing the stream to be potentially incomplete. Make sure the stream is flushed and the archive completed before using the result:

    std::string bufdata;
    {
        std::stringstream os;
        {
            boost::archive::text_oarchive out_archive{ os };
            out_archive << snd_elem;
        }
        bufdata = os.str();
    }
    
  4. The line

    recvd_bloomdb.indexins_elem(recv_elem, i);
    

    looks suspicious: it looks like you're modifying recvd_bloomdb while iterating it (this might result in infinite loop/UB depending on the way indexins_elem is supposed to work)

  5. In fact the various copies of bloom_filter don't seem clear (the bloom_db argument isn't even used in the server code). If you wanted to serialize the entire DB, why not make bloom_filter itself serializable?

  6. There's a lack of message framing in case you want to send multiple archives on the same connection. You could decide on a delimiter. For text archives, the NUL character is invalid, so it could be used:

    bufdata += '\x00';
    sendData(client_socket, boost::asio::buffer(bufdata));
    

    And on the receiving end:

    std::size_t bytes_transferred = boost::asio::read_until(socket, data, '\x00', ec);
    

Fixups

Here's the code with the points above addressed/fixed:

Live On Coliru

#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/serialization/serialization.hpp>
#include <iostream>
#include <mutex>

namespace {
    // Very simple hack to synchronize output to std::cout: every PRINT holds
    // the same mutex while it writes, so lines coming from different threads
    // are not interleaved.
    // The body is wrapped in do { } while (0) so that `PRINT(x);` is exactly one
    // statement and stays correct inside an unbraced if/else.
    #define PRINT(e) do { std::lock_guard<std::mutex> lk(s_mx); std::cout << e << std::endl; } while (0)
    static std::mutex s_mx;
}

// Short aliases used throughout the program.
namespace ba = boost::asio;
using ba::ip::tcp;

// Dynamic buffer adapter over a std::string; this is the type getData takes and
// the one callers obtain from ba::dynamic_buffer(some_string).
using string_buffer = boost::asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char> >;

// Forward declarations; both functions are defined after the client/server code.
void sendData(tcp::socket& socket, boost::asio::const_buffer data);
void getData(tcp::socket& socket, string_buffer data);

/// One record of the filter as it travels over the wire.
/// All fields are value-initialized, so a default-constructed elem is all zero.
struct elem {
    bool bit{false};
    int count{0};
    uint64_t hashSum[2]{};
    uint64_t idSum{0};

    /// Boost.Serialization hook: fields are streamed in declaration order.
    template <typename Archive> void serialize(Archive& ar, unsigned /*version*/) {
        ar& bit;
        ar& count;
        ar& hashSum[0];
        ar& hashSum[1];
        ar& idSum;
    }

    /// Prints as `[bit,count,{hash0,hash1},idSum]`; the hashes and idSum are in
    /// hex with a base prefix, bit as true/false.
    friend std::ostream& operator<<(std::ostream& os, elem const& e) {
        // Format into a private stream so the caller's stream flags stay untouched.
        std::ostringstream text;
        text << std::boolalpha << std::showbase;

        text << "[" << e.bit << ",";
        text << std::dec << e.count << ",";
        text << std::hex << "{" << e.hashSum[0] << "," << e.hashSum[1] << "},";
        text << e.idSum << "]";

        os << text.str();
        return os;
    }
};
using element = elem;

namespace mocks {
    struct bloom_filter {
        std::vector<elem> mock_data{
            { false, 1, { 0x7e, 0xeb }, 42 },
        };

        bloom_filter(...) {}
        size_t size() const { return mock_data.size(); }
        void indexins_elem(elem const& e, size_t i) {
            assert(i < size());
            mock_data.at(i) = e;
            //Danger: this grows the db while it is being iterated
            //mock_data.insert(mock_data.begin() + i, e);
        }
        void show_elemvec() const {
            for (auto& el : mock_data) {
                PRINT(el);
            }
        }
        void show_elem(size_t i) const { PRINT(mock_data.at(i)); }
        elem getelem(size_t i) { return mock_data.at(i); }
    };
    void set_elemvec(element& lhs, element const& rhs) { lhs = rhs; }
}

// Pull the mock types into the global namespace so the networking code can use
// the same unqualified names as the original question.
using mocks::bloom_filter;
using mocks::set_elemvec;

// Tagged error-info type carrying a std::string; not referenced by the code shown here.
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info;

/// Accepts connections on `portn` forever. For each connection it reads one
/// NUL-delimited text archive per slot of a scratch bloom_filter, deserializes
/// it into an element and stores it by index, then prints the result and closes
/// the connection. The bloom_filter argument is unused.
void netcom_server(unsigned short portn, bloom_filter& /*bloom_db*/) {
    ba::io_service io_service;
    bloom_filter recvd_bloomdb(4, "rcvbloom_db", 4096);
    tcp::socket server_socket(io_service);
    tcp::acceptor acceptor_server(io_service, {tcp::v4(), portn});

    while (1) {
        acceptor_server.accept(server_socket);
        // Look the peer up once, right after accept, so the later log line does
        // not have to query a socket whose peer may already have gone away.
        auto const peer = server_socket.remote_endpoint();
        PRINT("Connection Accepted : " << peer);

        // Bytes received on this connection that have not been consumed yet.
        // read_until may pull in more than one frame at a time; keeping the
        // buffer across iterations preserves whatever follows the first
        // delimiter for the next element instead of dropping it.
        std::string pending;

        for (auto i = 0u; i < recvd_bloomdb.size(); ++i) {
            // Returns immediately if `pending` already holds a delimiter.
            getData(server_socket, ba::dynamic_buffer(pending));

            auto const frame_end = pending.find('\x00');
            if (frame_end == std::string::npos) {
                // The read failed (e.g. the peer closed) before a full frame arrived.
                PRINT("Incomplete frame from " << peer);
                break;
            }
            // Split off exactly one frame, without its delimiter.
            std::string const bufdata = pending.substr(0, frame_end);
            pending.erase(0, frame_end + 1);

            element recv_elem;
            try {
                std::istringstream is(bufdata);
                PRINT("RECV DEBUG: " << std::quoted(is.str()));
                {
                    boost::archive::text_iarchive in_archive{ is };
                    in_archive >> recv_elem;
                }
                // Only store the element when it was decoded successfully.
                recvd_bloomdb.indexins_elem(recv_elem, i);
            } catch (std::exception const& e) {
                PRINT(e.what());
            }
        }

        PRINT("Received " << recvd_bloomdb.size() << " elements at server");
        recvd_bloomdb.show_elemvec();
        PRINT("Close connection " << peer);
        server_socket.close();
    }
}

/// Serializes one element into a Boost text archive and returns the text.
std::string archive_text(elem const& e) {
    std::stringstream sink;
    {
        // The archive lives only inside this scope, so it has been destroyed
        // by the time the stream contents are read back.
        boost::archive::text_oarchive writer{ sink };
        writer << e;
    }
    std::string text = sink.str();
    return text;
}

void netcom_client(unsigned short portn, std::string serverip, bloom_filter& bloom_db) {
    ba::io_service io_service;
    tcp::socket client_socket(io_service);
    client_socket.connect({ba::ip::address::from_string(serverip), portn});
    std::this_thread::sleep_for(std::chrono::seconds(1));

    for (auto i = 0u; i < bloom_db.size(); ++i) {
        bloom_db.show_elem(i);

        auto bufdata = archive_text(bloom_db.getelem(i));
        PRINT("SEND DEBUG: " << std::quoted(bufdata));
        bufdata += '\x00';
        sendData(client_socket, boost::asio::buffer(bufdata));
    }
    PRINT("Exit netcom_client");
}

/// Writes all of `data` to `socket` and logs the error message together with
/// the number of bytes written. Failures are only logged, never thrown.
void sendData(tcp::socket& socket, boost::asio::const_buffer data) {
    boost::system::error_code ec;
    auto const sent = boost::asio::write(socket, data, ec);

    bool const nothing_sent = (sent == 0);
    if (nothing_sent && ec == boost::asio::error::would_block) {
        PRINT(" Could not send any more");
    }

    PRINT(ec.message() << "bytes sent " << sent);
}

/// Reads from `socket` into `data` until a NUL byte is present in the buffer,
/// then logs the error message together with the byte count reported by
/// read_until. Failures are only logged, never thrown.
void getData(tcp::socket& socket, string_buffer data) {
    boost::system::error_code ec;
    // Delimiter-based read; a fixed-size boost::asio::read was used here before.
    auto const received = boost::asio::read_until(socket, data, '\x00', ec);

    bool const nothing_read = (received == 0);
    if (nothing_read && ec == boost::asio::error::would_block) {
        PRINT("No data available");
    }

    PRINT(ec.message() << "bytes received " << received);
}

/// Demo driver: runs the server in one thread and four consecutive client
/// sessions in another, both against port 6767 on localhost.
int main() {
    auto const run_server = [] {
        bloom_filter db;
        netcom_server(6767, db);
    };

    auto const run_clients = [] {
        int session = 0;
        while (session < 4) {
            // Pause before each session.
            std::this_thread::sleep_for(std::chrono::seconds(1));
            bloom_filter db;
            netcom_client(6767, "127.0.0.1", db);
            ++session;
        }
    };

    std::thread s(run_server);
    std::thread c(run_clients);

    // netcom_server loops forever, so this join does not return.
    s.join();
    c.join();
}

Prints something like

Connection Accepted : 127.0.0.1:38126
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38126
Connection Accepted : 127.0.0.1:38128
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38128
Connection Accepted : 127.0.0.1:38130
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38130
Connection Accepted : 127.0.0.1:38132
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38132

BONUS

Serializing the bloom_filter type could be easy. Also, notice that as long as elem is a POD type it is bitwise serializable, so maybe take advantage of that:

This version:

  • shows how to switch to binary archives with bitwise vector serialization
  • shows how to serialize the entire db at once
  • shows how to suppress the archive header
  • moves the framing into getData and sendData, making the delimiter dependent on the archive type (a lone NUL can legitimately occur inside a binary stream, so binary archives use a longer marker)
  • much simplifies the server/client code in the process

Notes:

  • binary archives are NOT portable (you can only expect to share between same architecture and versions)
  • the framing currently does NOT keep any remaining data that was read with read_until. This is something that could happen in TCP in general, but not given the code shown. Just something to be aware of

Live On Coliru

#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/serialization/serialization.hpp>
#include <boost/serialization/binary_object.hpp>
#include <boost/serialization/vector.hpp>
#include <exception>
#include <iomanip>
#include <iostream>
#include <mutex>

namespace {
    // Very simple hack to synchronize output to std::cout: every PRINT holds
    // the same mutex while it writes, so lines coming from different threads
    // are not interleaved.
    // The body is wrapped in do { } while (0) so that `PRINT(x);` is exactly one
    // statement and stays correct inside an unbraced if/else.
    #define PRINT(e) do { std::lock_guard<std::mutex> lk(s_mx); std::cout << e << std::endl; } while (0)
    static std::mutex s_mx;
}

// Short aliases used throughout the program.
namespace ba = boost::asio;
using ba::ip::tcp;

// Forward declarations; both functions are defined after the client/server code.
// sendData writes `data` followed by the frame delimiter.
void sendData(tcp::socket& socket, std::string const& data);
// getData blocks until a delimiter-terminated frame has arrived and returns it.
std::string getData(tcp::socket& socket);

/// One record of the filter. All fields are value-initialized, so a
/// default-constructed element is all zero.
struct element {
    bool bit{false};
    int count{0};
    uint64_t hashSum[2]{};
    uint64_t idSum{0};

    /// Prints as `[bit,count,{hash0,hash1},idSum]`; the hashes and idSum are in
    /// hex with a base prefix, bit as true/false.
    friend std::ostream& operator<<(std::ostream& os, element const& e) {
        // Format into a private stream so the caller's stream flags stay untouched.
        std::ostringstream text;
        text << std::boolalpha << std::showbase;

        text << "[" << e.bit << ",";
        text << std::dec << e.count << ",";
        text << std::hex << "{" << e.hashSum[0] << "," << e.hashSum[1] << "},";
        text << e.idSum << "]";

        os << text.str();
        return os;
    }
};

BOOST_IS_BITWISE_SERIALIZABLE(element)

namespace mocks {
    struct bloom_filter {
        bloom_filter(...) {}
        size_t size() const { return mock_data.size(); }
        void indexins_elem(element const& e, size_t i) {
            mock_data.at(i) = e;
            //// Danger: this grows the db while it is being iterated
            //assert(i <= size());
            //mock_data.insert(mock_data.begin() + i, e);
        }
        void show_elemvec() const {
            for (auto& el : mock_data) PRINT(el);
        }
        void show_elem(size_t i) const { PRINT(mock_data.at(i)); }
        element getelem(size_t i) { return mock_data.at(i); }

      private:
        std::vector<element> mock_data{
            { false, 1, { 0x7e, 0xeb }, 42 },
        };

        friend class boost::serialization::access;
        template <typename Archive> void serialize(Archive& ar, unsigned) {
            ar& mock_data;
        }
    };
}

// Archive flavour selection. With USE_TEXT defined the program uses Boost text
// archives; otherwise it uses binary archives. Each branch provides:
//   ARCHIVE_   - name prefix pasted onto `oarchive` / `iarchive`
//   DELIMITER  - byte sequence that terminates one frame on the wire
//   safe_print - wrapper that logs a frame in a terminal-friendly form
#define USE_TEXT
#ifdef USE_TEXT
    #define ARCHIVE_ text_
    // Frame terminator in text mode: a single NUL byte.
    static const std::string DELIMITER("\x00", 1);

    /// Logs the frame as a quoted string.
    struct safe_print {
        std::string const& data;
        friend std::ostream& operator<<(std::ostream& os, safe_print w) {
            os << std::quoted(w.data);
            return os;
        }
    };

    namespace boost { namespace serialization {
        /// Non-intrusive serialization of element, field by field in
        /// declaration order.
        template <typename Ar>
        void serialize(Ar& ar, element& e, unsigned) {
            ar & e.bit;
            ar & e.count;
            ar & e.hashSum;
            ar & e.idSum;
        }
    } }
#else
    #define ARCHIVE_ binary_
    // Frame terminator in binary mode: a five-byte marker.
    static const std::string DELIMITER("\x00\xde\xad\xbe\xef", 5);

    /// Logs the frame as a hex dump, two digits per byte.
    struct safe_print {
        std::string const& data;
        friend std::ostream& operator<<(std::ostream& os, safe_print w) {
            std::ostringstream hex;
            hex << std::hex << std::setfill('0');
            for (char c : w.data) {
                // Go through unsigned char so bytes >= 0x80 do not sign-extend.
                hex << std::setw(2) << static_cast<int>(static_cast<unsigned char>(c));
            }
            os << hex.str();
            return os;
        }
    };
#endif

template <typename T>
std::string archive(T const& data) {
    std::stringstream os;
    {
        boost::archive::BOOST_PP_CAT(ARCHIVE_, oarchive) out_archive{ os, boost::archive::no_header };
        out_archive << data;
    }
    return os.str();
}

template <typename T>
void restore(std::string const& text, T& into) {
    std::istringstream is(text);
    boost::archive::BOOST_PP_CAT(ARCHIVE_, iarchive) ia{is, boost::archive::no_header };
    ia >> into;
}

using mocks::bloom_filter;

void netcom_server(unsigned short portn) {
    ba::io_service io_service;
    tcp::acceptor acceptor_server(io_service, {tcp::v4(), portn});

    while (1) {
        tcp::socket server_socket(io_service);
        acceptor_server.accept(server_socket);
        PRINT("Connection Accepted : " << server_socket.remote_endpoint());

        bloom_filter recvd_bloomdb;
        restore(getData(server_socket), recvd_bloomdb);

        PRINT("Received " << recvd_bloomdb.size() << " elements at server");
        recvd_bloomdb.show_elemvec();
    }
}

void netcom_client(unsigned short portn, std::string serverip, bloom_filter& bloom_db) {
    ba::io_service io_service;
    tcp::socket client_socket(io_service);
    client_socket.connect({ba::ip::address::from_string(serverip), portn});

    sendData(client_socket, archive(bloom_db));
    PRINT("Exit netcom_client");
}

/// Logs `data`, then writes it followed by DELIMITER to `socket` in a single
/// gathered write and logs the total byte count. Throws on write failure.
void sendData(tcp::socket& socket, std::string const& data) {
    PRINT("SEND DEBUG: " << safe_print{data});

    // Payload and delimiter go out as one buffer sequence, so no combined copy
    // of the two has to be built.
    std::vector<ba::const_buffers_1> frame;
    frame.push_back(ba::buffer(data));
    frame.push_back(ba::buffer(DELIMITER));

    auto const sent = boost::asio::write(socket, frame);
    PRINT("bytes sent " << sent);
}

/// Blocks until one DELIMITER-terminated frame has arrived on `socket` and
/// returns the frame's payload without the delimiter. Throws on read failure.
/// Bytes that arrived after the delimiter are discarded.
std::string getData(tcp::socket& socket) {
    std::string buffer;
    std::size_t bytes_transferred = boost::asio::read_until(socket, ba::dynamic_buffer(buffer), DELIMITER);
    PRINT("bytes received " << bytes_transferred);
    // bytes_transferred counts everything up to and including the delimiter.
    // The delimiter is framing, not payload, so it is cut off here instead of
    // being handed to the deserializer. The throwing overload only returns once
    // the delimiter was found, so the subtraction cannot underflow.
    buffer.resize(bytes_transferred - DELIMITER.size());
    PRINT("RECV DEBUG: " << safe_print{buffer});
    return buffer;
}

/// Demo driver: runs the server in one thread and four consecutive client
/// sessions in another, both against port 6767 on localhost.
int main() {
    PRINT("For info: sizeof(element) = " << sizeof(element));
    std::thread s([] {
        // The server builds its own bloom_filter per connection, so no local
        // database is needed here.
        netcom_server(6767);
    });
    std::thread c([] {
        for (int i = 0; i < 4; ++i) {
            // Pause before each session.
            std::this_thread::sleep_for(std::chrono::seconds(1));
            bloom_filter db;
            netcom_client(6767, "127.0.0.1", db);
        }
    });

    // netcom_server loops forever, so this join does not return.
    s.join();
    c.join();
}

Prints

For info: sizeof(element) = 32
Connection Accepted : 127.0.0.1:38134
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Connection Accepted : 127.0.0.1:38136
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
Connection Accepted : 127.0.0.1:38138
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
Connection Accepted : 127.0.0.1:38140
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
sehe
  • 374,641
  • 47
  • 450
  • 633
  • Thanks for this infinite effort. I am trying to implement and prove to myself a mechanism suggested in a paper about invertible bloom filters, where client sends contents of its bloom filter to server. The job of server is to identify diffs between client's data and its own. Therefore, server needs to create a temporary bloom_filter object, which is what you see as an extra entity. I got stuck in the first phase itself where client is sending its filter. I will study this solution and let you know my response. You are least bothered about it but my +infinite for this long effort. Thanks. – ultimate cause Dec 17 '19 at 17:22