I'd strongly suggest using async interfaces. Since the majority of time is obviously spent waiting for the IO, you likely can get all the throughput from just a single thread.
Here's an example that does answer your question (how to keep a client open for more than one request) while making the processing asynchronous. Right now, the downside is that all requests on a single client need to be sequenced (that's what I used the _tasks queue for). However, this should probably serve as inspiration.
Note that the initiation functions work with all completion handler result types: net::use_future, net::spawn (coroutines), etc.
Live On Coliru
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <chrono>
#include <deque>
#include <iomanip>
#include <iostream>
namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace http = beast::http;
using clk = std::chrono::steady_clock;
using net::ip::tcp;
using beast::error_code;
using namespace std::chrono_literals;
// One latency / data-rate sample. The collected samples are used to compute
// the latency and data rate, which in turn will be used to compute the number
// of I/O threads for the next run.
struct TimeBytes {
    long double ms;   // elapsed wall-clock time of the request, in milliseconds
    size_t      bytes; // bytes written plus bytes read for that request
};

// Serialises appends to `timeBytes` made from completion handlers.
static std::mutex timeBytesMutex;

// All samples recorded so far, one entry per finished request.
static std::vector<TimeBytes> timeBytes;
// A URL split into its components, as produced by parseUrl().
struct Url {
    // The (hostname, port) pair identifying one endpoint; usable as an
    // ordered-container key thanks to operator<.
    struct Spec {
        std::string hostname, port;

        // Lexicographic ordering: hostname first, port as the tie-breaker.
        bool operator<(Spec const& rhs) const {
            if (hostname != rhs.hostname) {
                return hostname < rhs.hostname;
            }
            return port < rhs.port;
        }
    };

    std::string protocol, hostname, port, path;

    // Returns the endpoint part (hostname and port) of this URL.
    Spec specification() const {
        return Spec{hostname, port};
    }
};
#include <boost/spirit/home/x3.hpp>
#include <boost/fusion/adapted/std_tuple.hpp>
namespace x3 = boost::spirit::x3;
/// Splits `url` of the form `protocol://host[:port][/path...]` into a Url.
///
/// - `protocol` is everything before the first "://".
/// - `hostname` / `port` come from the part between "://" and the first '/'.
///   When no numeric port is given, the port defaults to "443" for "https",
///   "80" for "http" and "0" for any other protocol.
/// - `path` is the remainder starting at the first '/'; when the URL has no
///   path at all it is set to "/", so that the value is usable as an HTTP
///   request-target.
///
/// NOTE(review): the return value of `parse` is not inspected; the grammars
/// are wrapped in `x3::expect[...]`, so a mismatch presumably surfaces as an
/// `x3::expectation_failure` exception rather than a silent failure — confirm.
/// NOTE(review): the port is parsed with `x3::uint_` but stored in a
/// `uint16_t`; values above 65535 are presumably truncated — confirm.
Url parseUrl(std::string const& url)
{
    Url ret;
    std::string hostport;
    {
        // First pass: protocol, "host[:port]" blob, and path.
        static const auto url_ = *(x3::char_ - "://") >> "://" // protocol
            >> +~x3::char_('/')                                // hostname
            >> *x3::char_;                                     // path
        auto into = std::tie(ret.protocol, hostport, ret.path);
        parse(begin(url), end(url), x3::expect[url_], into);
    }
    {
        // Second pass: split the blob into hostname and optional port. A ':'
        // only terminates the hostname when what follows is a valid port
        // specification up to the end of input.
        static const auto portspec_ = -(':' >> x3::uint_) >> x3::eoi;
        static const auto hostport_ =
            x3::raw[+(+~x3::char_(':') | !portspec_ >> x3::char_)] //
            >> -portspec_;
        boost::optional<uint16_t> port;
        auto into = std::tie(ret.hostname, port);
        parse(begin(hostport), end(hostport), x3::expect[hostport_], into);
        if (port.has_value()) { ret.port = std::to_string(*port); }
        else if (ret.protocol == "https") { ret.port = "443"; }
        else if (ret.protocol == "http") { ret.port = "80"; }
        else { ret.port = "0"; }
    }
    // An empty path must be requested as "/" (origin-form request-target).
    if (ret.path.empty()) { ret.path = "/"; }
    return ret;
}
/// Keep-alive HTTPS client for a single endpoint (`Url::Spec`).
///
/// Requests are appended to the `_tasks` queue and executed strictly one at a
/// time over a single TLS stream. The stream is (re)established lazily: when
/// `_stream` is empty at the moment a request starts, DNS resolution, TCP
/// connect and TLS handshake are performed asynchronously first. Any I/O error
/// drops the stream, so the next request reconnects.
///
/// Instances must be owned by a `std::shared_ptr` (the implementation calls
/// `shared_from_this()`). All queue and stream state is touched only from
/// handlers posted/dispatched to `_executor`.
/// NOTE(review): presumably callers pass a strand (main() does) so those
/// handlers never run concurrently — confirm for multi-threaded pools.
struct Client : std::enable_shared_from_this<Client> {
  public:
    /// @param ex   executor on which all internal handlers run
    /// @param spec hostname and port of the endpoint to talk to
    /// @param ctx  TLS context; stored by reference and must outlive the client
    Client(net::any_io_executor ex, Url::Spec spec, ssl::context& ctx)
        : _executor(ex)
        , _spec(spec)
        , _sslcontext(ctx)
    {
    }

    /// Queues an HTTP request and completes with `(error_code, body)`.
    ///
    /// @param verb  HTTP method
    /// @param path  request-target
    /// @param data  request body (may be empty)
    /// @param token completion token for the signature
    ///              `void(error_code, std::string)`
    /// @return whatever the completion token's async_result yields
    template <typename Token>
    auto async_request(http::verb verb, std::string const& path,
                       std::string const& data, Token&& token)
    {
        using R = typename net::async_result<std::decay_t<Token>,
                                             void(error_code, std::string)>;
        using H = typename R::completion_handler_type;
        H handler(std::forward<Token>(token));
        R result(handler);

        // Wraps the user's handler: when this request finishes, remove it from
        // the queue and kick off the next one, then deliver the result. The
        // captured `self` keeps the Client alive while the request is pending.
        auto chain_tasks = [this, h = std::move(handler),
                            self = shared_from_this()](auto&&... args) mutable {
            if (!self->_tasks.empty()) {
                dispatch(self->_executor, [this, self] {
                    if (not _tasks.empty()) _tasks.pop_front();
                    if (not _tasks.empty()) _tasks.front()->initiate();
                });
            }
            std::move(h)(std::forward<decltype(args)>(args)...);
        };

        // Move (rather than copy) the wrapper so move-only handlers work too.
        auto task = std::make_shared<RequestOp<decltype(chain_tasks)>>(
            this, verb, path, data, std::move(chain_tasks));
        enqueue(std::move(task));
        return result.get();
    }

    /// Convenience wrapper: `async_request` with `http::verb::post`.
    template <typename Token>
    auto async_post(std::string const& path, std::string const& data,
                    Token&& token)
    {
        return async_request(http::verb::post, path, data,
                             std::forward<Token>(token));
    }

    /// Convenience wrapper: `async_request` with `http::verb::get` and an
    /// empty body.
    template <typename Token>
    auto async_get(std::string const& path, Token&& token)
    {
        return async_request(http::verb::get, path, "",
                             std::forward<Token>(token));
    }

  private:
    /// Creates a fresh stream and asynchronously resolves, connects and
    /// performs the TLS handshake. Completes with `void(error_code)`.
    /// Precondition: no stream currently exists.
    template <typename Token> auto async_reconnect(Token&& token)
    {
        using R = typename net::async_result<std::decay_t<Token>, void(error_code)>;
        using H = typename R::completion_handler_type;
        H handler(std::forward<Token>(token));
        R result(handler);
        assert(!_stream.has_value()); // probably a program flow bug
        _stream.emplace(_executor, _sslcontext);
        std::make_shared<ReconnectOp<H>>(this, std::move(handler))->start();
        return result.get();
    }

    /// Composed operation: resolve -> connect -> TLS handshake.
    /// Keeps itself alive through the `shared_from_this()` bound into each
    /// intermediate handler.
    template <typename Handler>
    struct ReconnectOp : std::enable_shared_from_this<ReconnectOp<Handler>> {
        ReconnectOp(Client* client, Handler h)
            : _client{client}
            , _handler(std::move(h))
            , _resolver(client->_stream->get_executor())
        {
        }

        Client*       _client;
        Handler       _handler;
        tcp::resolver _resolver;

        /// Common step epilogue. On error: logs, drops the stream and
        /// completes with `ec`. With `complete == true` also completes on
        /// success.
        /// @return true when `ec` signals success
        bool checked(error_code ec, bool complete = false) {
            // Drop the broken stream BEFORE invoking the handler: the handler
            // may synchronously start the next queued request, which must see
            // the disconnected state instead of a stream about to be destroyed.
            if (ec && _client->_stream.has_value()) {
                std::cerr << "Socket " << _client->_stream->native_handle()
                          << " closed due to " << ec.message() << std::endl;
                _client->_stream.reset();
            }
            if (complete || ec)
                std::move(_handler)(ec);
            return !ec.failed();
        }

        void start()
        {
            _resolver.async_resolve(
                _client->_spec.hostname, _client->_spec.port,
                beast::bind_front_handler(&ReconnectOp::on_resolved,
                                          this->shared_from_this()));
        }

        void on_resolved(error_code ec, tcp::resolver::results_type ep)
        {
            if (checked(ec)) {
                beast::get_lowest_layer(*_client->_stream)
                    .async_connect(
                        ep,
                        beast::bind_front_handler(&ReconnectOp::on_connected,
                                                  this->shared_from_this()));
            }
        }

        void on_connected(error_code ec, tcp::endpoint ep) {
            if (!checked(ec))
                return;

            auto& stream = *_client->_stream;
            std::cerr << "Socket " << stream.native_handle()
                      << " (re)connected to " << ep << std::endl;

            auto& hostname = _client->_spec.hostname;
            // SNI: tell the server which host we want to talk to.
            SSL_set_tlsext_host_name(stream.native_handle(), hostname.c_str());

            // Verify that the peer certificate was actually issued for
            // `hostname`; verify_peer alone only validates the chain.
            error_code verify_ec;
            stream.set_verify_callback(ssl::host_name_verification(hostname),
                                       verify_ec);
            if (!checked(verify_ec))
                return;

            stream.async_handshake(
                Stream::client,
                beast::bind_front_handler(&ReconnectOp::on_ready,
                                          this->shared_from_this()));
        }

        void on_ready(error_code ec) {
            checked(ec, true);
        }
    };

    /// Type-erased queue entry: something that can be started.
    struct IAsyncTask {
        virtual ~IAsyncTask() = default;
        virtual void initiate() = 0;
    };

    /// Composed operation for one HTTP exchange: (reconnect) -> write request
    /// -> read response. Completes `_handler` with `(error_code, body)`.
    template <typename Handler>
    struct RequestOp : IAsyncTask, std::enable_shared_from_this<RequestOp<Handler>> {
        RequestOp(Client* client, http::verb verb, std::string const& path,
                  std::string data, Handler h)
            : _client(client)
            , _handler(std::move(h))
            , _request(verb, path, 11, std::move(data)) // 11: HTTP/1.1
        {
            _request.set(http::field::host, _client->_spec.hostname);
            _request.set(http::field::connection, "keep-alive");
            _request.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
            _request.set(http::field::content_type, "application/json");
            _request.set(http::field::accept, "application/json");
            _request.prepare_payload();
        }

        Client*                           _client;
        Handler                           _handler;
        http::request<http::string_body>  _request;
        http::response<http::string_body> _response;
        beast::flat_buffer                _buffer;
        size_t                            _bandwidth = 0; // bytes sent + received
        clk::time_point                   _start     = clk::now();

        /// Common step epilogue. On error: drops the stream and completes with
        /// `ec`. With `complete == true` also completes on success.
        /// @return true when `ec` signals success
        bool checked(error_code ec, bool complete = false) {
            // Reset first: the handler chains into the next queued request,
            // which must observe the disconnected state and reconnect.
            if (ec)
                _client->_stream.reset();
            if (complete || ec)
                std::move(_handler)(ec, std::move(_response.body()));
            return !ec.failed();
        }

        /// Starts the exchange, reconnecting first when there is no stream.
        void initiate() override
        {
            if (!_client->_stream.has_value()) {
                _client->async_reconnect(beast::bind_front_handler(
                    &RequestOp::on_connected, this->shared_from_this()));
            } else {
                on_connected(error_code{});
            }
        }

        void on_connected(error_code ec) {
            // A failed reconnect leaves no stream behind; fail the request
            // instead of writing to a non-existent stream.
            if (!checked(ec))
                return;

            _start = clk::now(); // This matches the start of measurements in
                                 // the original, synchronous code
            http::async_write(*_client->_stream, _request,
                              beast::bind_front_handler(
                                  &RequestOp::on_sent, this->shared_from_this()));
        }

        void on_sent(error_code ec, size_t transferred) {
            _bandwidth += transferred; // measuring actual bytes including HTTP headers
            if (checked(ec)) {
                http::async_read(
                    *_client->_stream, _buffer, _response,
                    beast::bind_front_handler(&RequestOp::on_response,
                                              this->shared_from_this()));
            }
        }

        void on_response(error_code ec, size_t transferred) {
            _bandwidth += transferred; // measuring actual bytes including HTTP headers
            {
                // Hold the mutex only for the append, never while running the
                // user's completion handler.
                std::lock_guard lk(timeBytesMutex);
                timeBytes.push_back({(clk::now() - _start) / 1.0ms, _bandwidth});
            }
            // The server asked to close the connection: drop the stream so the
            // next request reconnects instead of failing on a dead socket.
            if (!ec && !_response.keep_alive())
                _client->_stream.reset();
            checked(ec, true);
        }
    };

  private:
    net::any_io_executor _executor;
    Url::Spec            _spec;
    ssl::context&        _sslcontext;

    using Stream = beast::ssl_stream<beast::tcp_stream>;
    std::optional<Stream> _stream; // nullopt when disconnected

    // task queueing
    using AsyncTask = std::shared_ptr<IAsyncTask>;
    std::deque<AsyncTask> _tasks; // front() is the request currently in flight

    /// Appends `task` to the queue from within `_executor`; starts it right
    /// away when it is the only queued task.
    void enqueue(AsyncTask task) {
        post(_executor,
             // mutable, so that `t` can really be moved into the queue
             [this, t = std::move(task), self = shared_from_this()]() mutable {
                 _tasks.push_back(std::move(t));
                 if (_tasks.size() == 1) {
                     _tasks.front()->initiate();
                 }
             });
    }
};
/// Demo driver: issues a handful of requests against httpbin.org, reusing one
/// Client (and thus one connection) per distinct host:port, then prints the
/// collected timing samples.
int main()
{
    ssl::context ctx(ssl::context::tlsv12_client);
    ctx.set_verify_mode(ssl::verify_peer);
    ctx.set_default_verify_paths();
    // load_root_certificates(ctx);

    net::thread_pool io(1);
    // One client per endpoint; each client runs on its own strand.
    std::map<Url::Spec, std::shared_ptr<Client> > pool;

    using V = http::verb;
    for (auto [url, verb, data] : {
             std::tuple //
             {"https://httpbin.org/post", V::post, "post data"},
             {"https://httpbin.org/delay/5", V::delete_, ""},
             {"https://httpbin.org/base64/ZGVjb2RlZCBiYXM2NA==", V::get, ""},
             {"https://httpbin.org/delay/7", V::patch, ""},
             {"https://httpbin.org/stream/3", V::get, ""},
             {"https://httpbin.org/uuid", V::get, ""},
         }) //
    {
        auto parsed = parseUrl(url);
        std::cout << std::quoted(parsed.protocol) << " "
                  << std::quoted(parsed.hostname) << " "
                  << std::quoted(parsed.port) << " "
                  << std::quoted(parsed.path) << "\n";

        auto spec = parsed.specification();
        // Single lookup: insert an empty slot if the endpoint is new, then
        // fill it in.
        auto [slot, inserted] = pool.try_emplace(spec);
        if (inserted) {
            slot->second = std::make_shared<Client>(
                make_strand(io.get_executor()), spec, ctx);
        }

        slot->second->async_request(
            verb, parsed.path, data,
            [v = verb, u = url](error_code ec, std::string const& body) {
                if (ec) {
                    // Report failures instead of printing an empty body.
                    std::cerr << v << " to " << u
                              << " failed: " << ec.message() << std::endl;
                    return;
                }
                std::cout << v << " to " << u << ": " << std::quoted(body)
                          << std::endl;
            });
    }

    // Wait for all outstanding requests to finish.
    io.join();

    for (auto& [time, bytes] : timeBytes) {
        std::cout << bytes << " bytes in " << time << "ms\n";
    }
}
On my system this prints
"https" "httpbin.org" "443" "/post"
"https" "httpbin.org" "443" "/delay/5"
"https" "httpbin.org" "443" "/base64/ZGVjb2RlZCBiYXM2NA=="
"https" "httpbin.org" "443" "/delay/7"
"https" "httpbin.org" "443" "/stream/3"
"https" "httpbin.org" "443" "/uuid"
Socket 0x7f4ad4001060 (re)connected to 18.232.227.86:443
POST to https://httpbin.org/post: "{
\"args\": {},
\"data\": \"post data\",
\"files\": {},
\"form\": {},
\"headers\": {
\"Accept\": \"application/json\",
\"Content-Length\": \"9\",
\"Content-Type\": \"application/json\",
\"Host\": \"httpbin.org\",
\"User-Agent\": \"Boost.Beast/318\",
\"X-Amzn-Trace-Id\": \"Root=1-618b513c-2c51c112061b10456a5e3d4e\"
},
\"json\": null,
\"origin\": \"163.158.244.77\",
\"url\": \"https://httpbin.org/post\"
}
"
DELETE to https://httpbin.org/delay/5: "{
\"args\": {},
\"data\": \"\",
\"files\": {},
\"form\": {},
\"headers\": {
\"Accept\": \"application/json\",
\"Content-Type\": \"application/json\",
\"Host\": \"httpbin.org\",
\"User-Agent\": \"Boost.Beast/318\",
\"X-Amzn-Trace-Id\": \"Root=1-618b513c-324c97504eb79d8b743c6c5d\"
},
\"origin\": \"163.158.244.77\",
\"url\": \"https://httpbin.org/delay/5\"
}
"
GET to https://httpbin.org/base64/ZGVjb2RlZCBiYXM2NA==: "decoded bas64"
PATCH to https://httpbin.org/delay/7: "{
\"args\": {},
\"data\": \"\",
\"files\": {},
\"form\": {},
\"headers\": {
\"Accept\": \"application/json\",
\"Content-Type\": \"application/json\",
\"Host\": \"httpbin.org\",
\"User-Agent\": \"Boost.Beast/318\",
\"X-Amzn-Trace-Id\": \"Root=1-618b5141-3a8c30e60562df583061fc5a\"
},
\"origin\": \"163.158.244.77\",
\"url\": \"https://httpbin.org/delay/7\"
}
"
GET to https://httpbin.org/stream/3: "{\"url\": \"https://httpbin.org/stream/3\", \"args\": {}, \"headers\": {\"Host\": \"httpbin.org\", \"X-Amzn-Trace-Id\": \"Root=1-618b5148-45fce8a8432930a006c0a574\", \"User-Agent\": \"Boost.Beast/318\", \"Content-Type\": \"application/json\", \"Accept\": \"application/json\"}, \"origin\": \"163.158.244.77\", \"id\": 0}
{\"url\": \"https://httpbin.org/stream/3\", \"args\": {}, \"headers\": {\"Host\": \"httpbin.org\", \"X-Amzn-Trace-Id\": \"Root=1-618b5148-45fce8a8432930a006c0a574\", \"User-Agent\": \"Boost.Beast/318\", \"Content-Type\": \"application/json\", \"Accept\": \"application/json\"}, \"origin\": \"163.158.244.77\", \"id\": 1}
{\"url\": \"https://httpbin.org/stream/3\", \"args\": {}, \"headers\": {\"Host\": \"httpbin.org\", \"X-Amzn-Trace-Id\": \"Root=1-618b5148-45fce8a8432930a006c0a574\", \"User-Agent\": \"Boost.Beast/318\", \"Content-Type\": \"application/json\", \"Accept\": \"application/json\"}, \"origin\": \"163.158.244.77\", \"id\": 2}
"
GET to https://httpbin.org/uuid: "{
\"uuid\": \"4557c909-880e-456c-8ef9-049a72f5fda1\"
}
"
826 bytes in 84.9807ms
752 bytes in 5267.26ms
425 bytes in 84.6031ms
751 bytes in 7085.28ms
1280 bytes in 86.6554ms
434 bytes in 85.0086ms
Note:
- httpbin.org has all manner of test URLs, some of which generate long delays, hence the timings.
- There's only 1 connection. In case of an IO error, we disconnect (and things should reconnect on the next request).
- HTTP errors are not "errors" in that the connection stays valid.
- The DNS resolve, connect and handshake are also asynchronous.