I've built a Windows service that subscribes to around 10,000 stock tickers in real time using ClientWebSocket. If I subscribe to 1,000 tickers, I receive all the data points as I should (a few hundred messages a second). As soon as I get up to 2,000 tickers, I don't seem to be receiving all the data I should, and at 10,000 tickers (thousands of messages a second) it's even worse. I've run comparison reports and it looks like I'm losing up to 60% of the packets. I've talked to Polygon (the provider of the real-time data) about this issue, and they claim their socket is a firehose: everything that should go out goes out, and none of their other clients are complaining. So the only logical conclusion is that the problem is in my code or in some limitation I'm hitting. Maybe it's the Task portion of the Receive method? Maybe Windows has a maximum task limit and I'm exceeding it.

I've also tested this on a high-powered dedicated server with a 10 Gb connection, so it doesn't seem to be a connection or hardware limitation.

I've also bypassed my BlockingCollection cache, and the problem still persisted.

Hopefully one of you has some insight, thank you!

Here's my code:

        public static ConcurrentDictionary<string, TradeObj> TradeFeed = new ConcurrentDictionary<string, TradeObj>();
        public static ConcurrentDictionary<string, QuoteObj> QuoteFeed = new ConcurrentDictionary<string, QuoteObj>();
        public static ConcurrentDictionary<string, AggObj> AggFeed = new ConcurrentDictionary<string, AggObj>();
        public static BlockingCollection<byte[]> packets = new BlockingCollection<byte[]>();

        private static void Start(string[] args)
        {
            try
            {
                Polygon.StartSub();

                int HowManyConsumers = 2;

                for (int i = 0; i < HowManyConsumers; i++)
                {
                    Task.Factory.StartNew(Polygon.ConsumePackets);
                }

            } catch(Exception e)
            {
                Console.WriteLine(e.Message);
            }

            Console.ReadKey();
        }

        public static async Task StartSub()
        {
            do
            {
                using (var socket = new ClientWebSocket())
                    try
                    {
                       // socket.Options.KeepAliveInterval = TimeSpan.Zero;
                        var Connection = "wss://socket.polygon.io/stocks";

                        await socket.ConnectAsync(new Uri(Connection), CancellationToken.None);

                        Console.WriteLine("Websocket opened to Polygon.");
                        await Send(socket, "{\"action\":\"auth\",\"params\":\""+ConfigurationManager.AppSettings["PolygonAPIToken"]+"\"}");
                        List<List<string>> batches = new List<List<string>>();

                        for (int i = 0; i < FeedCache.Tickers.Count(); i += 500)
                        {
                            var tempList = new List<string>();
                            tempList.AddRange(FeedCache.Tickers.Skip(i).Take(500));
                            batches.Add(tempList);
                        }

                        int bNum = 0;
                        string[] quoteStrings = new string[batches.Count()];

                        foreach (var tList in batches)
                        {
                            var tQuery = "";

                            tQuery = tQuery + "T." + string.Join(",T.", tList.ToArray());
                            tQuery = tQuery + ",A." + string.Join(",A.", tList.ToArray());
                            tQuery = tQuery + ",Q." + string.Join(",Q.", tList.ToArray());
                            quoteStrings[bNum] = tQuery;
                            bNum++;
                        }

                        for (int i = 0; i < quoteStrings.Count(); i++)
                        {
                            string SubscribeString = "{\"action\":\"subscribe\",\"params\":\"" + quoteStrings[i] + "\"}";
                            await Send(socket, SubscribeString);
                        }


                        await Receive(socket);

                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"ERROR - {ex.Message}");
                        Console.WriteLine(ex.ToString());
                    }
            } while (true);
        }

        static async Task Send(ClientWebSocket socket, string data)
        {
            var segment = new ArraySegment<byte>(Encoding.UTF8.GetBytes(data));
            await socket.SendAsync(segment, WebSocketMessageType.Text, true, CancellationToken.None);
        }


        static async Task Receive(ClientWebSocket socket)
        {

            do
            {
                WebSocketReceiveResult result;
                var buffer = new ArraySegment<byte>(new byte[2000]);
                using (var ms = new MemoryStream())
                {
                    // Read fragments until the whole message has arrived.
                    do
                    {
                        result = await socket.ReceiveAsync(buffer, CancellationToken.None);
                        ms.Write(buffer.Array, buffer.Offset, result.Count);
                    } while (!result.EndOfMessage);

                    if (result.MessageType == WebSocketMessageType.Close)
                    {
                        await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closed in server by the client", CancellationToken.None);
                        Console.WriteLine("Socket disconnecting, trying to reconnect.");
                        // Return to StartSub, whose do/while loop reconnects.
                        // Calling StartSub() from here would nest a second loop and socket.
                        return;
                    }
                    else
                    {
                        packets.Add(ms.ToArray());
                    }
                }
            } while (true);
        }

        public static async void ConsumePackets()
        {

            foreach (var buffer in packets.GetConsumingEnumerable())
            {

                using (var ms = new MemoryStream(buffer))
                {
                    ms.Seek(0, SeekOrigin.Begin);
                    using (var reader = new StreamReader(ms, Encoding.UTF8))
                    {

                        var data = await reader.ReadToEndAsync();
                        try
                        {
                            var j = JArray.Parse(data);

                            if (j != null)
                            {
                                string id = (string)j[0]["ev"];
                                switch (id)
                                {
                                    case "T":
                                        AddOrUpdateTrade((string)j[0]["sym"], j);
                                        break;
                                    case "Q":
                                        AddOrUpdateQuote((string)j[0]["sym"], j);
                                        break;
                                    case "A":
                                        AddOrUpdateAgg((string)j[0]["sym"], j);
                                        break;
                                }
                            }
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(e.ToString());
                        }
                    }

                }

            }
        }

        public static void AddOrUpdateTrade(string ticker, JArray data)
        {

            TradeFeed.AddOrUpdate(ticker, new TradeObj {
                LastPrice = (double)data[0]["p"],
                TradeCount = 1
            }, (key, existingVal) =>
            {
                return new TradeObj {
                    LastPrice = (double)data[0]["p"],
                    TradeCount = existingVal.TradeCount + 1,
                    PriceDirection = (double)data[0]["p"] < existingVal.LastPrice ? "D" : "U"
                };
            });

        }

        public static void AddOrUpdateAgg(string ticker, JArray data)
        {

            AggFeed.AddOrUpdate(ticker, new AggObj
            {
                TickVolume = (long)data[0]["v"],
                VolumeShare = (long)data[0]["av"],
                OpenPrice = (double)data[0]["op"],
                TickAverage = (double)data[0]["a"],
                VWAP = (double)data[0]["vw"],
                TickClosePrice = (double)data[0]["c"],
                TickHighPrice = (double)data[0]["h"],
                TickLowPrice = (double)data[0]["l"],
                TickOpenPrice = (double)data[0]["o"]
            }, (key, existingVal) =>
            {
                return new AggObj
                {
                    TickVolume = (long)data[0]["v"],
                    VolumeShare = (long)data[0]["av"],
                    OpenPrice = (double)data[0]["op"],
                    TickAverage = (double)data[0]["a"],
                    VWAP = (double)data[0]["vw"],
                    TickClosePrice = (double)data[0]["c"],
                    TickHighPrice = (double)data[0]["h"],
                    TickLowPrice = (double)data[0]["l"],
                    TickOpenPrice = (double)data[0]["o"]
                };
            });

        }

        public static void AddOrUpdateQuote(string ticker, JArray data)
        {

            QuoteFeed.AddOrUpdate(ticker, new QuoteObj
            {
                BidPrice = (double)data[0]["bp"],
                BidSize = (double)data[0]["bs"],
                AskPrice = (double)data[0]["ap"],
                AskSize = (double)data[0]["as"]
            }, (key, existingVal) =>
            {
                return new QuoteObj
                {
                    BidPrice = (double)data[0]["bp"],
                    BidSize = (double)data[0]["bs"],
                    AskPrice = (double)data[0]["ap"],
                    AskSize = (double)data[0]["as"]
                };
            });

        }
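
One check that might help narrow this down is to count events per received message rather than messages alone. The sketch below assumes, without confirmation from Polygon, that a single WebSocket message can carry a JSON array holding more than one event. `FrameStats` and `CountEvents` are hypothetical names that are not part of the service above, and `System.Text.Json` stands in for the `JArray` parsing used in `ConsumePackets`.

```csharp
using System;
using System.Text.Json;

// Hypothetical diagnostic helper; not part of the service code above.
public static class FrameStats
{
    // Returns the number of top-level events in one decoded text message:
    // the array length when the payload is a JSON array, otherwise 1.
    public static int CountEvents(string frameJson)
    {
        using (JsonDocument doc = JsonDocument.Parse(frameJson))
        {
            JsonElement root = doc.RootElement;
            return root.ValueKind == JsonValueKind.Array
                ? root.GetArrayLength()
                : 1;
        }
    }
}
```

Summing `FrameStats.CountEvents(data)` over every message and comparing the total with the number of messages received would show whether a per-message counter undercounts the feed. If the two totals match, batching is not the explanation and the loss is elsewhere.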

Nick D
  • I've written very similar market data handlers and I'd like to help, but there's a lot going on here. Any chance you could code up something that reproduces the issue? You might also get an "aha" moment by trying to repro without the data consumption part, if possible. That said, I don't like the usage of `BlockingCollection` inside a `Task`, because it blocks. – Zer0 Mar 07 '20 at 17:14
  • @zer0 Thanks for taking time out of the weekend to reply to me. Even when bypassing the BlockingCollection cache, it still does not receive the number of packets it should. I created a counter to record how many times Receive is hit. So basically the problem is happening right at the receive stage, not further down the line, which is why I'm thinking ClientWebSocket can't handle this amount of incoming data or Task is being limited. – Nick D Mar 07 '20 at 18:37
