Problem Statement
I'm trying to build a websocket connection to the OKEX exchange using their websocket API. I am using Boost.Beast websockets.
The problem is that OKEX's servers don't follow the standard permessage-deflate compression protocol and send messages that are incorrectly deflated, so I'm trying to inflate the messages myself. It's not working, and what's driving me insane is that the behavior I'm getting is somewhat inconsistent.
Actual Code
My code is mostly copied and pasted from the previously linked thread. For simplicity, I removed all the preprocessor macros and hardcoded the socket values.
The inflate code is taken from Raj Advani's answer.
Here is the main.cpp file:
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>
namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;
int inflate(const void *src, int srcLen, void *dst, int dstLen) {
    z_stream strm = {0};
    strm.total_in = strm.avail_in = srcLen;
    strm.total_out = strm.avail_out = dstLen;
    strm.next_in = (Bytef *) src;
    strm.next_out = (Bytef *) dst;
    strm.zalloc = Z_NULL;
    strm.zfree = Z_NULL;
    strm.opaque = Z_NULL;
    int err = -1;
    int ret = -1;
    err = inflateInit2(&strm, (15 + 32)); // 15 window bits; the +32 tells zlib to detect whether the data is gzip or zlib
    if (err == Z_OK) {
        err = inflate(&strm, Z_FINISH);
        if (err == Z_STREAM_END) {
            ret = strm.total_out;
        }
        else {
            inflateEnd(&strm);
            return err;
        }
    }
    else {
        inflateEnd(&strm);
        return err;
    }
    inflateEnd(&strm);
    return ret;
}
int main(int argc, char** argv) {
    std::string host = "real.okex.com";
    auto const port = "8443";
    auto const path = "/ws/v3";
    net::io_context ioc;
    ssl::context ctx{ ssl::context::sslv23 };
    tcp::resolver resolver{ ioc };
    stream_t s{ ioc, ctx };
    ctx.set_verify_mode(ssl::verify_none);
    tcp::resolver::results_type results = resolver.resolve(host, port);
    net::connect(
        beast::get_lowest_layer(s),
        //s.next_layer().next_layer(),
        results.begin());
    // SSL handshake
    s.next_layer().handshake(ssl::stream_base::client);
    s.handshake(host + ":" + port, path);
    std::cout << "connected." << std::endl;
    // send request to the websocket
    s.write(net::buffer("{'op':'subscribe', 'args':['spot/ticker:ETH-USDT']}"));
    {
        net::streambuf buffer;
        s.read(buffer);
        // auto data_it = buffer.data().begin();
        // std::cout<<"Iterating over data of size:" << buffer.data().size()<<endl; // LINE 85
        // int i = 0;
        // while (data_it != buffer.data().end()) {
        //     std::cout << "buffer data["<<i++<<"] size:" << (data_it->size())<<endl;
        //     data_it++;
        // }
        net::streambuf out_buffer;
        const int error_code_out = inflate(&buffer, buffer.size(), &out_buffer, 10000000);
        std::cout << "received. size:"<<buffer.size()<<" data: "<< &buffer << std::endl;
        std::cout << "deflated. error?"<< error_code_out << " data: " << &out_buffer << std::endl;
    }
}
Code Output+Question
The output says that the buffer's size is 117. I thought that was reasonable, but for some reason I get Z_DATA_ERROR when decompressing, leading me to believe there is more data to be parsed.
So I looked up net::streambuf's documentation and found that there are apparently multiple buffers that can be read from, so maybe I was using just one of them? I ran the commented-out code (excluding the LINE 85 line in the middle) and it never entered the loop, which I thought was odd. I put that line back in, and all of a sudden I have a couple hundred buffers? The (truncated) output is something like:
connected.
Iterating over data of size:117
buffer data[0] size:117
buffer data[1] size:72198326954657960
buffer data[2] size:140735485986592
buffer data[3] size:140618848326656
buffer data[4] size:140618848326656
.. many more lines of this...
buffer data[121] size:7089075335985461349
buffer data[122] size:3472329396561475632
buffer data[123] size:8747116609081390898
buffer data[124] size:3472329396561475632
buffer data[125] size:3472387902693336678
buffer data[126] size:
Process finished with exit code 139 (interrupted by signal 11: SIGSEGV)
As you can see, it crashes. I have no clue what is going on, or how to decode the streambuf at this point, and the documentation seems to assume a lot of background knowledge I don't have. I tried using buffer.data() and converting the buffer to a char* array, both of which led to exactly the same behavior.
I'm not sure what to do. Any help is welcome.
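On the multiple-buffers worry: as far as I understand, a deflate stream does not have to sit in one contiguous array, because a streaming decompressor can be fed one chunk at a time. Below is a minimal Python sketch of that idea. The data is compressed locally rather than received from OKEX, and the 7-byte chunk size and the helper names `raw_deflate` and `inflate_chunks` are made up for illustration.

```python
import zlib


def raw_deflate(data: bytes) -> bytes:
    """Compress to a raw deflate stream (no zlib or gzip header)."""
    comp = zlib.compressobj(wbits=-zlib.MAX_WBITS)
    return comp.compress(data) + comp.flush()


def inflate_chunks(chunks) -> bytes:
    """Inflate a raw deflate stream that arrives as several byte chunks."""
    decomp = zlib.decompressobj(-zlib.MAX_WBITS)
    out = bytearray()
    for chunk in chunks:
        # Each call consumes one chunk and returns whatever output is ready.
        out += decomp.decompress(chunk)
    out += decomp.flush()
    return bytes(out)


# Stand-in payload, split into 7-byte pieces to mimic a sequence of buffers.
message = b"{'op':'subscribe', 'args':['spot/ticker:ETH-USDT']}"
compressed = raw_deflate(message)
pieces = [compressed[i:i + 7] for i in range(0, len(compressed), 7)]

restored = inflate_chunks(pieces)
print(restored == message)
```

If this carries over to the C++ side, the analogous approach would be to hand zlib the actual bytes of each buffer in the sequence in turn, but I have not confirmed that against Beast.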
For reference: Python Implementation
import websockets
import asyncio
import zlib

def inflate(data):
    decompress = zlib.decompressobj(-zlib.MAX_WBITS)
    inflated = decompress.decompress(data)
    inflated += decompress.flush()
    return inflated

async def main():
    client = await websockets.connect("wss://real.okex.com:8443/ws/v3")
    await client.send("{'op':'subscribe', 'args':['spot/ticker:ETH-USDT']}")
    r = await client.recv()
    print(len(r), r)
    print(inflate(r))

if __name__ == '__main__':
    asyncio.run(main())
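One difference between the two versions that may be worth ruling out: the Python code above inflates with `-zlib.MAX_WBITS` (a raw deflate stream with no header), whereas the C++ `inflate` passes `15 + 32`, which makes zlib expect a zlib or gzip header. Here is a small sketch of how those two settings behave on raw deflate data. It uses a locally compressed sample rather than real OKEX traffic, and the `try_inflate` helper is hypothetical.

```python
import zlib

# Stand-in payload; compressed locally as a raw deflate stream (no header).
sample = b"{'op':'subscribe', 'args':['spot/ticker:ETH-USDT']}"
comp = zlib.compressobj(wbits=-zlib.MAX_WBITS)
raw = comp.compress(sample) + comp.flush()


def try_inflate(data: bytes, wbits: int):
    """Return the inflated bytes, or None if zlib rejects the input."""
    try:
        return zlib.decompress(data, wbits=wbits)
    except zlib.error:
        return None


# Negative wbits: raw deflate, as in the Python reference above.
raw_result = try_inflate(raw, -zlib.MAX_WBITS)
# 15 + 32: zlib/gzip header auto-detection, as in the C++ inflate().
auto_result = try_inflate(raw, zlib.MAX_WBITS + 32)

print("raw mode ok:", raw_result == sample)
print("header auto-detect ok:", auto_result == sample)
```

If the OKEX payload really is raw deflate, as the Python version suggests, the C++ call would presumably need negative window bits (`-15`) in `inflateInit2` as well. I have not verified this against the live feed.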