0

This question is kind of an extension to this one, I have updated my code below to reflect a task that runs continuously updating web clients using through a Websocket connection, the tasks checks for stock prices for a provided company (symbol) received in the Recv() method, when the user changes the company the Recv() method should cancel the running task (for the older company) and start a new one for the newly provided company/symbol.

The problem I'm having is that the line where await Task.Run(() => StockPricingAsync(jsonResult), priceToken.Token); in the Recv method gets executed doesn't cancel the existing task although there is a priceToken.Cancel just right before. Any idea how to get the running task cancelled and start the same one immediately?

I have the priceToken variable declared globally so it can be accessed from within the running task, but this does not work.

I also tried to assign the task to variable and make that variable = null but that did not cancel the task as well.

I'm also open to change the approach completely if I have to, all I need is a running task that provides pricing information for a passed parameter which gets changed by the client while the connection is still open.

  CancellationTokenSource priceToken = new CancellationTokenSource();
 public async Task GetDataAsync(WebSocket ws)
        {
            CancellationToken token = new();

            try
            {
                var sendTask = Task.Run(() => Send(ws, token));
              
                var recvTask = Task.Run(() => Recv(ws, token));
                do
                {
                    await Task.Delay(1000);
                } while (ws.State == WebSocketState.Open);
            }
            finally
            {
                 await ws.CloseAsync(WebSocketCloseStatus.Empty, "", token);
            }
        }

        async Task Recv(WebSocket ws, CancellationToken token)
        {

            Console.WriteLine("Recv task started...");
            var buffer = WebSocket.CreateClientBuffer(1024, 1024);
            WebSocketReceiveResult taskResult;

            while (ws.State == WebSocketState.Open)
            {
                string jsonResult = "";

                do
                {
                    taskResult = await ws.ReceiveAsync(buffer, token);
                    jsonResult += Encoding.UTF8.GetString(buffer.Array, 0, taskResult.Count);

                } while (!taskResult.EndOfMessage);

                if (!string.IsNullOrEmpty(jsonResult))
                {
                    Console.WriteLine("Queueing {0}", jsonResult);
                   
                    priceToken.Cancel()
                    priceToken = new CancellationTokenSource();
                   await Task.Run(() => StockPricingAsync(jsonResult), priceToken.Token);
                }
            }
            Console.WriteLine("Recv task exiting...");
        }

        async static Task Send(WebSocket ws, CancellationToken token)
        {
            Console.WriteLine("Send task started...");

            do
            {
                string sendMsg = sendQueue.Take();
                Console.WriteLine("Sending {0}", sendMsg);
                var sendMsgBytes = Encoding.UTF8.GetBytes(sendMsg);
                ArraySegment<byte> segmentBuffer = new ArraySegment<byte>(sendMsgBytes, 0, sendMsgBytes.Length);

                await ws.SendAsync(segmentBuffer, WebSocketMessageType.Text, true, token);

            } while (ws.State == WebSocketState.Open);

            Console.WriteLine("Send task exiting...");
        }


  public async Task StockPricingAsync(string symbol)
        {


            var lastUpdated = DateTime.MinValue;
            double previousPrice = 0;
            new StockService().GetStockPricing(symbol, true);

            while (!priceToken.Token.IsCancellationRequested)
            {
                var price = new StockService().GetStockPricing(symbol, false);

                if (price != null && lastUpdated != price.LastPriceDate)
                {
                    lastUpdated = price.LastPriceDate;
                    if (price.LastPrice > previousPrice)
                        price.Tick = Stock.Tick.UpTick;
                    else if (price.LastPrice < previousPrice)
                        price.Tick = Stock.Tick.DownTick;
                    else
                        price.Tick = Stock.Tick.NoChange;

                    previousPrice = price.LastPrice;

                    var json = JsonConvert.SerializeObject(new ServerData(new KeyValuePair<string, object>("StockPricing", price), _eventMessage), _jsonSerializerSettings);

                    sendQueue.Add(json);
                }
                await Task.Delay(3000, this.priceToken.Token);
            }
            if (this.priceToken.Token.IsCancellationRequested)
                this.priceToken.Token.ThrowIfCancellationRequested();

        }
Maya
  • 1,414
  • 5
  • 22
  • 43
  • 1
    Related: [Using CancellationToken for timeout in Task.Run does not work](https://stackoverflow.com/questions/22637642/using-cancellationtoken-for-timeout-in-task-run-does-not-work). Also this article might be helpful: [A Tour of Task, Part 9: Delegate Tasks](https://blog.stephencleary.com/2015/03/a-tour-of-task-part-9-delegate-tasks.html). – Theodor Zoulias Oct 23 '22 at 20:12

0 Answers0