You can add a deadline timer that cancels the I/O operation. You can observe the cancellation because the completion handler will be called with `error::operation_aborted`.
// Arm the deadline for 1 second from now, then wait for it asynchronously.
// `self` is captured only to be held by the handler (presumably a shared_ptr
// keeping the owning object alive while the wait is pending - confirm at the call site).
deadline_.expires_from_now(1s);
deadline_.async_wait([self, this] (error_code ec) {
// A clear error code means the wait was not aborted, i.e. the deadline was
// reached: cancel the outstanding socket operations so that their completion
// handlers observe the cancellation.
if (!ec) socket_.cancel();
});
I spent about 45 minutes making the rest of your code self-contained:
In this example I'll assume that we:
- want to wait at most 5s for a new header to arrive (that is, after a new session has started, or until the next request arrives on the same session)
- after which the full body must be received within 1s
Note also that we avoid closing the socket explicitly - that's done in the session's destructor. It's better to shut down gracefully.
Live Demo
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iomanip>
#include <iostream>
using namespace std::chrono_literals;
namespace bip = boost::interprocess;
using boost::asio::ip::tcp;
using boost::system::error_code;
using Queue = boost::interprocess::message_queue;
static constexpr auto MAX_MESG_LEN = 100;
static constexpr auto MAX_MESGS = 10;
/// Fixed-capacity, length-prefixed message.
///
/// Layout of `buf`: a header holding one big-endian 32-bit body length,
/// immediately followed by up to MAX_MESG_LEN bytes of body.
struct Message {
    using Len = boost::endian::big_uint32_t;

    struct header_t {
        Len len;
    };
    // The header is accessed by reinterpreting the start of `buf`, so it must
    // be a plain, trivially usable aggregate.
    static_assert(std::is_standard_layout_v<header_t> and
                  std::is_trivial_v<header_t>);

    static constexpr auto header_length = sizeof(header_t);

    // Zero-initialised so that body_length() never reads an indeterminate
    // header before anything has been received or set.
    std::array<char, MAX_MESG_LEN + header_length> buf{};

    /// Start of the whole message (header first).
    char const* data() const { return buf.data(); }
    char*       data()       { return buf.data(); }

    /// Start of the body, directly after the header.
    char const* body() const { return data() + header_length; }
    char*       body()       { return data() + header_length; }

    /// Body length announced by the header, clamped to what `buf` can hold.
    Len body_length() const { return std::min(h().len, max_body_length()); }

    /// Largest body that fits in `buf`.
    Len max_body_length() const { return buf.max_size() - header_length; }

    /// @return true when the length in the header fits into `buf`.
    bool decode_header() { return h().len <= max_body_length(); }

    /// Stores `value` as the body and records its length in the header.
    ///
    /// Oversized input is a programming error (asserted). In builds without
    /// assertions the value is truncated to the capacity, and the header
    /// records the number of bytes actually stored so the message stays
    /// self-consistent.
    ///
    /// @return true when `value` was stored completely (not truncated).
    bool set_body(std::string_view value) {
        assert(value.length() <= max_body_length());

        // Clamp in size_t *before* narrowing to the 32-bit header field, so a
        // huge length can neither wrap around nor exceed the buffer.
        std::size_t const capacity = max_body_length();
        std::size_t const stored   = std::min(value.length(), capacity);

        h().len = stored;
        std::copy_n(value.begin(), stored, body());
        return stored == value.length(); // not truncated
    }

  private:
    header_t&       h()       { return *reinterpret_cast<header_t*>(data()); }
    header_t const& h() const { return *reinterpret_cast<header_t const*>(data()); }
};
struct Session : std::enable_shared_from_this<Session> {
Session(tcp::socket&& s) : socket_(std::move(s)) {}
void start() {
post(strand_,
[ this, self = shared_from_this() ] { do_read_header(); });
}
private:
using Strand = boost::asio::strand<tcp::socket::executor_type>;
using Timer = boost::asio::steady_timer;
tcp::socket socket_{strand_};
Strand strand_{make_strand(socket_.get_executor())};
Message res;
Queue request_queue_{bip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN};
Timer recv_deadline_{strand_};
void do_read_header() {
auto self(shared_from_this());
std::cout << "do_read_header: " << res.header_length << std::endl;
recv_deadline_.expires_from_now(5s);
recv_deadline_.async_wait([ self, this ](error_code ec) {
if (!ec) {
std::cerr << "header timeout" << std::endl;
socket_.cancel();
}
});
boost::asio::async_read(
socket_, boost::asio::buffer(res.data(), res.header_length),
[ this, self ](error_code ec, size_t /*length*/) {
std::cerr << "header: " << ec.message() << std::endl;
recv_deadline_.cancel();
if (!ec && res.decode_header()) {
do_read_body();
} else {
socket_.shutdown(tcp::socket::shutdown_both);
}
});
}
void do_read_body() {
auto self(shared_from_this());
// Message msg;
std::cout << "do_read_body: " << res.body_length() << std::endl;
recv_deadline_.expires_from_now(1s);
recv_deadline_.async_wait([self, this] (error_code ec) {
if (!ec) {
std::cerr << "body timeout" << std::endl;
socket_.cancel();
}
});
boost::asio::async_read(
socket_,
boost::asio::buffer(res.body(), res.body_length()),
boost::asio::transfer_exactly(res.body_length()),
[ this, self ](error_code ec, std::size_t length) {
std::cerr << "body: " << ec.message() << std::endl;
recv_deadline_.cancel();
if (!ec) {
try {
// Not safe to print unless NUL-terminated, see e.g.
// https://stackoverflow.com/questions/66278813/boost-deadline-timer-causes-stack-buffer-overflow/66279497#66279497
if (length)
request_queue_.send(res.body(), res.body_length(), 0);
} catch (const std::exception& ex) {
std::cout << ex.what() << std::endl;
}
do_read_header();
} else {
socket_.shutdown(tcp::socket::shutdown_both);
}
});
}
};
class Server {
public:
Server(boost::asio::io_service& io_service, short port)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(socket_, [ this ](error_code ec) {
std::cerr << "async_accept: " << ec.message() << std::endl;
if (!ec) {
std::cerr << "session: " << socket_.remote_endpoint() << std::endl;
std::make_shared<Session>(std::move(socket_))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
};
int main(int argc, char**) {
Queue queue{bip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN}; // ensure it exists
if (argc == 1) {
boost::asio::io_context ioc;
Server s(ioc, 8989);
ioc.run_for(10s);
} else {
while (true) {
using Buf = std::array<char, MAX_MESG_LEN>;
Buf buf;
unsigned prio;
size_t n;
queue.receive(buf.data(), buf.size(), n, prio);
std::cout << "Received: " << std::quoted(std::string_view(buf.data(), n)) << std::endl;
}
}
}
Testable with
./sotest
In another terminal:
./sotest consumer
And somewhere else, send e.g. some requests that don't time out:
# Send three messages, each over its own connection. Every message is a
# 4-byte big-endian body length followed by that many body bytes:
#   '0000 0000'                                -> length 0, empty body
#   '0000 0001 31'                             -> length 1, body "1"
#   '0000 000c 6865 6c6c 6f20 776f 726c 640a'  -> length 12, body "hello world\n"
for msg in '0000 0000' '0000 0001 31' '0000 000c 6865 6c6c 6f20 776f 726c 640a'
do
# xxd -r -p converts the hex text back into raw bytes; netcat sends them to
# the server on port 8989 with a 1-second timeout (-w 1).
xxd -r -p <<< "$msg" |
netcat localhost 8989 -w 1
done
Or, multiple requests on a single session, after which the session times out (`-w 6` exceeds the 5s header deadline):
# The same three messages as above, concatenated and sent back-to-back over a
# single connection. -w 6 keeps netcat waiting for 6 seconds, longer than the
# server's 5s wait for the next header.
msg='0000 0000 0000 0001 31 0000 000c 6865 6c6c 6f20 776f 726c 640a'; xxd -r -p <<< "$msg"| netcat localhost 8989 -w 6
