0

We are working on developing a low latency high throughput client which connects to different exchanges. The main piece of code that actually sends HTTP requests through is this below

void send_request(http_client::request_parameters request_params, http_client::finish_handler finish_handler, http_client::failure_handler failure_handler) {
    boost::asio::spawn(
        client_context.io_context,
        [request_params = std::move(request_params), finish_handler = std::move(finish_handler), failure_handler = std::move(failure_handler), this](
            boost::asio::yield_context yield_context) mutable 
        {
            try {
                auto const request_start_time = blosh::Time::get_current_time_in_nano_seconds();
                std::string request_url(request_params.target);
                if (!request_params.query_params.empty()) {
                    std::vector<std::string> params_strings;
                    for (auto const &param : request_params.query_params) {
                        params_strings.emplace_back(
                            fmt::format("{}={}", param.key().data(), param.value().get_string().data()));
                    }
                    request_url.append("?");
                    request_url.append(boost::algorithm::join(params_strings, "&"));
                }

                boost::beast::http::request<boost::beast::http::string_body> http_request{
                    request_params.verb,
                    request_url,
                    11};
                http_request.set(boost::beast::http::field::host, http_hostname);
                http_request.set(boost::beast::http::field::content_type, "application/json");
                http_request.set(boost::beast::http::field::connection, "Keep-Alive");
                http_request.set(boost::beast::http::field::keep_alive, "timeout=86400");
                http_request.keep_alive(true);

                for (auto const &param : request_params.header_params) {
                    http_request.set(param.first, param.second);
                }
                if (const boost::json::object *json_object =
                        std::get_if<boost::json::object>(&request_params.body_params)) {
                    if (!json_object->empty()) http_request.body() = boost::json::serialize(*json_object);
                } else if (const boost::json::array *json_array =
                        std::get_if<boost::json::array>(&request_params.body_params)) {
                    if (!json_array->empty()) http_request.body() = boost::json::serialize(*json_array);
                }

                http_request.prepare_payload();
                boost::system::error_code ec;
                boost::beast::http::async_write(tcp_stream.value(), http_request, yield_context[ec]);
                if (ec) {
                    LOG_ERROR << "[http_client] [send_request] async_write failed. error: " << ec.what();
                    failure_handler(std::move(request_params), std::move(finish_handler), http_hostname_index, retries); 
                }

                LOG_INFO << "[http_client] [send_request] method = " << http_request.method_string() << ", target = " << http_request.target() << ", hostname: " << http_hostname 
                    << ", id = " << this->get_unique_id() << ", usage = " << this->get_usage_counter();

                // Declare a container to hold the response and empty out the buffer.
                response http_response;
                boost::beast::http::async_read(tcp_stream.value(), receive_flat_buffer, http_response, yield_context[ec]);
                if (ec) {
                    LOG_ERROR << "[http_client] [send_request] async_read failed. error: " << ec.what();
                }
                connection_latency_ns = static_cast<uint64_t>(blosh::Time::get_current_time_in_nano_seconds() - request_start_time);
                usage += 1;

                finish_handler(http_response);
            } catch (const std::exception& e) {
                LOG_ERROR << "[http_client] [send_request] failure. error: " << e.what() << ", hostname = " << http_hostname << ", id = " << this->get_unique_id();
            }
        }
    );
}

And here's the IO context which runs under another thread under a busy loop

    // start both rest and websocket threads
    boost::asio::io_context rest_io_context{};
    std::thread rest_thread{};
    bool rest_io_context_run_flag{true};
    rest_thread = std::thread([&]{
#ifndef LOCAL_BUILD
        cpu_core_mutex.lock();
        if (cpu_cores.size()) {
            LOG_INFO << "rest cpu core " << cpu_cores.front();
            blosh::scheduling::pin_core(cpu_cores.front());
            cpu_cores.pop_front();
        }
        cpu_core_mutex.unlock();
#endif
        while (rest_io_context_run_flag) {
            rest_io_context.restart();
            rest_io_context.poll();
        }
    });

The reason that went with busy polling instead of .run() is because of this (Can Boost ASIO be used to build low-latency applications?)

This works well 99% of the time but when the volatility hits and lots of market activity, some of the HTTP requests are sent, and we don't hear anything back from them. We have a fallback logic that if we don't hear back in like 5 seconds, we try again which results in us sending HTTP requests through async_write but no response from async_read. For example, take a look at these logs where it logs after async_write but nothing after that

20230831 17:40:47.110427Z 24792 INFO  [http_client] [send_request] method = DELETE, target = /fapi/v1/order?symbol=XXXXXX&origClientOrderId=XXXXX&timestamp=XXXXX&signature=XXXX, hostname: fapi.binance.com, id = 9838412294785007617, usage = 218 - http_client.cpp:129

20230831 17:40:52.210959Z 24792 INFO  [http_client] [send_request] method = DELETE, target = /fapi/v1/order?symbol=XXX&origClientOrderId=XXXXX&timestamp=1693503652210&signature=XXXX, hostname: fapi.binance.com, id = XXXX, usage = 220 - http_client.cpp:129

20230831 17:40:57.297817Z 24792 INFO  [http_client] [send_request] method = DELETE, target = /fapi/v1/order?symbol=XXXXX&origClientOrderId=XXXXX&timestamp=XXXXX&signature=XXXXX, hostname: fapi.binance.com, id = XXXXX, usage = 128 - http_client.cpp:129

This bug has been frustrating us for the last week or so because in isolated mode, this works super well but when it connects with other systems, that's when the problem appears. I was wondering whether the boost::asio or boost::beast community can help me out

Is there anything on the boost::asio I am not aware of where my TCP writes / TCP reads are being discarded

Here's my sysctl.conf btw if anyone interested

net.core.wmem_max=134217728
net.core.rmem_max=134217728
net.ipv4.tcp_rmem = 4194304 8388608 134217728
net.ipv4.tcp_wmem = 4194304 8388608 134217728
net.ipv4.tcp_window_scaling = 1
net.ipv4.tcp_no_metrics_save = 1

How do I reliably track that my request actually went through kernel, network, and out of the box? How do I make sure that my completion handler is processing requests that are finished and waiting to be acted upon? Or maybe something completely else is causing this issue.

Aqib Chattha
  • 197
  • 11
Rohith Uppala
  • 167
  • 1
  • 2
  • 7
  • Who knows what kind of bugs or simple API misunderstanding are lurking within a highly complicated library like boost::beast::http. Nothing beats the simplicity of juggling sockets directly. HTTP is a fairly simple protocol, and an experienced C++ developer with knowledge of HTTP won't have much difficulty cobbling together a minimal client that talks to an HTTP server, in the described situation, and, due to the nature of this code, there won't be any mystery and unexplained behavior. – Sam Varshavchik Aug 31 '23 at 23:11
  • Try using Wireshark to monitor the traffic, that'll help you determine where the problem actually lies – Alan Birtles Sep 01 '23 at 06:19
  • @SamVarshavchik I just want to note that I disagree with this general stance - too often it's just hubris. I don't fundamentally _disagree_, but I'd view it as "you cannot out-source your core business". And "with great power comes great responsibility" because rolling your own brings a lot of potential for security and other bugs, at least on par with the avoidance of unncessary overhead/restriction. – sehe Sep 01 '23 at 07:57
  • @AlanBirtles it’s https traffic which makes it less trivial to intercept the traffic. Any suggestions on that front would be highly welcome – Rohith Uppala Sep 01 '23 at 08:23
  • @sehe thanks. The reason we went with boost::asio is that because it’s battle tested quite a lot. I prefer to understand and fix the current framework than to reimplement a new thing from scratch – Rohith Uppala Sep 01 '23 at 08:26
  • 1
    You can still see the traffic, you can see who isn't sending packets. if you have access to the server's private key you can even decrypt it – Alan Birtles Sep 01 '23 at 10:23

0 Answers0