This question is kind of an extension to this one, I have updated my code below to reflect a task that runs continuously updating web clients using through a Websocket connection, the tasks checks for stock prices for a provided company (symbol) received in the Recv() method, when the user changes the company the Recv()
method should cancel the running task (for the older company) and start a new one for the newly provided company/symbol.
The problem I'm having is that the line where await Task.Run(() => StockPricingAsync(jsonResult), priceToken.Token);
in the Recv method gets executed doesn't cancel the existing task although there is a priceToken.Cancel
just right before. Any idea how to get the running task cancelled and start the same one immediately?
I have the priceToken variable declared globally so it can be accessed from within the running task, but this does not work.
I also tried to assign the task to variable and make that variable = null
but that did not cancel the task as well.
I'm also open to change the approach completely if I have to, all I need is a running task that provides pricing information for a passed parameter which gets changed by the client while the connection is still open.
CancellationTokenSource priceToken = new CancellationTokenSource();
public async Task GetDataAsync(WebSocket ws)
{
CancellationToken token = new();
try
{
var sendTask = Task.Run(() => Send(ws, token));
var recvTask = Task.Run(() => Recv(ws, token));
do
{
await Task.Delay(1000);
} while (ws.State == WebSocketState.Open);
}
finally
{
await ws.CloseAsync(WebSocketCloseStatus.Empty, "", token);
}
}
async Task Recv(WebSocket ws, CancellationToken token)
{
Console.WriteLine("Recv task started...");
var buffer = WebSocket.CreateClientBuffer(1024, 1024);
WebSocketReceiveResult taskResult;
while (ws.State == WebSocketState.Open)
{
string jsonResult = "";
do
{
taskResult = await ws.ReceiveAsync(buffer, token);
jsonResult += Encoding.UTF8.GetString(buffer.Array, 0, taskResult.Count);
} while (!taskResult.EndOfMessage);
if (!string.IsNullOrEmpty(jsonResult))
{
Console.WriteLine("Queueing {0}", jsonResult);
priceToken.Cancel()
priceToken = new CancellationTokenSource();
await Task.Run(() => StockPricingAsync(jsonResult), priceToken.Token);
}
}
Console.WriteLine("Recv task exiting...");
}
async static Task Send(WebSocket ws, CancellationToken token)
{
Console.WriteLine("Send task started...");
do
{
string sendMsg = sendQueue.Take();
Console.WriteLine("Sending {0}", sendMsg);
var sendMsgBytes = Encoding.UTF8.GetBytes(sendMsg);
ArraySegment<byte> segmentBuffer = new ArraySegment<byte>(sendMsgBytes, 0, sendMsgBytes.Length);
await ws.SendAsync(segmentBuffer, WebSocketMessageType.Text, true, token);
} while (ws.State == WebSocketState.Open);
Console.WriteLine("Send task exiting...");
}
public async Task StockPricingAsync(string symbol)
{
var lastUpdated = DateTime.MinValue;
double previousPrice = 0;
new StockService().GetStockPricing(symbol, true);
while (!priceToken.Token.IsCancellationRequested)
{
var price = new StockService().GetStockPricing(symbol, false);
if (price != null && lastUpdated != price.LastPriceDate)
{
lastUpdated = price.LastPriceDate;
if (price.LastPrice > previousPrice)
price.Tick = Stock.Tick.UpTick;
else if (price.LastPrice < previousPrice)
price.Tick = Stock.Tick.DownTick;
else
price.Tick = Stock.Tick.NoChange;
previousPrice = price.LastPrice;
var json = JsonConvert.SerializeObject(new ServerData(new KeyValuePair<string, object>("StockPricing", price), _eventMessage), _jsonSerializerSettings);
sendQueue.Add(json);
}
await Task.Delay(3000, this.priceToken.Token);
}
if (this.priceToken.Token.IsCancellationRequested)
this.priceToken.Token.ThrowIfCancellationRequested();
}