1
// Builds a completion handler that calls member function `f` of binanceWS<A>.
// The handler is bound to shared_from_this(), so it holds a shared_ptr to the
// object for as long as the handler exists. Only usable inside member functions
// of binanceWS<A>, because it refers to `this` and to the template parameter A.
#define BINANCE_HANDLER(f) beast::bind_front_handler(&binanceWS<A>::f, this->shared_from_this())

// Asynchronous WebSocket-over-TLS client for a Binance stream.
//
// The connection is driven by a chain of completion handlers:
//   run -> on_resolve -> on_connect -> on_ssl_handshake -> on_handshake
//       -> on_write -> on_message
// Handlers created with BINANCE_HANDLER are bound to shared_from_this(), so an
// instance has to be owned by a std::shared_ptr.
template <typename A> 
class binanceWS : public std::enable_shared_from_this<binanceWS<A>> {
    tcp::resolver      resolver_;
    Stream             ws_;
    // Target of every async_read; on_message_handler parses its whole readable content.
    beast::flat_buffer buffer_;
    std::string        host_;
    // Serialized request written once the WebSocket handshake has completed.
    std::string        message_text_;

    // Request target; run() appends the stream name to it.
    std::string           wsTarget_ = "/ws/";
    char const*           host      = "stream.binance.com";
    char const*           port      = "9443";
    // Stored by the constructor; not used by any member function shown here.
    SPSCQueue<A>&         diff_messages_queue;
    // Installed by subscribe_orderbook_diffs(); invoked for received messages.
    std::function<void()> on_message_handler;
    // OnMessage on_message_cb;

  public:
    // ex  - executor for the resolver and the stream
    // ctx - SSL context used to construct the TLS stream
    // q   - queue that is kept by reference
    binanceWS(net::any_io_executor ex, ssl::context& ctx, SPSCQueue<A>& q)
        : resolver_(ex)
        , ws_(ex, ctx)
        , diff_messages_queue(q) {}

    // Starts the handler chain: sets the TLS SNI host name, remembers the host
    // and the serialized `message`, appends `streamName` to the request target
    // and begins resolving `host`:`port`.
    // Throws boost::system::system_error if the SNI host name cannot be set.
    // Note: the `host`/`port` parameters shadow the data members of the same name.
    void run(char const* host, char const* port, json message, const std::string& streamName) {
        if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host)) {
            throw boost::system::system_error(
                error_code(::ERR_get_error(), net::error::get_ssl_category()));
        }

        host_         = host;
        message_text_ = message.dump();
        wsTarget_ += streamName;

        resolver_.async_resolve(host_, port, BINANCE_HANDLER(on_resolve));
    }

    // Resolve completed: set the SNI host name (run() already did the same),
    // arm a 30 second timeout and connect to the resolved endpoints.
    void on_resolve(beast::error_code ec, tcp::resolver::results_type results) {
        if (ec)
            return fail_ws(ec, "resolve");

        if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host_.c_str())) {
            throw beast::system_error{
                error_code(::ERR_get_error(), net::error::get_ssl_category())};
        }

        get_lowest_layer(ws_).expires_after(30s);

        beast::get_lowest_layer(ws_).async_connect(results, BINANCE_HANDLER(on_connect));
    }

    // TCP connect completed: start the TLS handshake as a client.
    void on_connect(beast::error_code                                           ec,
                    [[maybe_unused]] tcp::resolver::results_type::endpoint_type ep) {
        if (ec)
            return fail_ws(ec, "connect");

        // Perform the SSL handshake
        ws_.next_layer().async_handshake(ssl::stream_base::client, BINANCE_HANDLER(on_ssl_handshake));
    }

    // TLS handshake completed: disable the tcp_stream timeout, apply the
    // suggested client WebSocket timeouts, set the User-Agent header and start
    // the WebSocket handshake on wsTarget_.
    void on_ssl_handshake(beast::error_code ec) {
        if (ec)
            return fail_ws(ec, "ssl_handshake");

        beast::get_lowest_layer(ws_).expires_never();

        ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

        ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(http::field::user_agent,
                    std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async");
        }));

        std::cout << "using host_: " << host_ << std::endl;
        ws_.async_handshake(host_, wsTarget_, BINANCE_HANDLER(on_handshake));
    }

    // WebSocket handshake completed: send the stored request text.
    void on_handshake(beast::error_code ec) {
        if (ec) {
            return fail_ws(ec, "handshake");
        }

        std::cout << "Sending : " << message_text_ << std::endl;

        ws_.async_write(net::buffer(message_text_), BINANCE_HANDLER(on_write));
    }

    // Request written: start reading the first message into buffer_.
    void on_write(beast::error_code ec, size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);

        if (ec)
            return fail_ws(ec, "write");

        ws_.async_read(buffer_, BINANCE_HANDLER(on_message));
    }

    // A message has been read into buffer_.
    //
    // NOTE: buffer_ is not cleared after the first on_message_handler() call
    // below, and async_read appends to the buffer. When the lambda runs,
    // buffer_ therefore holds two messages back to back, and the handler's
    // json::parse of the whole buffer fails at the start of the second one.
    void on_message(beast::error_code ec, size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);
        if (ec)
            return fail_ws(ec, "read");

       on_message_handler(); // WORKS FINE!!!

        // Unlike BINANCE_HANDLER, this lambda captures the raw `this` pointer,
        // so it does not hold a shared_ptr to the object while the read is pending.
        ws_.async_read(buffer_, [this](beast::error_code ec, size_t n) {
            if (ec)
                return fail_ws(ec, "read");

            on_message_handler(); // DOESN'T WORK  
            buffer_.clear();
            ws_.async_read(buffer_, BINANCE_HANDLER(on_message));
        });
    }
    
    // Installs the message handler and starts the connection with a request
    // {"method": action, "params": [stream], "id": 1}, where `stream` is
    // "<symbol>@depth<depth_levels>".
    void subscribe_orderbook_diffs(const std::string action,const std::string symbol,short int depth_levels)
    {
        std::string stream = symbol+"@"+"depth"+std::to_string(depth_levels);

        
        // Parses everything currently readable in buffer_ as one JSON document
        // and prints it; the handler itself never clears the buffer.
        on_message_handler = [this]() {
            std::cout << "Orderbook Levels Update" << std::endl;
            json payload = json::parse(beast::buffers_to_string(buffer_.cdata()));
            std::cout << payload << std::endl;
             
        };
        
        json jv = {
            { "method", action },
            { "params", {stream} },
            { "id", 1 }
        };
        run(host, port,jv, stream);
    }

};

int main() {
    net::io_context ioc;
    ssl::context    ctx{ssl::context::tlsv12_client};

    ctx.set_verify_mode(ssl::verify_peer);
    ctx.set_default_verify_paths();
    int         levels = 10;
    std::string symbol = "btcusdt";

    auto binancews = std::make_shared<binanceWS>(make_strand(ioc), ctx);
    binancews->subscribe_orderbook_diffs("SUBSCRIBE", symbol, levels);
    ioc.run();
}

Output : Orderbook Levels Update

terminate called after throwing an instance of 'nlohmann::detail::parse_error'
  what():  [json.exception.parse_error.101] parse error at line 1, column 687: syntax error while parsing value - unexpected '{'; expected end of input
Aborted (core dumped)

Calling on_message_handler() directly inside on_message() works just fine; the problem arises only when I call on_message_handler() inside the lambda that is passed as the completion handler to async_read().

Burak
  • 2,251
  • 1
  • 16
  • 33
noobie
  • 25
  • 4
  • I strongly remember going through all this code before and making it self-contained. Why do I have to do it again? Did the other answer get deleted? I'm positive I even included the JSON handling... – sehe Oct 31 '22 at 23:12
  • Ah, found it again https://stackoverflow.com/questions/74218667/segmentation-default-core-dumped-while-passing-handlers-to-async-read#comment131041353_74218667 - It looks like you saw that code since you used some devices that are very non-standard from it. – sehe Nov 01 '22 at 02:08

1 Answers1

1

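When you call on_message_handler in the lambda rather than directly in on_message, buffer_ has not been cleared in between, so the second async_read has appended the next message to the bytes that were already there. The handler then tries to parse two concatenated JSON documents, which is exactly what the parse error ("unexpected '{'; expected end of input") is telling you.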

Even if you want to do the processing later, parse the JSON when you know it's valid.

If you really just want to call it in both places, don't forget the buffer_.clear() the first time...:

    on_message_handler(); // WORKS FINE!!!
    buffer_.clear();

In fact, this tells you buffer_.clear() should probably be inside on_message_handler. Even better, encapsulate it so it cannot be (a) forgotten (b) mis-used:

// Parses the message currently held in buffer_, empties the buffer so that the
// next read starts from scratch, and only then hands the parsed JSON to
// on_message_handler.
void handle_message() {
    auto const text   = beast::buffers_to_string(buffer_.cdata());
    auto       parsed = json::parse(text);

    buffer_.clear();
    on_message_handler(std::move(parsed));
}

void on_message(beast::error_code ec, size_t bytes_transferred) {
    boost::ignore_unused(bytes_transferred);
    if (ec)
        return fail_ws(ec, "read");

    handle_message();

    ws_.async_read(buffer_, BINANCE_HANDLER(on_message));
}

Demo

Made self-contained again

Live On Coliru

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/json/src.hpp> // for header only
#include <deque>
#include <iostream>
namespace beast     = boost::beast;
namespace http      = beast::http;
namespace websocket = beast::websocket;
namespace net       = boost::asio;
namespace ssl       = net::ssl;
namespace json      = boost::json;
// Builds a completion handler that calls member function `f` of binanceWS<A>.
// The handler is bound to shared_from_this(), so it holds a shared_ptr to the
// object for as long as the handler exists. Only usable inside member functions
// of binanceWS<A>, because it refers to `this` and to the template parameter A.
#define BINANCE_HANDLER(f)                                                     \
    beast::bind_front_handler(&binanceWS<A>::f, this->shared_from_this())

using boost::system::error_code;
using net::ip::tcp;
using Stream = websocket::stream<ssl::stream<beast::tcp_stream>>;
using namespace std::chrono_literals;

namespace fail_ws_detail {
    // Preferred overload: for arguments that have a message() member (error
    // codes), print the human-readable message.
    template <typename T>
    auto printable(T const& value, int) -> decltype(value.message()) {
        return value.message();
    }

    // Fallback: print the argument itself.
    template <typename T>
    T const& printable(T const& value, long) {
        return value;
    }
} // namespace fail_ws_detail

// Reports a failure on std::cerr instead of silently discarding it.
//
// Accepts any number of arguments; every argument must either have a message()
// member or be streamable. The arguments are written on one line, separated by
// " - ", e.g. fail_ws(ec, "connect") prints
// "websocket failure: <ec.message()> - connect".
template <typename... Args>
void fail_ws(Args&&... args) {
    char const* separator = "";
    std::cerr << "websocket failure: ";
    ((std::cerr << separator << fail_ws_detail::printable(args, 0), separator = " - "), ...);
    std::cerr << '\n';
}

// Minimal stand-in for the queue type that binanceWS keeps a reference to.
// It only owns a deque and a mutex; it exposes no operations.
template <class T>
class SPSCQueue {
    std::deque<T>      _data;
    mutable std::mutex _mx;
};

// Asynchronous WebSocket-over-TLS client for a Binance stream.
//
// The connection is driven by a chain of completion handlers:
//   run -> on_resolve -> on_connect -> on_ssl_handshake -> on_handshake
//       -> on_write -> on_message (re-armed after every message)
// Every handler is created with BINANCE_HANDLER and is therefore bound to
// shared_from_this(): an instance has to be owned by a std::shared_ptr, and it
// stays alive for as long as an asynchronous operation is pending.
template <typename A>
class binanceWS : public std::enable_shared_from_this<binanceWS<A>> {
    tcp::resolver      resolver_;
    Stream             ws_;
    // Receives one message per async_read; emptied by handle_message().
    beast::flat_buffer buffer_;
    std::string        host_;
    // Serialized request; a member so that it outlives the async_write using it.
    std::string        message_text_;

    // Request target; run() appends the stream name to it.
    std::string           wsTarget_ = "/ws/";
    char const*           host      = "stream.binance.com";
    char const*           port      = "9443";
    // Stored by the constructor; not used by any member function shown here.
    SPSCQueue<A>&         diff_messages_queue;
    // Receives the parsed JSON of each message; may be empty if run() is
    // called without going through subscribe_orderbook_diffs().
    std::function<void(json::value payload)> on_message_handler;

  public:
    // ex  - executor for the resolver and the stream
    // ctx - SSL context used to construct the TLS stream
    // q   - queue that is kept by reference and must outlive this object
    binanceWS(net::any_io_executor ex, ssl::context& ctx, SPSCQueue<A>& q)
        : resolver_(ex)
        , ws_(ex, ctx)
        , diff_messages_queue(q) {}

    // Starts the handler chain: sets the TLS SNI host name, remembers the host
    // and the serialized `message`, appends `streamName` to the request target
    // and begins resolving `host`:`port`.
    // Throws boost::system::system_error if the SNI host name cannot be set.
    // Note: the `host`/`port` parameters shadow the data members of the same name.
    void run(char const* host, char const* port, json::value const& message, const std::string& streamName) {
        if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host)) {
            throw boost::system::system_error(
                error_code(::ERR_get_error(), net::error::get_ssl_category()));
        }

        host_         = host;
        message_text_ = serialize(message);
        wsTarget_ += streamName;

        resolver_.async_resolve(host_, port, BINANCE_HANDLER(on_resolve));
    }

    // Resolve completed: set the SNI host name (run() already did the same),
    // arm a 30 second timeout and connect to the resolved endpoints.
    void on_resolve(beast::error_code ec, tcp::resolver::results_type results) {
        if (ec)
            return fail_ws(ec, "resolve");

        if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host_.c_str())) {
            throw beast::system_error{
                error_code(::ERR_get_error(), net::error::get_ssl_category())};
        }

        get_lowest_layer(ws_).expires_after(30s);

        beast::get_lowest_layer(ws_).async_connect(results, BINANCE_HANDLER(on_connect));
    }

    // TCP connect completed: start the TLS handshake as a client.
    void on_connect(beast::error_code ec, [[maybe_unused]] tcp::endpoint ep) {
        if (ec)
            return fail_ws(ec, "connect");

        // Perform the SSL handshake
        ws_.next_layer().async_handshake(ssl::stream_base::client, BINANCE_HANDLER(on_ssl_handshake));
    }

    // TLS handshake completed: disable the tcp_stream timeout, apply the
    // suggested client WebSocket timeouts, set the User-Agent header and start
    // the WebSocket handshake on wsTarget_.
    void on_ssl_handshake(beast::error_code ec) {
        if (ec)
            return fail_ws(ec, "ssl_handshake");

        beast::get_lowest_layer(ws_).expires_never();

        ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

        ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(http::field::user_agent,
                    std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async");
        }));

        std::cout << "using host_: " << host_ << std::endl;
        ws_.async_handshake(host_, wsTarget_, BINANCE_HANDLER(on_handshake));
    }

    // WebSocket handshake completed: send the stored request text.
    void on_handshake(beast::error_code ec) {
        if (ec) {
            return fail_ws(ec, "handshake");
        }

        std::cout << "Sending : " << message_text_ << std::endl;

        ws_.async_write(net::buffer(message_text_), BINANCE_HANDLER(on_write));
    }

    // Request written: start reading the first message into buffer_.
    void on_write(beast::error_code ec, size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);

        if (ec)
            return fail_ws(ec, "write");

        ws_.async_read(buffer_, BINANCE_HANDLER(on_message));
    }

    // Parses the message held in buffer_ and passes it to on_message_handler.
    // The buffer is cleared here, in the one place that consumes it, so that
    // the next async_read cannot append to an already processed message.
    // json::parse reports malformed input by throwing; that exception is not
    // caught here.
    void handle_message() {
        auto payload = json::parse(beast::buffers_to_string(buffer_.cdata()));
        buffer_.clear();

        // run() is public, so a handler may never have been installed;
        // invoking an empty std::function would throw std::bad_function_call.
        if (on_message_handler)
            on_message_handler(std::move(payload));
    }

    // A complete message has been read: process it and wait for the next one.
    void on_message(beast::error_code ec, size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);
        if (ec)
            return fail_ws(ec, "read");

        handle_message();

        ws_.async_read(buffer_, BINANCE_HANDLER(on_message));
    }

    // Installs a handler that prints every received payload and starts the
    // connection with a request {"method": action, "params": [stream], "id": 1},
    // where `stream` is "<symbol>@depth<depth_levels>".
    void subscribe_orderbook_diffs(const std::string& action, const std::string& symbol, short int depth_levels)
    {
        std::string stream = symbol + "@depth" + std::to_string(depth_levels);

        on_message_handler = [](json::value payload) {
            std::cout << "Orderbook Levels Update" << std::endl;
            std::cout << payload << std::endl;
        };

        json::value jv = {
            { "method", action },
            { "params", {stream} },
            { "id", 1 }
        };
        run(host, port, jv, stream);
    }
};

// Demo entry point: subscribes to the 10-level "btcusdt" order book stream and
// runs the io_context until the handler chain has no more work.
int main() {
    net::io_context ioc;

    // TLS 1.2 client context that verifies the peer against the default CA paths.
    ssl::context ctx{ssl::context::tlsv12_client};
    ctx.set_verify_mode(ssl::verify_peer);
    ctx.set_default_verify_paths();

    SPSCQueue<int> queue;
    {
        // This handle is released before ioc.run(); from then on the pending
        // completion handlers are what keep the client alive.
        auto client = std::make_shared<binanceWS<int>>(make_strand(ioc), ctx, queue);
        client->subscribe_orderbook_diffs("SUBSCRIBE", "btcusdt", 10);
    }
    ioc.run();
}

enter image description here

sehe
  • 374,641
  • 47
  • 450
  • 633