3

I'm making about 30,000 queries to a GraphQL server; because I have a high-latency connection, I'm doing many queries in parallel, using threads. Currently each query makes a new connection; I'd like to reuse the connections, which should reduce the time the whole download takes. Here's my code:

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/asio/ssl/error.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <chrono>
#include <vector>
#include <array>
#include <iostream>
#include "http.h"

namespace beast=boost::beast;
namespace http=beast::http;
namespace net=boost::asio;
namespace ssl=net::ssl;
using tcp=net::ip::tcp;
using namespace std;
namespace cr=chrono;

/* One sample per completed request. Used to compute the latency and data
 * rate, which will be used to compute the number of I/O threads for the
 * next run.
 */
struct TimeBytes
{
  float ms;   // elapsed time of the request, in milliseconds
  int bytes;  // bytes attributed to the request
};

// Monotonic clock used to time each request.
cr::steady_clock clk;
// Timing samples collected from all I/O threads; httpPost appends to it
// while holding timeBytesMutex.
vector<TimeBytes> timeBytes;
mutex timeBytesMutex;
// Protocol/host/port of the last request made on *this* thread (written at
// the end of httpPost). Only referenced by commented-out code right now;
// presumably meant for detecting when a connection could be reused.
thread_local string lastProto,lastHost,lastPort;

std::array<std::string,4> parseUrl(std::string url)
/* Split a URL into {protocol, hostname, port, path}. All are strings,
 * including the port.
 * - No "scheme://" prefix: the protocol is empty and the host starts at 0.
 * - No path: the path is "/" (an HTTP request target must not be empty).
 * - No ":port": 443 for https, 80 for http, otherwise "0".
 */
{
  std::array<std::string,4> ret;
  const std::string::size_type schemeEnd=url.find("://");
  std::string::size_type hostStart=0;
  if (schemeEnd!=std::string::npos)
  {
    ret[0]=url.substr(0,schemeEnd);
    hostStart=schemeEnd+3;
  }

  // Everything from the first '/' after the authority is the path.
  const std::string::size_type pathStart=url.find('/',hostStart);
  std::string hostPort;
  if (pathStart==std::string::npos)
  {
    hostPort=url.substr(hostStart);
    ret[3]="/";
  }
  else
  {
    hostPort=url.substr(hostStart,pathStart-hostStart);
    ret[3]=url.substr(pathStart);
  }

  const std::string::size_type colon=hostPort.find(':');
  if (colon!=std::string::npos)
  {
    ret[1]=hostPort.substr(0,colon);
    ret[2]=hostPort.substr(colon+1);
  }
  else
  {
    ret[1]=hostPort;
    if (ret[0]=="https")
      ret[2]="443";
    else if (ret[0]=="http")   // was a duplicate "https" test: http got "0"
      ret[2]="80";
    else
      ret[2]="0";
  }
  return ret;
}

string httpPost(string url,string data)
/* POST `data` (as application/json) to `url` over a fresh connection and
 * return the response body. Failures are reported on cerr and yield whatever
 * body was read (normally empty); the function itself does not throw past
 * the URL parsing. On success a TimeBytes sample is appended to the global
 * timeBytes under timeBytesMutex. The connection is closed before returning.
 */
{
  net::io_context context;
  ssl::context ctx(ssl::context::tlsv12_client);
  tcp::resolver res(context);
  beast::ssl_stream<beast::tcp_stream> stream(context,ctx);
  array<string,4> parsed=parseUrl(url);
  http::request<http::string_body> req;
  http::response<http::string_body> resp;
  beast::flat_buffer buffer;
  cr::time_point<cr::steady_clock> timeStart=clk.now();
  //if (parsed[0]==lastProto && parsed[1]==lastHost && parsed[2]==lastPort)
    //cout<<"same host\n";
  //load_root_certificates(ctx);
  try
  {
    const bool useTls=(parsed[0]=="https");
    ctx.set_verify_mode(ssl::verify_peer);
    // verify_peer needs trust anchors; without any, every certificate is
    // rejected and the handshake fails.
    ctx.set_default_verify_paths();
    tcp::resolver::results_type endpoints=res.resolve(parsed[1],parsed[2]);
    beast::get_lowest_layer(stream).connect(endpoints);
    if (useTls)
    {
      // SNI: virtual-hosted servers need the hostname to choose a certificate.
      if (!SSL_set_tlsext_host_name(stream.native_handle(),parsed[1].c_str()))
        throw beast::system_error(beast::error_code(
          static_cast<int>(::ERR_get_error()),net::error::get_ssl_category()));
      stream.handshake(ssl::stream_base::client);
    }
    req.method(http::verb::post);
    req.target(parsed[3]);
    req.set(http::field::host,parsed[1]);
    req.set(http::field::connection,"keep-alive");
    req.set(http::field::user_agent,BOOST_BEAST_VERSION_STRING);
    req.set(http::field::content_type,"application/json");
    req.set(http::field::accept,"application/json");
    req.body()=data;
    req.prepare_payload();
    if (useTls)
    {
      http::write(stream,req);
      http::read(stream,buffer,resp);
    }
    else
    {
      // Plain http: talk to the TCP layer directly; the SSL layer has no
      // session because no handshake was performed.
      http::write(beast::get_lowest_layer(stream),req);
      http::read(beast::get_lowest_layer(stream),buffer,resp);
    }
    cr::nanoseconds elapsed=clk.now()-timeStart;
    TimeBytes tb;
    tb.ms=elapsed.count()/1e6;
    // 7626 accounts for HTTP, TCP, IP, and Ethernet headers.
    tb.bytes=static_cast<int>(req.body().size()+resp.body().size()+7626);
    {
      lock_guard<mutex> lock(timeBytesMutex);
      timeBytes.push_back(tb);
    }
    beast::close_socket(beast::get_lowest_layer(stream));
    if (DEBUG_QUERY)
    {
      cout<<parsed[0]<<"|\n"<<parsed[1]<<"|\n"<<parsed[2]<<"|\n"<<parsed[3]<<"|\n";
      cout<<data<<"|\n";
    }
  }
  catch (const exception &e)
  {
    // Best effort: report instead of silently swallowing, then fall through.
    cerr<<"httpPost "<<url<<": "<<e.what()<<'\n';
  }
  catch (...)
  {
    cerr<<"httpPost "<<url<<": unknown error\n";
  }
  lastProto=parsed[0];
  lastHost=parsed[1];
  lastPort=parsed[2];
  return resp.body();
}

Most of the requests are to one server. A few GET requests are made to another server (using an httpGet function which is pretty similar to httpPost). After I download the data, I crunch them, so I'd like to close the connections before starting to crunch.

I tried making context, ctx, and stream thread-local, and calling stream.shutdown() and context.restart() before close_socket(). The program then crashed the second time the main thread called httpPost, because http::read threw an error. (A worker thread made one query between the main thread's two queries.) At that point I was not yet trying to keep the connection open. I was only trying to get the thread-local version working so that I could keep the connection open later.

Pierre Abbat
  • 485
  • 4
  • 10
  • you'll need to refactor your code to not use [global variables](https://stackoverflow.com/questions/10525582/why-are-global-variables-considered-bad-practice), a class would help here, have each thread have its own instance of a connection class and store the socket in there – Alan Birtles Nov 09 '21 at 10:31

1 Answer

5

I'd strongly suggest using async interfaces. Since the majority of time is obviously spent waiting for the IO, you likely can get all the throughput from just a single thread.

Here's an example that answers your question (how to keep a client open for more than one request) while making the processing asynchronous. The current downside is that all requests on a single client must be sequenced; that's what the _tasks queue is for. Still, it should serve as inspiration.

Note that the initiation functions work with all completion handler result types: net::use_future, net::spawn (coroutines) etc.

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <chrono>
#include <deque>
#include <iomanip>
#include <iostream>

namespace net   = boost::asio;
namespace ssl   = net::ssl;
namespace beast = boost::beast;
namespace http  = beast::http;
using clk       = std::chrono::steady_clock;
using net::ip::tcp;
using beast::error_code;

using namespace std::chrono_literals;

// One timing sample per HTTP request. Used to compute the latency and data
// rate, which will be used to compute the number of I/O threads for the
// next run.
struct TimeBytes {
    long double ms;    // elapsed time, in milliseconds
    size_t      bytes; // bytes transferred for the request
};

// Per-request timing samples. RequestOp appends to it while holding
// timeBytesMutex; main() prints it after the I/O pool has been joined.
static std::vector<TimeBytes> timeBytes;
static std::mutex             timeBytesMutex;

// The components of a parsed URL; every field is text, the port included.
struct Url {
    // A server endpoint (hostname + port). It is ordered so that it can key
    // a std::map, e.g. to keep one client per server.
    struct Spec {
        std::string hostname;
        std::string port;

        // Lexicographic: compare hostnames first, then ports.
        bool operator<(Spec const& rhs) const {
            if (hostname != rhs.hostname)
                return hostname < rhs.hostname;
            return port < rhs.port;
        }
    };

    std::string protocol;
    std::string hostname;
    std::string port;
    std::string path;

    // The endpoint this URL points at.
    Spec specification() const {
        Spec endpoint;
        endpoint.hostname = hostname;
        endpoint.port     = port;
        return endpoint;
    }
};

#include <boost/spirit/home/x3.hpp>
#include <boost/fusion/adapted/std_tuple.hpp>
namespace x3 = boost::spirit::x3;

/* Split `url` into protocol, hostname, port and path.
 *
 * - The "scheme://" prefix is required; input that does not match throws
 *   x3::expectation_failure (via x3::expect).
 * - An explicit ":port" suffix must fit in 16 bits. An out-of-range number is
 *   not split off: it stays in the hostname, so name resolution fails instead
 *   of silently connecting to a truncated port.
 * - A missing port defaults to "443" (https), "80" (http), otherwise "0".
 * - An empty path becomes "/", because an HTTP request target may not be empty.
 */
Url parseUrl(std::string const& url)
{
    Url ret;
    std::string hostport;
    {
        static const auto url_ = *(x3::char_ - "://") >> "://" // protocol
            >> +~x3::char_('/')                                // hostname[:port]
            >> *x3::char_;                                     // path
        auto into = std::tie(ret.protocol, hostport, ret.path);
        parse(begin(url), end(url), x3::expect[url_], into);
    }

    {
        // uint_parser<uint16_t> rejects values above 65535. x3::uint_ would
        // parse an `unsigned` and then narrow it into the uint16_t attribute.
        static const auto portspec_ =
            -(':' >> x3::uint_parser<uint16_t>{}) >> x3::eoi;
        static const auto hostport_ =
            x3::raw[+(+~x3::char_(':') | !portspec_ >> x3::char_)] //
            >> -portspec_;

        boost::optional<uint16_t> port;
        auto into = std::tie(ret.hostname, port);
        parse(begin(hostport), end(hostport), x3::expect[hostport_], into);

        if (port.has_value())             { ret.port = std::to_string(*port); }
        else if (ret.protocol == "https") { ret.port = "443";                 }
        else if (ret.protocol == "http")  { ret.port = "80";                  }
        else                              { ret.port = "0";                   }
    }

    if (ret.path.empty()) {
        ret.path = "/";
    }

    return ret;
}

/* A keep-alive HTTP(S) client for a single host/port (Url::Spec).
 *
 * Requests are queued in _tasks and run strictly one after another over one
 * SSL stream. The stream is (re)connected lazily whenever a request finds it
 * absent. It is dropped after an I/O error, or when a response says the
 * server will not keep the connection alive.
 *
 * All state is touched from handlers running on _executor. If that executor
 * can run handlers on several threads, it should be a strand (main() passes
 * make_strand(...)).
 *
 * Instances must be owned by std::shared_ptr, because pending work keeps the
 * client alive through shared_from_this().
 */
struct Client : std::enable_shared_from_this<Client> {
  public:
    Client(net::any_io_executor ex, Url::Spec spec, ssl::context& ctx)
        : _executor(ex)
        , _spec(spec)
        , _sslcontext(ctx)
    {
    }

    /* Queue a request `verb path` with body `data`.
     *
     * Completion signature: void(error_code, std::string body). The body is
     * delivered whatever the HTTP status; only transport/TLS failures set the
     * error_code.
     */
    template <typename Token>
    auto async_request(http::verb verb, std::string const& path,
                       std::string const& data, Token&& token)
    {
        using R = typename net::async_result<std::decay_t<Token>,
                                             void(error_code, std::string)>;
        using H = typename R::completion_handler_type;
        H handler(std::forward<Token>(token));
        R result(handler);

        // Wraps the caller's handler. Before completing, it hands the stream
        // to the next queued request (if any).
        auto chain_tasks = [this, h = std::move(handler),
                            self = shared_from_this()](auto&&... args) mutable {
            if (!self->_tasks.empty()) {
                dispatch(self->_executor, [this, self] {
                    if (not _tasks.empty()) _tasks.pop_front();
                    if (not _tasks.empty()) _tasks.front()->initiate();
                });
            }

            std::move(h)(std::forward<decltype(args)>(args)...);
        };

        // Move, don't copy: handlers such as net::use_future's are move-only.
        auto task = std::make_shared<RequestOp<decltype(chain_tasks)>>(
            this, verb, path, data, std::move(chain_tasks));

        enqueue(std::move(task));

        return result.get();
    }

    template <typename Token>
    auto async_post(std::string const& path, std::string const& data,
                    Token&& token)
    {
        return async_request(http::verb::post,path, data, std::forward<Token>(token));
    }

    template <typename Token>
    auto async_get(std::string const& path, Token&& token)
    {
        return async_request(http::verb::get,path, "", std::forward<Token>(token));
    }

  private:
    // Create a fresh stream and resolve, connect and handshake it.
    // Completion signature: void(error_code).
    template <typename Token> auto async_reconnect(Token&& token)
    {
        using R = typename net::async_result<std::decay_t<Token>, void(error_code)>;
        using H = typename R::completion_handler_type;
        H handler(std::forward<Token>(token));
        R result(handler);

        assert(!_stream.has_value()); // probably a program flow bug
        _stream.emplace(_executor, _sslcontext);

        std::make_shared<ReconnectOp<H>>(this, std::move(handler))->start();

        return result.get();
    }

    // Composed operation: resolve -> TCP connect -> SNI + TLS handshake.
    template <typename Handler>
    struct ReconnectOp : std::enable_shared_from_this<ReconnectOp<Handler>> {
        ReconnectOp(Client* client, Handler h)
            : _client{client}
            , _handler(std::move(h))
            , _resolver(client->_stream->get_executor())
        {
        }

        Client*       _client;
        Handler       _handler;
        tcp::resolver _resolver;

        // On error, drop the stream; then, on error or when `complete`,
        // invoke the handler exactly once. The reset must come *before* the
        // upcall: the handler can start the next request inline, and that
        // request emplaces a new _stream which must not be torn down here.
        bool checked(error_code ec, bool complete = false) {
            if (ec && _client->_stream.has_value())
            {
                std::cerr << "Socket " << _client->_stream->native_handle()
                          << " closed due to " << ec.message() << std::endl;
                _client->_stream.reset();
            }
            if (complete || ec)
                std::move(_handler)(ec);

            return !ec.failed();
        }

        void start()
        {
            _resolver.async_resolve(
                _client->_spec.hostname, _client->_spec.port,
                beast::bind_front_handler(&ReconnectOp::on_resolved,
                                          this->shared_from_this()));
        }

        void on_resolved(error_code ec, tcp::resolver::results_type ep)
        {
            if (checked(ec)) {
                beast::get_lowest_layer(*_client->_stream)
                    .async_connect(
                        ep,
                        beast::bind_front_handler(&ReconnectOp::on_connected,
                                                  this->shared_from_this()));
            }
        }

        void on_connected(error_code ec, tcp::endpoint ep) {
            if (checked(ec)) {
                std::cerr << "Socket " << _client->_stream->native_handle()
                          << " (re)connected to " << ep << std::endl;

                auto& hostname = _client->_spec.hostname;
                SSL_set_tlsext_host_name(_client->_stream->native_handle(),
                                         hostname.c_str());

                _client->_stream->async_handshake(
                    Stream::client,
                    beast::bind_front_handler(&ReconnectOp::on_ready,
                                              this->shared_from_this()));
            }
        }

        void on_ready(error_code ec) {
            checked(ec, true);
        }
    };

    // Type-erased queue entry.
    struct IAsyncTask {
        virtual ~IAsyncTask() = default;
        virtual void initiate() = 0;
    };

    // One queued request: (reconnect if needed) -> write -> read -> complete.
    template <typename Handler>
    struct RequestOp : IAsyncTask, std::enable_shared_from_this<RequestOp<Handler>> {
        RequestOp(Client* client, http::verb verb, std::string const& path,
                  std::string data, Handler h)
            : _client(client)
            , _handler(std::move(h))
            , _request(verb, path, 11, std::move(data))
        {
            _request.set(http::field::host,         _client->_spec.hostname);
            _request.set(http::field::connection,   "keep-alive");
            _request.set(http::field::user_agent,   BOOST_BEAST_VERSION_STRING);
            _request.set(http::field::content_type, "application/json");
            _request.set(http::field::accept,       "application/json");
            _request.prepare_payload();
        }

        Client*                           _client;
        Handler                           _handler;
        http::request<http::string_body>  _request;
        http::response<http::string_body> _response;
        beast::flat_buffer                _buffer;
        size_t                            _bandwidth = 0;
        clk::time_point                   _start = clk::now();

        // On error, drop the stream; then, on error or when `complete`,
        // deliver the result. Resetting first matters: the handler may
        // initiate the next queued request inline, and that request must see
        // the stream as disconnected rather than have it destroyed under a
        // pending write.
        bool checked(error_code ec, bool complete = false) {
            if (ec)
                _client->_stream.reset();
            if (complete || ec)
                std::move(_handler)(ec, std::move(_response.body()));

            return !ec.failed();
        }

        void initiate() override
        {
            if (!_client->_stream.has_value()) {
                _client->async_reconnect(beast::bind_front_handler(
                    &RequestOp::on_connected, this->shared_from_this()));
            } else {
                on_connected(error_code{});
            }
        }

        void on_connected(error_code ec) {
            if (!checked(ec))
                return; // reconnect failed: request completed with that error

            _start = clk::now(); // This matches the start of measurements in
                                 // the original, synchronous code
            http::async_write(*_client->_stream, _request,
                              beast::bind_front_handler(
                                  &RequestOp::on_sent, this->shared_from_this()));
        }

        void on_sent(error_code ec, size_t transferred) {
            _bandwidth += transferred; // measuring actual bytes including HTTP headers

            if (checked(ec)) {
                http::async_read(
                    *_client->_stream, _buffer, _response,
                    beast::bind_front_handler(&RequestOp::on_response,
                                              this->shared_from_this()));
            }
        }

        void on_response(error_code ec, size_t transferred) {
            _bandwidth += transferred; // measuring actual bytes including HTTP headers

            {
                // Scoped so the global lock is not held while user code runs.
                std::lock_guard lk(timeBytesMutex);
                timeBytes.push_back({(clk::now() - _start) / 1.0ms, _bandwidth});
            }

            // The server announced it will close the connection: drop the
            // stream so the next queued request reconnects instead of failing.
            if (!ec && !_response.keep_alive())
                _client->_stream.reset();

            checked(ec, true);
        }
    };

  private:
    net::any_io_executor _executor;
    Url::Spec            _spec;
    ssl::context&        _sslcontext;

    using Stream = beast::ssl_stream<beast::tcp_stream>;
    std::optional<Stream> _stream; // nullopt when disconnected

    // task queueing
    using AsyncTask = std::shared_ptr<IAsyncTask>;
    std::deque<AsyncTask> _tasks;

    // Append `task` on the executor; start it if nothing else is running.
    void enqueue(AsyncTask task) {
        post(_executor,
             [t = std::move(task), this, self = shared_from_this()] {
                 _tasks.push_back(std::move(t));
                 if (_tasks.size() == 1) {
                     _tasks.front()->initiate();
                 }
             });
    }
};

int main()
{
    ssl::context ctx(ssl::context::tlsv12_client);
    ctx.set_verify_mode(ssl::verify_peer);
    ctx.set_default_verify_paths();
    // load_root_certificates(ctx);

    net::thread_pool io(1);
    std::map<Url::Spec, std::shared_ptr<Client> > pool;

    using V = http::verb;
    for (auto [url, verb, data] : {
     std::tuple //
     {"https://httpbin.org/post",                        V::post,    "post data"},
     {"https://httpbin.org/delay/5",                     V::delete_, ""},
     {"https://httpbin.org/base64/ZGVjb2RlZCBiYXM2NA==", V::get,     ""},
     {"https://httpbin.org/delay/7",                     V::patch,   ""},
     {"https://httpbin.org/stream/3",                    V::get,     ""},
     {"https://httpbin.org/uuid",                        V::get,     ""},
    }) //
    {
        auto parsed = parseUrl(url);
        std::cout << std::quoted(parsed.protocol) << " "
                  << std::quoted(parsed.hostname) << " "
                  << std::quoted(parsed.port) << " "
                  << std::quoted(parsed.path) << "\n";

        auto spec = parsed.specification();

        if (!pool.contains(spec)) {
            pool.emplace(spec,
                         std::make_shared<Client>(
                             make_strand(io.get_executor()), spec, ctx));
        }

        pool.at(spec)->async_request(
            verb, parsed.path, data,
            [=, v = verb, u = url](error_code ec, std::string const& body) {
                std::cout << v << " to " << u << ": " << std::quoted(body)
                          << std::endl;
            });
    }

    io.join();

    for (auto& [time, bytes] : timeBytes) {
        std::cout << bytes << " bytes in " << time << "ms\n";
    }
}

On my system this prints

"https" "httpbin.org" "443" "/post"
"https" "httpbin.org" "443" "/delay/5"
"https" "httpbin.org" "443" "/base64/ZGVjb2RlZCBiYXM2NA=="
"https" "httpbin.org" "443" "/delay/7"
"https" "httpbin.org" "443" "/stream/3"
"https" "httpbin.org" "443" "/uuid"
Socket 0x7f4ad4001060 (re)connected to 18.232.227.86:443
POST to https://httpbin.org/post: "{
  \"args\": {}, 
  \"data\": \"post data\", 
  \"files\": {}, 
  \"form\": {}, 
  \"headers\": {
    \"Accept\": \"application/json\", 
    \"Content-Length\": \"9\", 
    \"Content-Type\": \"application/json\", 
    \"Host\": \"httpbin.org\", 
    \"User-Agent\": \"Boost.Beast/318\", 
    \"X-Amzn-Trace-Id\": \"Root=1-618b513c-2c51c112061b10456a5e3d4e\"
  }, 
  \"json\": null, 
  \"origin\": \"163.158.244.77\", 
  \"url\": \"https://httpbin.org/post\"
}
"
DELETE to https://httpbin.org/delay/5: "{
  \"args\": {}, 
  \"data\": \"\", 
  \"files\": {}, 
  \"form\": {}, 
  \"headers\": {
    \"Accept\": \"application/json\", 
    \"Content-Type\": \"application/json\", 
    \"Host\": \"httpbin.org\", 
    \"User-Agent\": \"Boost.Beast/318\", 
    \"X-Amzn-Trace-Id\": \"Root=1-618b513c-324c97504eb79d8b743c6c5d\"
  }, 
  \"origin\": \"163.158.244.77\", 
  \"url\": \"https://httpbin.org/delay/5\"
}
"
GET to https://httpbin.org/base64/ZGVjb2RlZCBiYXM2NA==: "decoded bas64"
PATCH to https://httpbin.org/delay/7: "{
  \"args\": {}, 
  \"data\": \"\", 
  \"files\": {}, 
  \"form\": {}, 
  \"headers\": {
    \"Accept\": \"application/json\", 
    \"Content-Type\": \"application/json\", 
    \"Host\": \"httpbin.org\", 
    \"User-Agent\": \"Boost.Beast/318\", 
    \"X-Amzn-Trace-Id\": \"Root=1-618b5141-3a8c30e60562df583061fc5a\"
  }, 
  \"origin\": \"163.158.244.77\", 
  \"url\": \"https://httpbin.org/delay/7\"
}
"
GET to https://httpbin.org/stream/3: "{\"url\": \"https://httpbin.org/stream/3\", \"args\": {}, \"headers\": {\"Host\": \"httpbin.org\", \"X-Amzn-Trace-Id\": \"Root=1-618b5148-45fce8a8432930a006c0a574\", \"User-Agent\": \"Boost.Beast/318\", \"Content-Type\": \"application/json\", \"Accept\": \"application/json\"}, \"origin\": \"163.158.244.77\", \"id\": 0}
{\"url\": \"https://httpbin.org/stream/3\", \"args\": {}, \"headers\": {\"Host\": \"httpbin.org\", \"X-Amzn-Trace-Id\": \"Root=1-618b5148-45fce8a8432930a006c0a574\", \"User-Agent\": \"Boost.Beast/318\", \"Content-Type\": \"application/json\", \"Accept\": \"application/json\"}, \"origin\": \"163.158.244.77\", \"id\": 1}
{\"url\": \"https://httpbin.org/stream/3\", \"args\": {}, \"headers\": {\"Host\": \"httpbin.org\", \"X-Amzn-Trace-Id\": \"Root=1-618b5148-45fce8a8432930a006c0a574\", \"User-Agent\": \"Boost.Beast/318\", \"Content-Type\": \"application/json\", \"Accept\": \"application/json\"}, \"origin\": \"163.158.244.77\", \"id\": 2}
"
GET to https://httpbin.org/uuid: "{
  \"uuid\": \"4557c909-880e-456c-8ef9-049a72f5fda1\"
}
"
826 bytes in 84.9807ms
752 bytes in 5267.26ms
425 bytes in 84.6031ms
751 bytes in 7085.28ms
1280 bytes in 86.6554ms
434 bytes in 85.0086ms

Note:

  • httpbin.org has all manner of test urls - some of which generate long delays, hence the timings

  • there's only 1 connection per host (one Client per hostname and port). In case of an IO error, we disconnect, and things should reconnect on the next request

  • HTTP error statuses (4xx/5xx) are not treated as errors: the connection stays valid and the response body is still delivered

  • The DNS resolve, connect and handshake are also asynchronous

sehe
  • 374,641
  • 47
  • 450
  • 633