
I am trying to build an Orderbook representation for the Poloniex Bitcoin exchange. I am subscribing to the Push-API which sends updates of the Orderbook over Websocket. The problem is that my Orderbook gets inconsistent over time, i.e. orders which should have been removed are still in my book.

The Orderbook in the following picture has this format:

Exchange-Name - ASK - Amount - Price | Price - Amount - BID - Exchange-Name

[Image: screenshot of the Orderbook in the format above]

On the left side (ASK) are people who are selling a currency. On the right side (BID) are people who are buying a currency. BTCUSD, ETHBTC and ETHUSD describe the different markets. BTCUSD means Bitcoin is exchanged for US-Dollar, ETHBTC means Ethereum is exchanged for Bitcoin and ETHUSD means Ethereum is exchanged for US-Dollar.

Poloniex sends updates over Websocket in JSON-Format. Here is an example of such an update:

[
   36,
   7597659581972377,
   8089731807973507,
   {},
   [
      {"data":{"rate":"609.00000029","type":"bid"},"type":"orderBookRemove"},
      {"data":{"amount":"0.09514285","rate":"609.00000031","type":"bid"},"type":"orderBookModify"}
   ],
   {
      "seq":19976127
   }
]
  • json[0] can be ignored for this question.
  • json[1] is the market identifier. That means I send a request like "Subscribe to market BTCUSD" and they answer "BTCUSD updates will be sent under identifier number 7597659581972377".
  • json[2] can be ignored for this question.
  • json[3] can be ignored for this question.
  • json[4] contains the actual update data. More about that later.
  • json[5] contains a sequence number. It is used to apply the updates in the correct order if they arrive out of order. So if I receive 5 updates within 1 second in the order 1 - 3 - 5 - 4 - 2, they have to be applied as 1 - 2 - 3 - 4 - 5. Each market has its own independent sequence of sequence numbers.
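A minimal sketch of the reordering described in the json[5] bullet, assuming the sequence number has already been extracted from each message. `Update` and `SequenceBuffer` are hypothetical names, not part of the Poloniex API or of the code further down; one buffer is kept per market, because each market has its own counter.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical update type: only the sequence number matters for ordering;
// `payload` stands in for the update data (json[4]).
struct Update {
    std::int64_t seq;
    std::string payload;
};

// Buffers out-of-order updates for ONE market and releases them strictly in
// sequence order. Keep one instance per market: each has its own counter.
class SequenceBuffer {
public:
    explicit SequenceBuffer(std::int64_t nextExpected) : next_(nextExpected) {}

    // Queues `u` and returns every update that can now be applied, in order.
    // Updates older than the next expected number (already applied, or
    // duplicates) are discarded.
    std::vector<Update> push(Update u) {
        std::vector<Update> ready;
        if (u.seq < next_) return ready;
        const std::int64_t seq = u.seq;
        pending_.emplace(seq, std::move(u));  // no-op if seq is already queued

        // Drain consecutive sequence numbers starting at next_.
        for (auto it = pending_.find(next_); it != pending_.end();
             it = pending_.find(next_)) {
            ready.push_back(std::move(it->second));
            pending_.erase(it);
            ++next_;
        }
        return ready;
    }

    std::int64_t nextExpected() const { return next_; }
    std::size_t pendingCount() const { return pending_.size(); }

private:
    std::int64_t next_;                       // sequence number to apply next
    std::map<std::int64_t, Update> pending_;  // early arrivals, keyed by seq
};
```

Feeding it 1, 3, 5, 4, 2 releases 1 immediately, nothing while 3, 5 and 4 arrive, and then 2, 3, 4 and 5 in one batch when 2 shows up. If an update never arrives, every later update for that market piles up in the buffer.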

As I said, json[4] contains an array of updates. There are three different kinds in json[4][array-index]["type"]:

  1. orderBookModify: The available amount for a specific price has changed.
  2. orderBookRemove: The order is not available anymore and must be removed.
  3. newTrade: Can be used to build a trade history. Not required for what I am trying to do so it can be ignored.

json[4][array-index]["data"] contains two values for an orderBookRemove and three values for an orderBookModify:

  • rate: The price.
  • amount (only present for an orderBookModify): The new amount.
  • type: ask or bid.
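A minimal sketch of how the two book-changing update types could be applied, assuming the strings in json[4] have already been parsed into a hypothetical `BookUpdate` struct and that each price level is keyed by its parsed rate (a `double`). The amount of an orderBookModify is treated as the new amount at that price, replacing the old value, as described above.

```cpp
#include <functional>
#include <map>

// Hypothetical parsed form of one entry in json[4].
enum class Kind { Modify, Remove };  // orderBookModify / orderBookRemove
enum class Side { Ask, Bid };        // data.type

struct BookUpdate {
    Kind kind;
    Side side;
    double rate;    // data.rate, parsed from its string
    double amount;  // data.amount (the new amount); ignored for Remove
};

// One amount per price level. Asks ascend and bids descend, so begin() is
// the best price on each side.
struct OrderBook {
    std::map<double, double> asks;
    std::map<double, double, std::greater<double>> bids;

    void apply(const BookUpdate& u) {
        if (u.side == Side::Ask)
            applyTo(asks, u);
        else
            applyTo(bids, u);
    }

private:
    template <typename Levels>
    static void applyTo(Levels& levels, const BookUpdate& u) {
        if (u.kind == Kind::Remove)
            levels.erase(u.rate);       // the price level no longer exists
        else
            levels[u.rate] = u.amount;  // replace, do not add: it is the new amount
    }
};
```

Applied to the example message above, the remove deletes the bid level at 609.00000029 and the modify sets the bid level at 609.00000031 to 0.09514285.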

There is also one kind of special message:

[36,8932491360003688,1315671639915103,{},[],{"seq":98045310}]

It only contains a sequence number. It acts as a heartbeat and does not carry any updates.
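Because a heartbeat still carries a sequence number, it has to be queued and consumed like a normal update (assuming heartbeat numbers belong to the same per-market sequence); otherwise its number looks exactly like a missing message. A small sketch of detecting such gaps, using a hypothetical `missingSeqs` helper over the buffered sequence numbers of one market:

```cpp
#include <cstdint>
#include <map>
#include <vector>

// Lists the sequence numbers that are missing for one market, given the next
// number we are waiting for and the updates currently buffered (keyed by seq).
// Payload is whatever type the buffer stores; only the keys are inspected.
template <typename Payload>
std::vector<std::int64_t> missingSeqs(std::int64_t nextExpected,
                                      const std::map<std::int64_t, Payload>& buffered) {
    std::vector<std::int64_t> missing;
    std::int64_t s = nextExpected;
    for (const auto& entry : buffered) {  // std::map iterates in ascending key order
        if (entry.first < s) continue;    // stale: already applied
        for (; s < entry.first; ++s)
            missing.push_back(s);         // a hole before this buffered update
        s = entry.first + 1;
    }
    return missing;
}
```

With 10 expected and 12, 13 and 15 buffered, it reports 10, 11 and 14 as missing.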

The Code

I use three containers:

std::map<std::uint64_t,CMarket> m_mMarkets;
std::map<CMarket, long> m_mCurrentSeq;
std::map<CMarket, std::map<long, web::json::value>> m_mStack;

m_mMarkets is used to map the market-identifier number to the Market as it is stored inside my program.

m_mCurrentSeq is used to store the current sequence number for each market.

m_mStack stores the updates by market and sequence-number (that's what the long is for) until they can be executed.

This is the part which receives the updates:

// ....

// This method can be called asynchronously, so lock the containers.
// std::lock_guard (from <mutex>, assuming m_muMutex is a std::mutex) also unlocks
// if executeStack() throws, so the mutex can never stay locked.
std::lock_guard<std::mutex> lock(this->m_muMutex);

// Check if it is a known market BEFORE looking it up: at() would already throw
// std::out_of_range for an unknown identifier. This should never happen!
auto marketIt = this->m_mMarkets.find(json[1].as_number().to_uint64());
if(marketIt == this->m_mMarkets.end())
    throw std::runtime_error("Received Market update of unknown Market");

// Map the market-identifier to a CMarket object.
CMarket market = marketIt->second;

// Add the update to the execution-queue
this->m_mStack[market][(long)json[5]["seq"].as_integer()] = json;

// Execute the execution-queue
this->executeStack();

// ....

Now comes the execution-queue. I think this is where my mistake is located.

Function: "executeStack":

for(auto& market : this->m_mMarkets) // For all markets
{
    // Skip markets that have no sequence number or no queued updates yet.
    auto seqIt = this->m_mCurrentSeq.find(market.second);
    auto stackIt = this->m_mStack.find(market.second);
    if(seqIt == this->m_mCurrentSeq.end() || stackIt == this->m_mStack.end())
        continue;

    long seqNum = seqIt->second;
    std::map<long, web::json::value>& stack = stackIt->second;

    // Erase old entries (everything below the sequence number we expect next).
    stack.erase(stack.begin(), stack.lower_bound(seqNum));

    // This container is used to store the updates to the Orderbook temporarily.
    std::vector<Order> addOrderStack{};

    // Apply queued updates for as long as the next expected sequence number is present.
    for(auto it = stack.find(seqNum); it != stack.end(); it = stack.find(seqNum))
    {
        for(auto& v : it->second[4].as_array())
        {
            const auto kind = v["type"].as_string();

            if(kind == "newTrade")
                continue; // trade history is not needed for the Orderbook

            if(kind != "orderBookModify" && kind != "orderBookRemove")
                throw std::runtime_error("Unknown message format");

            Order::Type t = v["data"]["type"].as_string() == "ask" ? Order::Type::Ask : Order::Type::Bid;

            // orderBookRemove has no amount; an amount of 0 marks the price level for removal.
            double amount = (kind == "orderBookModify") ? std::stod(v["data"]["amount"].as_string()) : 0.0;

            addOrderStack.push_back(Order(std::stod(v["data"]["rate"].as_string()), amount, t, market.second, this->m_pclParent, v.serialize()));
        }

        stack.erase(it);
        ++seqNum;
    }

    // The actual OrderList gets modified here. The mistake CANNOT be inside OrderList::addOrderStack,
    // because I am running Orderbooks for other exchanges too and they use the same method to modify
    // the Orderbook, and they do not get inconsistent.
    if(!addOrderStack.empty())
        OrderList::addOrderStack(addOrderStack);

    seqIt->second = seqNum;
}

So if this runs for a longer period, the Orderbook becomes inconsistent: orders which should have been removed are still present, and there are wrong entries in the book. I am not quite sure why this is happening. Maybe I did something wrong with the sequence numbers, because it seems that the update stack does not always get executed correctly. I have tried everything that came to mind, but I could not get it to work and now I am out of ideas about what could be wrong. If you have any questions, please feel free to ask.

Bobface

1 Answer


tl;dr: The Poloniex API is imperfect and drops messages. Some simply never arrive. I've found that this happens for all subscribed users, regardless of their location in the world.

I hope my answer about using Autobahn|cpp to connect to Poloniex's WebSocket API (here) was useful. I suspect you had already figured that out, though (otherwise this problem couldn't exist for you). As you might have gathered, I too have a cryptocurrency bot written in C++. I've been working on it off and on for about 3.5 years.

The problem you're facing is one I had to overcome as well. In this case, I'd prefer not to provide my source code, as the speed at which you process these events can have huge effects on your profit margins. However, I will give pseudocode that offers some very rough insight into how I'm handling WebSocket event processing for Poloniex.

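// Pseudocode
void someClass::handle_poloniex_ws_event(ws_event event){
    if(event.seq_num == expected_seq_num){
        process_ws_event(event);
        update_expected_seq_num();
    }
    else if(event.seq_num > expected_seq_num){
        cache_event(event);   // arrived early, keep it until its turn
    }
    // else: stale or duplicate event, already processed

    // Process any cached events that are now next in line.
    while(in_cache(expected_seq_num)){
        process_ws_event(from_cache(expected_seq_num));
        update_expected_seq_num();
    }
}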

Note that what I've written above is a super simplified version of what I'm actually doing. My actual solution is over 500 lines long, with "goto xxx" and "goto yyy" throughout. I recommend taking timestamps/CPU clock cycle counts and comparing them to the current time/cycle counts to help you make decisions at any given moment (for example: should I wait for the missing event, should I continue processing and flag to the rest of the program that there may be inaccuracies, or should I use a GET request to refill my table?). The name of the game here is speed, as I'm sure you know. Good luck! Hope to hear from ya. :-)
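One way to turn the timestamp idea into code is a per-market timer that starts when the expected sequence number is first found missing and says when to stop waiting. This is a minimal sketch under stated assumptions: `GapTimer`, `GapAction` and the fixed `maxWait` threshold are hypothetical, and what "resync" means (for example, refilling the table with a GET request) is left to the caller.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical decision for a market whose next sequence number is missing.
enum class GapAction { Wait, Resync };

class GapTimer {
public:
    using Clock = std::chrono::steady_clock;

    explicit GapTimer(std::chrono::milliseconds maxWait) : maxWait_(maxWait) {}

    // Call whenever a message arrives while `nextExpected` is still missing.
    // The clock starts on the first call for a given missing number.
    GapAction onGap(std::int64_t nextExpected, Clock::time_point now) {
        if (!waiting_ || nextExpected != seq_) {  // a new gap starts
            waiting_ = true;
            seq_ = nextExpected;
            since_ = now;
        }
        return (now - since_ >= maxWait_) ? GapAction::Resync : GapAction::Wait;
    }

    // Call once the gap is filled or the book has been resynced.
    void reset() { waiting_ = false; }

private:
    std::chrono::milliseconds maxWait_;
    bool waiting_ = false;
    std::int64_t seq_ = 0;
    Clock::time_point since_{};
};
```

A short threshold favours fresh data over waiting for late messages; the right value depends on how often messages arrive late rather than not at all, which has to be measured.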

Xandrix
  • Thanks for your answer! I have also figured out why this is not working. It is indeed a fault in the Poloniex API, which seems to drop some updates from time to time. I might contact you sometime :) – Bobface Mar 21 '17 at 21:32
  • Thanks, I wrote it down. – Bobface Mar 21 '17 at 22:35