
Does ChannelReader<T>.ReadAllAsync throw any exceptions when being canceled by a CancellationToken? It doesn't seem to be throwing OperationCanceledException/TaskCanceledException?

I know if these two methods were called in a fire and forget manner, i.e. _ = SendLoopAsync(); _ = ReceiveLoopAsync();, it would've crashed the task with no displayed message/exception because they were not awaited, meaning that we're losing the exceptions.

I don't want the task to crash without letting me know that it has actually crashed or been cancelled, which means I should probably wrap the whole body of SendLoopAsync in a try/catch, rather than only the code between the braces of the ReadAllAsync loop.

A small example representing its behavior will be appreciated.

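using System;
using System.Buffers;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var clientWebSocket = new ClientWebSocket();
await clientWebSocket.ConnectAsync(new Uri("wss://www.deribit.com/ws/api/v2"), CancellationToken.None).ConfigureAwait(false);

var client = new ChannelExample(clientWebSocket);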

for (var i = 1; i <= 10; i++)
{
    client.Output.TryWrite($"Item: {i}");
}

var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1));
await client.StartAsync(cts.Token).ConfigureAwait(false); // blocks the UI

Console.ReadLine();

public class ChannelExample
{
    private readonly WebSocket _webSocket;
    private readonly Channel<string> _input;
    private readonly Channel<string> _output;

    public ChannelExample(WebSocket webSocket)
    {
        _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));

        _input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleWriter = true
        });

        _output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleReader = true
        });
    }

    public ChannelReader<string> Input => _input.Reader;
    public ChannelWriter<string> Output => _output.Writer;

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var receiving = ReceiveLoopAsync(cancellationToken);
        var sending = SendLoopAsync(cancellationToken);

        var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);

        if (completedTask.Exception != null)
        {
            Console.WriteLine("Exception");
        }
    }

    private async Task SendLoopAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
        {
            Console.WriteLine($"Sending: {message}");
            await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
        }
    }

    private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
    {
        using var buffer = MemoryPool<byte>.Shared.Rent();

        while (_webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
        {
            ValueWebSocketReceiveResult receiveResult;
            do
            {
                receiveResult = await _webSocket.ReceiveAsync(buffer.Memory, cancellationToken).ConfigureAwait(false);

                if (receiveResult.MessageType == WebSocketMessageType.Close)
                {
                    return;
                }
            } while (!receiveResult.EndOfMessage);
        }
    }
}
nop
  • question: have you tried it? what happened? – Marc Gravell Apr 19 '22 at 13:32
  • @MarcGravell, No exception no anything. This is the only thing it returned `Sending: Item: 1` – nop Apr 19 '22 at 13:35
  • Somewhat related: [ChannelReader.ReadAllAsync(CancellationToken) not actually cancelled mid-iteration](https://stackoverflow.com/questions/67569758/channelreader-readallasynccancellationtoken-not-actually-cancelled-mid-iterati) – Theodor Zoulias Apr 19 '22 at 14:04

2 Answers


I suspect that it would throw (you can always test that, of course), since that is the generally expected pattern in this scenario. So you would wrap it in a try/catch:

try
{
   // ...
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
    // treat as completion; swallow
}
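
To make the pattern above concrete, here is a minimal sketch, assuming an empty unbounded channel and a token that fires after 200 ms. `DrainAsync` is a hypothetical helper name, not part of the question's code. With these assumptions the loop should exit through the `catch` filter rather than end silently:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: drains the reader and reports how the loop ended.
static async Task<string> DrainAsync(ChannelReader<string> reader, CancellationToken token)
{
    try
    {
        await foreach (var message in reader.ReadAllAsync(token))
        {
            Console.WriteLine($"Read: {message}");
        }
        return "completed"; // the writer was completed and the channel is empty
    }
    catch (OperationCanceledException) when (token.IsCancellationRequested)
    {
        return "canceled"; // the token fired while waiting for the next item
    }
}

var channel = Channel.CreateUnbounded<string>();
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

// Nothing is ever written, so the read is pending when the token fires.
Console.WriteLine(await DrainAsync(channel.Reader, cts.Token));
```

Catching `OperationCanceledException` also covers `TaskCanceledException`, which derives from it, so a canceled `Task.Delay` inside the loop body would land in the same handler.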

Alternatively: you could pass CancellationToken.None into the channel read API, and just use the writer's completion to signify exit (making sure that you call .Complete(...) on the writer when exiting).
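
A rough sketch of that alternative, under the assumption that the producer is the one deciding when to stop. The `SendLoopAsync` shape and the returned count are illustrative only. No token reaches the reader, so the loop cannot throw on cancellation and ends only when the writer is completed:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical send loop: no token is passed, so it exits only when the writer completes.
static async Task<int> SendLoopAsync(ChannelReader<string> reader)
{
    var sent = 0;
    while (await reader.WaitToReadAsync(CancellationToken.None))
    {
        while (reader.TryRead(out var message))
        {
            Console.WriteLine($"Sending: {message}");
            sent++;
        }
    }
    return sent; // reached once the writer is completed and the channel is drained
}

var output = Channel.CreateUnbounded<string>();
var sending = SendLoopAsync(output.Reader);

for (var i = 1; i <= 3; i++)
{
    output.Writer.TryWrite($"Item: {i}");
}

output.Writer.TryComplete(); // signals "no more items" instead of canceling
Console.WriteLine($"Send loop finished after {await sending} items");
```

One consequence of this design is that items already queued are still processed after `TryComplete()`, whereas a canceled token may abandon them.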

That said: ReadAllAsync probably isn't the preferred API here, since you don't really need it as IAsyncEnumerable<T> - so it may be preferable to use the native channel API, i.e.

while (await _output.Reader.WaitToReadAsync(cancellationToken))
{
    while (_output.Reader.TryRead(out var message))
    {
        // ...
    }
}
Marc Gravell
  • Thank you! Good point. – nop Apr 19 '22 at 13:45
  • @TheodorZoulias fair enough; that's a good catch (and a good `catch`) – Marc Gravell Apr 19 '22 at 14:06
  • I realized that I could also use `if (completedTask.IsCanceled) { }`, which comes from the Task.WhenAny. – nop Apr 19 '22 at 14:27
  • Marc, is it possible to show me an example for `you could pass CancellationToken.None into the channel read API, and just use the writer's completion to signify exit (making sure that you call .Complete(...) on the writer when exiting).`? Thanks – nop Apr 27 '22 at 13:33
  • @nop just use the last example, with `.WaitToReadAsync(CancellationToken.None)`, and use `_output.Writer.TryComplete()` when you're done – Marc Gravell Apr 27 '22 at 15:01
  • Thanks! One more question: since my whole code is async and it will be integrated in ASP.NET Core, is `_output.Reader.TryRead` an issue? Should I change it with ReadAsync (its async variant)? – nop Apr 28 '22 at 06:37
  • @nop the combined `WaitToReadAsync` and `TryRead` is the recommended route - it avoids a few overheads when multiple items arrive at once, and it has a cleaner loop exit approach; the `ReadAsync` approach is the lazy/fallback route – Marc Gravell Apr 28 '22 at 08:18

I am not sure what the Task returned by StartAsync represents:

public async Task StartAsync(CancellationToken cancellationToken)
{
    var receiving = ReceiveLoopAsync(cancellationToken);
    var sending = SendLoopAsync(cancellationToken);

    var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);

    if (completedTask.Exception != null)
    {
        Console.WriteLine("Exception");
    }
}

It seems to represent the completion of whichever of the receiving and sending tasks finishes first, which is weird. Most probably this is an unintended consequence of trying to log the exceptions of the tasks. There are better ways to log task exceptions than this, the simplest being to enclose all the code inside the asynchronous method in a try/catch block. Beyond that, the Exception property of a Task is not-null only when the task IsFaulted, not when it IsCanceled.
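
The last point can be checked with a small sketch. `Describe` is a hypothetical helper, and the two tasks are stand-ins for the send and receive loops: one is canceled through an already-canceled token, the other is faulted with an arbitrary exception.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: classifies a finished task the way StartAsync could.
static string Describe(Task task) =>
    task.IsCanceled ? "canceled (Exception is null)"
    : task.IsFaulted ? $"faulted: {task.Exception?.InnerException?.Message}"
    : "ran to completion";

using var cts = new CancellationTokenSource();
cts.Cancel();

Task canceled = Task.Delay(1000, cts.Token); // token is already canceled
Task faulted = Task.FromException(new InvalidOperationException("boom"));

// Task.WhenAny never throws; it hands back the finished task for inspection.
Console.WriteLine(Describe(await Task.WhenAny(canceled)));
Console.WriteLine(Describe(await Task.WhenAny(faulted)));
```

A check on `completedTask.Exception != null` alone therefore misses the canceled case, which is why the question's code printed nothing after the token fired.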

Theodor Zoulias
  • Good point. I just realized what you wrote the hard way: "Beyond that, the Exception property of a Task is not-null only when the task IsFaulted, not when it IsCanceled" – nop Apr 19 '22 at 14:43