Several issues.
Your buffer isn't appropriately sized. As the commenter said, sizeof(element)
has nothing to do with the serialized archive format, which is going to be much longer (compare the 49 bytes sent in the output below with sizeof(element) = 32).
Changing it to just buffer(bufdata)
doesn't work either, because that would be a zero-length buffer (std::string
default-constructs to an empty string). Consider resizing it to an appropriate length or using boost::asio::dynamic_buffer
(if your Boost version is recent enough).
The lifetimes of the string stream and out_archive were too wide, causing the stream to be potentially incomplete. Make sure the stream is flushed and the archive completed before using the result:
// Serialize into a string, scoping the stream and archive so that both are
// finished before their contents are used.
std::string bufdata;
{
std::stringstream os;
{
boost::archive::text_oarchive out_archive{ os };
out_archive << snd_elem;
} // out_archive is destroyed here, so the archive is complete before os is read
bufdata = os.str();
} // os is no longer needed once its contents have been copied into bufdata
The line
recvd_bloomdb.indexins_elem(recv_elem, i);
looks suspicious: it looks like you're modifying recvd_bloomdb
while iterating over it (depending on how indexins_elem
is supposed to work, this might result in an infinite loop or UB).
In fact, the roles of the various bloom_filter
instances aren't clear (the bloom_db
argument isn't even used in the server code). If you wanted to serialize the entire DB, why not make bloom_filter
itself serializable?
There's no message framing, which you need if you want to send multiple archives on the same connection. You could decide on a delimiter. Text archives never contain the NUL character, so it can be used:
// Append the NUL frame delimiter so the receiver can find the end of this archive.
bufdata += '\x00';
sendData(client_socket, boost::asio::buffer(bufdata));
And on the receiving end:
// Returns once a NUL has been read; the count covers bytes up to and including that NUL.
std::size_t bytes_transferred = boost::asio::read_until(socket, data, '\x00', ec);
Fixups
Here's the code with the points above addressed:
Live On Coliru
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/serialization/serialization.hpp>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
namespace {
// Very simple hack to keep lines written to std::cout from interleaving
// between threads: every PRINT takes s_mx for the duration of one line.
std::mutex s_mx;
// `e` may be a whole `<<` chain (e.g. PRINT("a" << 1)), so it is deliberately
// NOT parenthesized. The do/while(0) wrapper makes `PRINT(x);` a single
// statement, so it is safe in an unbraced if/else.
#define PRINT(e) do { std::lock_guard<std::mutex> lk(s_mx); std::cout << e << std::endl; } while (0)
}
namespace ba = boost::asio;
using ba::ip::tcp;
using string_buffer = boost::asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char> >;
void sendData(tcp::socket& socket, boost::asio::const_buffer data);
void getData(tcp::socket& socket, string_buffer data);
/// One bloom-filter cell as exchanged over the wire.
struct elem {
    bool bit = false;
    int count = 0;
    std::uint64_t hashSum[2] = {};
    std::uint64_t idSum = 0;

    /// Boost.Serialization hook. The order of the `ar &` statements is the
    /// wire order and must match on both ends. hashSum is written element by
    /// element; archiving the native array as a whole would also emit its length.
    template <typename Archive> void serialize(Archive& ar, unsigned /*version*/) {
        ar & bit;
        ar & count;
        ar & hashSum[0];
        ar & hashSum[1];
        ar & idSum;
    }

    /// Debug form, e.g. "[false,1,{0x7e,0xeb},0x2a]": count in decimal,
    /// hashes and id in base-prefixed hex. Formatting happens in a scratch
    /// stream, so the caller's stream flags are left untouched.
    friend std::ostream& operator<<(std::ostream& os, elem const& e) {
        std::ostringstream oss;
        oss << std::showbase << std::boolalpha;
        oss << "[" << e.bit << ","
            << std::dec << e.count << ","
            << std::hex << "{" << e.hashSum[0] << "," << e.hashSum[1] << "},"
            << e.idSum << "]";
        return os << oss.str();
    }
};
/// Second name for the same type, used throughout the rest of the code.
using element = elem;
namespace mocks {
/// Stand-in for the real bloom filter: a small fixed vector of elements.
struct bloom_filter {
    std::vector<elem> mock_data{
        { false, 1, { 0x7e, 0xeb }, 42 },
    };
    /// Accepts and ignores any constructor arguments (the mock has no configuration).
    bloom_filter(...) {}
    std::size_t size() const { return mock_data.size(); }
    /// Overwrites slot `i` in place; the size of the DB never changes.
    void indexins_elem(elem const& e, std::size_t i) {
        assert(i < size());
        mock_data.at(i) = e;
        //Danger: this grows the db while it is being iterated
        //mock_data.insert(mock_data.begin() + i, e);
    }
    void show_elemvec() const {
        for (auto const& el : mock_data) {
            PRINT(el);
        }
    }
    void show_elem(std::size_t i) const { PRINT(mock_data.at(i)); }
    /// Returns a copy of slot `i`; const so it also works on read-only DBs.
    elem getelem(std::size_t i) const { return mock_data.at(i); }
};
/// Copies one element onto another.
void set_elemvec(element& lhs, element const& rhs) { lhs = rhs; }
}
using mocks::bloom_filter;
using mocks::set_elemvec;
// Boost.Exception payload type for attaching an error message; not used by the code shown.
using errmsg_info = boost::error_info<struct tag_errmsg, std::string>;
void netcom_server(unsigned short portn, bloom_filter& /*bloom_db*/) {
ba::io_service io_service;
bloom_filter recvd_bloomdb(4, "rcvbloom_db", 4096);
tcp::socket server_socket(io_service);
tcp::acceptor acceptor_server(io_service, {tcp::v4(), portn});
while (1) {
acceptor_server.accept(server_socket);
PRINT("Connection Accepted : " << server_socket.remote_endpoint());
for (auto i = 0u; i < recvd_bloomdb.size(); ++i) {
std::string bufdata;
getData(server_socket, ba::dynamic_buffer(bufdata));
element recv_elem;
try {
while (bufdata.size() && ('\x00' == bufdata.back())) {
bufdata.pop_back();
}
std::istringstream is(bufdata);
PRINT("RECV DEBUG: " << std::quoted(is.str()));
{
boost::archive::text_iarchive in_archive{ is };
in_archive >> recv_elem;
}
recvd_bloomdb.indexins_elem(recv_elem, i);
} catch (std::exception& e) {
PRINT(e.what());
}
}
PRINT("Received " << recvd_bloomdb.size() << " elements at server");
recvd_bloomdb.show_elemvec();
PRINT("Close connection " << server_socket.remote_endpoint());
server_socket.close();
}
}
/// Serializes `e` into a complete Boost text archive (header included) and
/// returns the archive text.
std::string archive_text(elem const& e) {
    std::ostringstream sink;
    {
        boost::archive::text_oarchive writer(sink);
        writer << e;
    } // writer is destroyed here, so the archive is finished before the text is taken
    return sink.str();
}
void netcom_client(unsigned short portn, std::string serverip, bloom_filter& bloom_db) {
ba::io_service io_service;
tcp::socket client_socket(io_service);
client_socket.connect({ba::ip::address::from_string(serverip), portn});
std::this_thread::sleep_for(std::chrono::seconds(1));
for (auto i = 0u; i < bloom_db.size(); ++i) {
bloom_db.show_elem(i);
auto bufdata = archive_text(bloom_db.getelem(i));
PRINT("SEND DEBUG: " << std::quoted(bufdata));
bufdata += '\x00';
sendData(client_socket, boost::asio::buffer(bufdata));
}
PRINT("Exit netcom_client");
}
/// Writes the whole buffer to `socket` with a blocking write and logs the
/// outcome. Errors are reported only through the log line (error_code overload).
void sendData(tcp::socket& socket, boost::asio::const_buffer data) {
    boost::system::error_code err;
    auto const sent = boost::asio::write(socket, data, err);
    // The sockets in this file are never put in non-blocking mode, so this
    // branch is not expected to trigger.
    bool const blocked = sent == 0 && err == boost::asio::error::would_block;
    if (blocked) {
        PRINT(" Could not send any more");
    }
    PRINT(err.message() << "bytes sent " << sent);
}
/// Blocking read into `data` until a NUL delimiter has been seen or an error
/// occurs. The outcome is only logged, never thrown.
void getData(tcp::socket& socket, string_buffer data) {
    boost::system::error_code err;
    auto const received = boost::asio::read_until(socket, data, '\x00', err);
    bool const nothing_yet = received == 0 && err == boost::asio::error::would_block;
    if (nothing_yet) {
        PRINT("No data available");
    }
    PRINT(err.message() << "bytes received " << received);
}
int main() {
    // Server thread: netcom_server loops forever, so the first join below
    // does not return in normal operation.
    std::thread server_thread([] {
        bloom_filter db;
        netcom_server(6767, db);
    });
    // Client thread: four separate connections, one second apart.
    std::thread client_thread([] {
        for (int round = 0; round != 4; ++round) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            bloom_filter db;
            netcom_client(6767, "127.0.0.1", db);
        }
    });
    server_thread.join();
    client_thread.join();
}
Prints something like
Connection Accepted : 127.0.0.1:38126
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38126
Connection Accepted : 127.0.0.1:38128
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38128
Connection Accepted : 127.0.0.1:38130
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38130
Connection Accepted : 127.0.0.1:38132
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38132
BONUS
Serializing the bloom_filter
type could be easy. Also, notice that as long as elem
is a POD type it is bitwise serializable, so maybe take advantage of that:
This version:
- shows how to switch to binary archives with bitwise vector serialization
- shows how to serialize the entire db at once
- shows how to suppress the archive header
- moves the framing into getData and sendData, making the delimiter dependent on the archive type (a lone NUL can legitimately occur in a binary stream, so a longer sentinel is used there)
- greatly simplifies the server/client code in the process
Notes:
- binary archives are NOT portable (you can only expect to share them between identical architectures and library versions)
- the framing currently does NOT keep any surplus data that read_until may have read past the delimiter. That can happen with TCP in general, though not with the code shown. Just something to be aware of.
Live On Coliru
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/serialization/serialization.hpp>
#include <boost/serialization/binary_object.hpp>
#include <boost/serialization/vector.hpp>
#include <iostream>
#include <iomanip>
#include <mutex>
namespace {
// Very simple hack to keep lines written to std::cout from interleaving
// between threads: every PRINT takes s_mx for the duration of one line.
std::mutex s_mx;
// `e` may be a whole `<<` chain (e.g. PRINT("a" << 1)), so it is deliberately
// NOT parenthesized. The do/while(0) wrapper makes `PRINT(x);` a single
// statement, so it is safe in an unbraced if/else.
#define PRINT(e) do { std::lock_guard<std::mutex> lk(s_mx); std::cout << e << std::endl; } while (0)
}
namespace ba = boost::asio;
using ba::ip::tcp;
void sendData(tcp::socket& socket, std::string const& data);
std::string getData(tcp::socket& socket);
struct element {
    bool bit = false;
    int count = 0;
    uint64_t hashSum[2] = {};
    uint64_t idSum = 0;

    /// Renders e.g. "[false,1,{0x7e,0xeb},0x2a]": the bit as a word, the count
    /// in decimal, and the hashes and id in base-prefixed hex. A scratch stream
    /// is used so `os` keeps its own formatting flags.
    friend std::ostream& operator<<(std::ostream& os, element const& e) {
        std::ostringstream text;
        text.setf(std::ios_base::showbase | std::ios_base::boolalpha);
        // A fresh stream starts in decimal, which is what count needs.
        text << '[' << e.bit << ',' << e.count << ',';
        text.setf(std::ios_base::hex, std::ios_base::basefield);
        text << '{' << e.hashSum[0] << ',' << e.hashSum[1] << "}," << e.idSum << ']';
        return os << text.str();
    }
};
// Marks element as bitwise serializable, so archives that support it (the
// binary ones, e.g. inside std::vector<element>) may copy it as raw bytes.
// Only valid while element remains a POD type.
BOOST_IS_BITWISE_SERIALIZABLE(element)
namespace mocks {
/// Stand-in for the real bloom filter: a small fixed vector of elements. The
/// whole object is serializable (see serialize below), so an entire DB can be
/// sent as a single archive.
struct bloom_filter {
    /// Accepts and ignores any constructor arguments (the mock has no configuration).
    bloom_filter(...) {}
    std::size_t size() const { return mock_data.size(); }
    /// Overwrites slot `i`; .at() throws std::out_of_range if i >= size().
    void indexins_elem(element const& e, std::size_t i) {
        mock_data.at(i) = e;
        //// Danger: this grows the db while it is being iterated
        //assert(i <= size());
        //mock_data.insert(mock_data.begin() + i, e);
    }
    void show_elemvec() const {
        for (auto const& el : mock_data) PRINT(el);
    }
    void show_elem(std::size_t i) const { PRINT(mock_data.at(i)); }
    /// Returns a copy of slot `i`; const so it also works on read-only DBs.
    element getelem(std::size_t i) const { return mock_data.at(i); }

private:
    std::vector<element> mock_data{
        { false, 1, { 0x7e, 0xeb }, 42 },
    };
    friend class boost::serialization::access;
    /// Boost.Serialization hook: the DB is archived as its element vector.
    template <typename Archive> void serialize(Archive& ar, unsigned /*version*/) {
        ar & mock_data;
    }
};
}
// Compile-time switch between the two wire formats: with USE_TEXT defined,
// text archives are used; without it, binary archives are used.
#define USE_TEXT
#ifdef USE_TEXT
// Prefix pasted onto "oarchive"/"iarchive" via BOOST_PP_CAT to pick the archive type.
#define ARCHIVE_ text_
// Frame delimiter: one NUL byte. The explicit length keeps the NUL, which the
// const char* constructor alone would treat as the end of the string.
static const std::string DELIMITER("\x00", 1);
// Debug-print wrapper: text archives are printable, so they are just quoted.
struct safe_print {
std::string const& data; // borrowed; the referenced string must outlive the wrapper
friend std::ostream& operator<<(std::ostream& os, safe_print wrap) {
return os << std::quoted(wrap.data);
}
};
// Non-intrusive serialization of element for text archives. `ar & e.hashSum`
// archives the whole native array, which also writes its element count.
namespace boost { namespace serialization {
template <typename Ar>
void serialize(Ar& ar, element& e, unsigned) {
ar & e.bit & e.count & e.hashSum & e.idSum;
}
} }
#else
// No serialize() for element is defined in this branch; binary mode presumably
// relies on BOOST_IS_BITWISE_SERIALIZABLE(element) instead (TODO confirm for
// any use outside std::vector<element>).
#define ARCHIVE_ binary_
// Multi-byte sentinel, because a lone NUL is an ordinary byte in binary output.
// NOTE(review): a binary payload can still contain this sequence by chance;
// a length prefix would make the framing unambiguous.
static const std::string DELIMITER("\x00\xde\xad\xbe\xef", 5);
// Debug-print wrapper: binary archives are dumped as lowercase hex, two digits per byte.
struct safe_print {
std::string const& data; // borrowed; the referenced string must outlive the wrapper
friend std::ostream& operator<<(std::ostream& os, safe_print wrap) {
std::ostringstream oss;
oss << std::hex << std::setfill('0');
for (uint8_t ch : wrap.data) {
oss << std::setw(2) << static_cast<int>(ch);
}
return os << oss.str();
}
};
#endif
template <typename T>
std::string archive(T const& data) {
std::stringstream os;
{
boost::archive::BOOST_PP_CAT(ARCHIVE_, oarchive) out_archive{ os, boost::archive::no_header };
out_archive << data;
}
return os.str();
}
template <typename T>
void restore(std::string const& text, T& into) {
std::istringstream is(text);
boost::archive::BOOST_PP_CAT(ARCHIVE_, iarchive) ia{is, boost::archive::no_header };
ia >> into;
}
using mocks::bloom_filter;
void netcom_server(unsigned short portn) {
ba::io_service io_service;
tcp::acceptor acceptor_server(io_service, {tcp::v4(), portn});
while (1) {
tcp::socket server_socket(io_service);
acceptor_server.accept(server_socket);
PRINT("Connection Accepted : " << server_socket.remote_endpoint());
bloom_filter recvd_bloomdb;
restore(getData(server_socket), recvd_bloomdb);
PRINT("Received " << recvd_bloomdb.size() << " elements at server");
recvd_bloomdb.show_elemvec();
}
}
void netcom_client(unsigned short portn, std::string serverip, bloom_filter& bloom_db) {
ba::io_service io_service;
tcp::socket client_socket(io_service);
client_socket.connect({ba::ip::address::from_string(serverip), portn});
sendData(client_socket, archive(bloom_db));
PRINT("Exit netcom_client");
}
/// Sends `data` followed by DELIMITER as a single gathered, blocking write.
/// Uses the throwing write() overload, so a failed write raises boost::system::system_error.
void sendData(tcp::socket& socket, std::string const& data) {
    PRINT("SEND DEBUG: " << safe_print{data});
    // const_buffer is the buffer type Asio's write() accepts in all recent
    // versions. const_buffers_1 is a deprecated compatibility type that is
    // unavailable when BOOST_ASIO_NO_DEPRECATED is defined.
    std::vector<ba::const_buffer> const frame {
        ba::buffer(data),
        ba::buffer(DELIMITER)
    };
    std::size_t const bytes_transferred = ba::write(socket, frame);
    PRINT("bytes sent " << bytes_transferred);
}
/// Blocks until DELIMITER has been received and returns the payload that
/// precedes it, with the delimiter itself stripped. Uses the throwing
/// read_until() overload, so read errors (including EOF before a delimiter)
/// raise boost::system::system_error.
std::string getData(tcp::socket& socket) {
    std::string buffer;
    std::size_t const bytes_transferred =
        ba::read_until(socket, ba::dynamic_buffer(buffer), DELIMITER);
    PRINT("bytes received " << bytes_transferred);
    // read_until's count includes the delimiter (so it is >= DELIMITER.size()
    // on success). Drop the delimiter, and any bytes already read past it,
    // so callers see only the payload.
    buffer.resize(bytes_transferred - DELIMITER.size());
    PRINT("RECV DEBUG: " << safe_print{buffer});
    return buffer;
}
int main() {
    PRINT("For info: sizeof(element) = " << sizeof(element));
    // Server thread: netcom_server loops forever, so s.join() does not return
    // in normal operation. The demo presumably relies on being stopped externally.
    std::thread s([] {
        netcom_server(6767);
    });
    // Client thread: four independent connections, one second apart, each
    // sending a freshly constructed mock DB.
    std::thread c([] {
        for (int i = 0; i < 4; ++i) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            bloom_filter db;
            netcom_client(6767, "127.0.0.1", db);
        }
    });
    s.join();
    c.join();
}
Prints
For info: sizeof(element) = 32
Connection Accepted : 127.0.0.1:38134
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Connection Accepted : 127.0.0.1:38136
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
Connection Accepted : 127.0.0.1:38138
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
Connection Accepted : 127.0.0.1:38140
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]