
Problem Statement

I'm trying to build a websocket connection to the OKEX exchange using their websocket API. I am using Boost::Beast websockets.

OKEX's servers don't follow the permessage_deflate compression protocol correctly and send messages that are incorrectly deflated, so I'm trying to inflate the messages myself. It isn't working, and what's driving me insane is that the behavior I'm getting is somewhat inconsistent.

Actual Code

My code is mostly copied and pasted from the previously linked thread. For simplicity, I removed all the preprocessor macros and hardcoded the socket values.

The inflate code is taken from Raj Advani's answer here.

Here is the main.cpp file:

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>

namespace net       = boost::asio;
namespace ssl       = net::ssl;
namespace beast     = boost::beast;
namespace http      = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

int inflate(const void *src, int srcLen, void *dst, int dstLen) {
    z_stream strm  = {0};
    strm.total_in  = strm.avail_in  = srcLen;
    strm.total_out = strm.avail_out = dstLen;
    strm.next_in   = (Bytef *) src;
    strm.next_out  = (Bytef *) dst;

    strm.zalloc = Z_NULL;
    strm.zfree  = Z_NULL;
    strm.opaque = Z_NULL;

    int err = -1;
    int ret = -1;

    err = inflateInit2(&strm, (15 + 32)); // 15 window bits; the +32 tells zlib to auto-detect a gzip or zlib header
    if (err == Z_OK) {
        err = inflate(&strm, Z_FINISH);
        if (err == Z_STREAM_END) {
            ret = strm.total_out;
        }
        else {
            inflateEnd(&strm);
            return err;
        }
    }
    else {
        inflateEnd(&strm);
        return err;
    }

    inflateEnd(&strm);
    return ret;
}


int main(int argc, char** argv) {

    std::string host = "real.okex.com";
    auto const port  = "8443";
    auto const path  = "/ws/v3";

    net::io_context ioc;
    ssl::context ctx{ ssl::context::sslv23 };
    tcp::resolver resolver{ ioc };
    stream_t s{ ioc, ctx };
    ctx.set_verify_mode(ssl::verify_none);
    tcp::resolver::results_type results = resolver.resolve(host, port);
    net::connect(
            beast::get_lowest_layer(s),
            //s.next_layer().next_layer(),
            results.begin());

    // SSL handshake
    s.next_layer().handshake(ssl::stream_base::client);
    s.handshake(host + ":" + port, path);

    std::cout << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("{'op':'subscribe', 'args':['spot/ticker:ETH-USDT']}"));

    {
        net::streambuf buffer;
        s.read(buffer);

//        auto data_it = buffer.data().begin();
//        std::cout<<"Iterating over data of size:" << buffer.data().size()<<endl; // LINE 85
//        int i = 0;
//        while (data_it != buffer.data().end()) {
//            std::cout << "buffer data["<<i++<<"] size:" << (data_it->size())<<endl;
//            data_it++;
//        }

        net::streambuf out_buffer;
        const int error_code_out = inflate(&buffer, buffer.size(), &out_buffer, 10000000);

        std::cout << "received. size:"<<buffer.size()<<" data: "<< &buffer << std::endl;
        std::cout << "deflated. error?"<< error_code_out << " data: " << &out_buffer << std::endl;
    }
}

Code Output+Question

The output says that the buffer's size is 117. I thought that was reasonable, but for some reason I get Z_DATA_ERROR when decompressing, leading me to believe there is more data to be parsed.

So I looked up net::streambuf's documentation and I found that there are apparently multiple buffers that can be read from, so maybe I was using just one buffer? I ran the commented out code (excluding the LINE 85 line in the middle) and it never went through the loop... which I thought was odd. I put in that line, and then all of a sudden I have a couple hundred buffers? The (truncated) output is something like:

connected.
Iterating over data of size:117
buffer data[0] size:117
buffer data[1] size:72198326954657960
buffer data[2] size:140735485986592
buffer data[3] size:140618848326656
buffer data[4] size:140618848326656
.. many more lines of this...
buffer data[121] size:7089075335985461349
buffer data[122] size:3472329396561475632
buffer data[123] size:8747116609081390898
buffer data[124] size:3472329396561475632
buffer data[125] size:3472387902693336678
buffer data[126] size:
Process finished with exit code 139 (interrupted by signal 11: SIGSEGV)

As you can see, it crashes. I have no clue what is going on or how to decode the streambuf at this point, and the documentation seems to assume a lot of background knowledge I don't have. I tried using buffer.data() and converting the buffer to a char* array, all of which led to exactly the same behavior.

Not sure what to do. Any help welcome


For reference: Python Implementation

import websockets
import asyncio
import zlib

def inflate(data):
    decompress = zlib.decompressobj(-zlib.MAX_WBITS)
    inflated = decompress.decompress(data)
    inflated += decompress.flush()
    return inflated

async def main():
    client = await websockets.connect("wss://real.okex.com:8443/ws/v3")
    await client.send("{'op':'subscribe', 'args':['spot/ticker:ETH-USDT']}")
    r = await client.recv()
    print(len(r), r)
    print(inflate(r))


if __name__ == '__main__':
    asyncio.run(main())
JoeVictor

2 Answers


PS.:

At the very end I remembered seeing the particular type of gibberish returned by OKEX before, and indeed I looked at this server before.

The question is conceptually a duplicate of Boost inflate algorithm decompress but your particular code had (much) bigger problems, which merit an analysis:

I was very confused. How does this compile?

const int error_code_out = inflate(&buffer, buffer.size(), &out_buffer, 10000000);

Buffer is not a char array or similar. It's not even a POD type. It's a streambuf. Use it as such.

You complain that "it is somewhat inconsistent"? That makes total sense because that's what you asked for: invoking Undefined Behaviour is a good recipe to get "inconsistency". Or nasal daemons.

Now, next up is: why doesn't the compiler warn?

One culprit is that ::inflate overloads the ZLIB function of the same name. Let's untangle by putting ours in a namespace.

Next up, it takes void* arguments. UHOH. And they're reinterpret_cast<>-ed into Bytef*. There is just so much to hate about this code. It uses C-style casts recklessly, casting away const and totally disregarding that the argument isn't even POD.

Let's make it MILDLY safe:

namespace mylib {
    int inflate(uint8_t const *src, int srcLen, uint8_t *dst, int dstLen) {
        z_stream strm  {};
        strm.total_in  = strm.avail_in  = srcLen;
        strm.total_out = strm.avail_out = dstLen;
        strm.next_in = const_cast<Bytef*>(static_cast<Bytef const*>(src));
        strm.next_out = static_cast<Bytef*>(dst);

Let's not gloss over the fact that you even explicitly, willfully promised the function that output buffer is of the exact size 10000000. You should have asked yourself how you know that when you wrote it.

Now the code expresses intent and we can expect the compiler to diagnose our bug. Which it does, of course, because that's what C++ compilers do.
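To see why the original signature let the bug through, here is a minimal sketch that uses only the standard library. `std::stringbuf` stands in for `net::streambuf` (an assumption, made so the snippet compiles without Boost), and the helper names are hypothetical:

```cpp
#include <cstdint>
#include <sstream>
#include <type_traits>

// Hypothetical helpers, for illustration only.
// True if a T* can be passed where the old `void const*` parameter was expected.
template <typename T>
constexpr bool accepted_by_void_ptr() {
    return std::is_convertible<T*, void const*>::value;
}

// True if a T* can be passed where the new `uint8_t const*` parameter is expected.
template <typename T>
constexpr bool accepted_by_byte_ptr() {
    return std::is_convertible<T*, std::uint8_t const*>::value;
}

// Any object pointer converts silently to void const*, so `&buffer` compiled.
static_assert(accepted_by_void_ptr<std::stringbuf>(), "void* accepts anything");

// With a typed byte pointer the same call is rejected at compile time.
static_assert(!accepted_by_byte_ptr<std::stringbuf>(), "a stream buffer is not bytes");

// Real byte storage is still accepted.
static_assert(accepted_by_byte_ptr<std::uint8_t>(), "bytes are fine");
```

The point of the typed signature is exactly the second assertion: the mistake becomes a compile error instead of Undefined Behaviour at run time.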

Fixing the invocation

Let's side-step the confusion with net::streambuf. You can use a string or vector as a dynamic buffer just the same. That might not always be as efficient, but let's focus on understandable code here:

std::vector<uint8_t> in, out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);

out.resize(1024); // make sure it's enough
const int err = mylib::inflate(
        in.data(), in.size(),
        out.data(), out.size());

See, now you know what you're passing. And that it'll be fine.

Let's avoid printing the input data (it's binary gibberish...).

std::cout << "received. size:" << in_buffer.size() << std::endl;
std::cout << "deflated. error?" << err << " data: "
          << std::string(out.begin(), out.end())
          << std::endl;

Now at this point, the inflation still fails, but that's a problem with the server, see Boost inflate algorithm decompress - which also shows some alternatives for buffer handling.

Fixed Code

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

namespace net       = boost::asio;
namespace ssl       = net::ssl;
namespace beast     = boost::beast;
namespace http      = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

namespace mylib {
    int inflate(uint8_t const *src, int srcLen, uint8_t *dst, int dstLen) {
        z_stream strm  {};
        strm.total_in  = strm.avail_in  = srcLen;
        strm.total_out = strm.avail_out = dstLen;
        strm.next_in = const_cast<Bytef*>(static_cast<Bytef const*>(src));
        strm.next_out = static_cast<Bytef*>(dst);

        strm.zalloc = Z_NULL;
        strm.zfree  = Z_NULL;
        strm.opaque = Z_NULL;

        int err = -1;
        int ret = -1;

        err = inflateInit2(&strm, (15 + 32)); // 15 window bits; the +32 tells zlib to auto-detect a gzip or zlib header
        if (err == Z_OK) {
            err = inflate(&strm, Z_FINISH);
            if (err == Z_STREAM_END) {
                ret = strm.total_out;
            }
            else {
                inflateEnd(&strm);
                return err;
            }
        }
        else {
            inflateEnd(&strm);
            return err;
        }

        inflateEnd(&strm);
        return ret;
    }
}

int main() {
    std::string host = "real.okex.com";
    auto const port  = "8443";
    auto const path  = "/ws/v3";

    net::io_context ioc;
    tcp::resolver resolver{ ioc };

    ssl::context ctx { ssl::context::sslv23 };
    ctx.set_verify_mode(ssl::verify_none);

    stream_t s{ ioc, ctx };
    net::connect(beast::get_lowest_layer(s), resolver.resolve(host, port));

    // SSL handshake
    s.next_layer().handshake(ssl::stream_base::client);

    {
        websocket::permessage_deflate opt;
        opt.client_enable = true; // for clients
        opt.server_enable = true; // for servers
        s.set_option(opt);
    }

    s.handshake(host + ":" + port, path);

    std::cerr << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("{'op':'subscribe', 'args':['spot/ticker:ETH-USDT']}"));

    {
        std::vector<uint8_t> in, out;
        auto in_buffer = net::dynamic_buffer(in);
        s.read(in_buffer);

        // std::cout.write(reinterpret_cast<char const*>(in.data()), in.size());

        out.resize(1024); // make sure it's enough
        const int err = mylib::inflate(
                in.data(), in.size(),
                out.data(), out.size());

        std::cerr << "received. size:" << in_buffer.size() << std::endl;
        //std::cerr << "received. data:" << std::string(in.begin(), in.end()) << std::endl;
        std::cerr << "deflated. error?" << err << " data: "
                  << std::string(out.begin(), out.end())
                  << std::endl;
    }
}
sehe
  • First of all, thanks for your patience with this code, I'm not new to C++ but definitely new to this sort of work with it. I wrote the max size to be 10000, i never really knew I had to resize the buffer accordingly... it makes sense why it broke then – JoeVictor Feb 19 '21 at 22:28
  • I guess I need to understand more about what a `dynamic_buffer` does too... – JoeVictor Feb 19 '21 at 22:29
  • Your problem wasn't that you didn't resize the buffer. You were passing a void* to a C++ class as if it was pointing to a buffer of 100000 bytes :) Regardless, I finally solved it. See [my new answer](https://stackoverflow.com/a/66285851/85371) – sehe Feb 19 '21 at 22:32

I have an older answer up that kind of hashes out the problems with the code. I have since found the solution to the remaining issue: the server responses.

Solution

Not only does OKEX not adhere to WS standards to enable per-message deflation, but also it abruptly ends the data. However, it turns out that it is actually fine if you keep the partially inflated results.

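The way I made it work was by not using ZLIB directly but instead using beast::zlib::inflate_stream. It decodes raw deflate data (the same thing the Python version in the question asks for with -zlib.MAX_WBITS) and has a more flexible interface, which allows us to get the results we need: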

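namespace mylib {
    // Inflates `in` into `out` and shrinks `out` to the bytes actually written.
    auto inflate(std::vector<uint8_t> const& in, std::vector<uint8_t>& out) {
        boost::system::error_code ec;
        beast::zlib::z_params zp{};
        zp.next_in   = in.data(); // next_in is void const*, so no cast is needed
        zp.avail_in  = in.size();
        zp.next_out  = out.data();
        zp.avail_out = out.size();

        beast::zlib::inflate_stream zs;
        zs.write(zp, beast::zlib::Flush::full, ec);

        // Keep the partial output even if ec is set; drop the unused tail.
        out.resize(out.size() - zp.avail_out);

        return ec;
    }
}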

Now we use it like:

std::vector<uint8_t> in, out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);

out.resize(1024); // make sure it's enough
auto ec = mylib::inflate(in, out);

std::cout << "deflated. " << ec.message() << std::endl;
std::cout << std::string(out.begin(), out.end()) << std::endl;

And it prints

connected.
deflated. unexpected end of deflate stream
{"event":"error","message":"Unrecognized request: {'op':'subscribe', 'args':['spot/ticker:ETH-USDT']}\u0000","errorCode":30039}

So, despite the unexpected end of deflate stream, the data is valid and complete JSON.
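One detail in that output is worth a closer look: the echoed request ends in `\u0000`. That is consistent with the terminating NUL of the string literal being sent, because a buffer created from a character array covers the whole array. The sketch below shows the size difference using only the standard library; `array_bytes` and `string_bytes` are hypothetical helpers, and treating `array_bytes` as a stand-in for what `net::buffer` sees is an assumption:

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: number of bytes in a char array, as an array-taking
// buffer factory would see it. For a string literal this includes the NUL.
template <std::size_t N>
constexpr std::size_t array_bytes(char const (&)[N]) {
    return N;
}

// Number of bytes in the same text held as a std::string
// (the terminator is not counted).
inline std::size_t string_bytes(std::string const& s) {
    return s.size();
}
```

Here `array_bytes("{}")` is 3 while `string_bytes("{}")` is 2, so building the request as a `std::string` and passing that to `net::buffer` should avoid sending the extra byte.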

PS.2: see newer answer that manages to get over the last hurdle

Full Listing

Live On Compiler Explorer

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

namespace net       = boost::asio;
namespace ssl       = net::ssl;
namespace beast     = boost::beast;
namespace http      = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

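namespace mylib {
    // Inflates `in` into `out` and shrinks `out` to the bytes actually written.
    auto inflate(std::vector<uint8_t> const& in, std::vector<uint8_t>& out) {
        boost::system::error_code ec;
        beast::zlib::z_params zp{};
        zp.next_in   = in.data(); // next_in is void const*, so no cast is needed
        zp.avail_in  = in.size();
        zp.next_out  = out.data();
        zp.avail_out = out.size();

        beast::zlib::inflate_stream zs;
        zs.write(zp, beast::zlib::Flush::full, ec);

        // Keep the partial output even if ec is set; drop the unused tail.
        out.resize(out.size() - zp.avail_out);

        return ec;
    }
}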

int main() {
    std::string host = "real.okex.com";
    auto const port  = "8443";
    auto const path  = "/ws/v3";

    net::io_context ioc;
    tcp::resolver resolver{ ioc };

    ssl::context ctx { ssl::context::sslv23 };
    ctx.set_verify_mode(ssl::verify_none);

    stream_t s{ ioc, ctx };
    net::connect(beast::get_lowest_layer(s), resolver.resolve(host, port));

    // SSL handshake
    s.next_layer().handshake(ssl::stream_base::client);

    {
        websocket::permessage_deflate opt;
        opt.client_enable = true; // for clients
        opt.server_enable = true; // for servers
        s.set_option(opt);
    }

    s.handshake(host + ":" + port, path);

    std::cout << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("{'op':'subscribe', 'args':['spot/ticker:ETH-USDT']}"));

    {
        std::vector<uint8_t> in, out;
        auto in_buffer = net::dynamic_buffer(in);
        s.read(in_buffer);

        out.resize(1024); // make sure it's enough
        auto ec = mylib::inflate(in, out);

        std::cout << "deflated. " << ec.message() << std::endl;
        std::cout << std::string(out.begin(), out.end()) << std::endl;
    }
}
sehe
  • Can you explain what unexpected end of stream means? If you look at my Python implementation (see update in Question), it totally explains how you're right. The same response is actually 113 bytes in Python – JoeVictor Feb 19 '21 at 22:34
  • Mostly trying to use this opportunity to learn C++ better. This is the main goal for the entire project :) – JoeVictor Feb 19 '21 at 22:35
  • It means that the algorithm is expecting something else to continue/finish the data. It's nice indeed to compare with the Python take on it. Apparently, Python just silently ignores the trailing bytes. – sehe Feb 19 '21 at 22:36
  • but wouldn't the `size()` of the buffer be based on something like a null terminating character, and would therefore know that it is in fact 113 bytes not 117? – JoeVictor Feb 19 '21 at 22:37
  • Certainly not a NUL character. There's a framing protocol that tells both sides about the size of the messages to expect: https://tools.ietf.org/html/rfc6455#section-5.1 (Data Framing/Overview). My hunch is that Python uses it to read off the wire, but then ignores the length when deflating. – sehe Feb 19 '21 at 23:08
  • makes more sense. I’ve worked with similar formats but for images. Thanks for all your help with this! – JoeVictor Feb 19 '21 at 23:43
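
The framing mentioned in the last comments can be sketched as follows. This is a minimal, standard-library-only decoder for the payload length field described in RFC 6455 section 5.2; `payload_length` is a hypothetical helper and not part of Beast, which handles framing internally:

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical helper: decode the payload length from the start of a
// WebSocket frame (RFC 6455, section 5.2). Returns false if fewer than the
// required header bytes are available.
inline bool payload_length(std::uint8_t const* hdr, std::size_t avail,
                           std::uint64_t& len) {
    if (avail < 2) return false;
    std::uint8_t const short_len = hdr[1] & 0x7F; // low 7 bits; top bit is MASK

    std::size_t extra = 0;                 // number of extended length bytes
    if (short_len == 126) extra = 2;       // 16-bit big-endian length follows
    else if (short_len == 127) extra = 8;  // 64-bit big-endian length follows
    else { len = short_len; return true; } // 0..125: the length itself

    if (avail < 2 + extra) return false;
    len = 0;
    for (std::size_t i = 0; i < extra; ++i)
        len = (len << 8) | hdr[2 + i];
    return true;
}
```

For a 117-byte unmasked binary message the header would be the two bytes `0x82 0x75`, which is how the receiver knows the size without any terminating character.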