Since we expect to read frequently, and often when data is already available to be consumed, should SendLoopAsync return ValueTask rather than Task, so that we can make it allocation-free?

// Caller
_ = Task.Factory.StartNew(_ => SendLoopAsync(cancellationToken), TaskCreationOptions.LongRunning, cancellationToken);

// Method
private async ValueTask SendLoopAsync(CancellationToken cancellationToken)
{
    while (await _outputChannel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
    {
        while (_outputChannel.Reader.TryRead(out var message))
        {
            using (await _mutex.LockAsync(cancellationToken).ConfigureAwait(false))
            {
                await _clientWebSocket.SendAsync(message.Data.AsMemory(), message.MessageType, true, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}
nop
  • @LeiYang, just added them. It depends on System.Threading.Channels & Nito.AsyncEx. – nop Apr 08 '22 at 07:48
  • Since you call the SendLoopAsync with Task.Factory.StartNew, I don't see any benefits of ValueTask there. – Jeroen van Langen Apr 08 '22 at 08:02
  • @JeroenvanLangen, what if it was just `awaited`, without `Task.Run` and `Task.Factory.StartNew`, would be nice if you describe these scenarios in the answer as well. – nop Apr 08 '22 at 08:05
  • It depends what WaitToReadAsync and As you can see, there is no path in your method that returns without any await. – Jeroen van Langen Apr 08 '22 at 09:37
  • @JeroenvanLangen, `there is no path in your method that returns without any await`, synchronous calls are bad for ValueTask, or you meant returning any value like `true`, `10`, whatever? – nop Apr 08 '22 at 10:05
  • The whole point of the `ValueTask` optimization is to avoid allocating a `Task` instance when there is no `await` in the hot path _(for example, cache hits)_. – Jeroen van Langen Apr 08 '22 at 10:08
  • @nop why not use `await foreach(var msg in reader.ReadAllAsync())` instead? There's no reason for the lock. There's no reason for `Task.Factory.StartNew` either, since the method only calls asynchronous methods. Even if there was something that could block, the task should appear *inside* the consumer method. – Panagiotis Kanavos Apr 11 '22 at 14:29
  • @PanagiotisKanavos, I just saw that ReadAllAsync literally does the same, by looking at its source code. Thanks! – nop Apr 11 '22 at 15:15
  • @nop using it cleans your code though, and allows you to use the System.Linq.Async methods, eg `Select`, `Where`, to transform or filter messages, or use `Parallel.ForEachAsync` in .NET 6 – Panagiotis Kanavos Apr 11 '22 at 15:19

2 Answers

No, there is no value in SendLoopAsync returning a ValueTask instead of a Task. This method is invoked only once in your code, so the impact of avoiding a single allocation of a tiny object is practically zero. You should consider using ValueTasks for asynchronous methods that are invoked repeatedly in loops, especially in hot paths. That's not the case in the example presented in the question.
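
For contrast, here is a minimal sketch of the kind of method where a ValueTask can pay off: one that is called repeatedly and often completes synchronously. The PriceCache class, its members, and the computed value are hypothetical and exist only for illustration; they are not part of the question's code.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical cache, used only to illustrate a hot path with frequent
// synchronous completion.
public sealed class PriceCache
{
    private readonly ConcurrentDictionary<string, decimal> _cache = new();

    // On a cache hit the ValueTask wraps the value directly, so no Task
    // object is allocated for that call.
    public ValueTask<decimal> GetPriceAsync(string symbol)
    {
        if (_cache.TryGetValue(symbol, out var price))
            return new ValueTask<decimal>(price);          // synchronous path

        return new ValueTask<decimal>(LoadAsync(symbol));  // asynchronous path, backed by a Task
    }

    private async Task<decimal> LoadAsync(string symbol)
    {
        await Task.Delay(10).ConfigureAwait(false);        // stand-in for real I/O
        var price = symbol.Length * 1.5m;                  // made-up value
        _cache[symbol] = price;
        return price;
    }
}
```

The first call for a symbol goes through the Task-backed path; later calls for the same symbol return an already completed ValueTask. A method like SendLoopAsync, which runs once and always awaits, gets nothing comparable from the change.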

As a side note, invoking asynchronous methods with the Task.Factory.StartNew + TaskCreationOptions.LongRunning combination is pointless. The dedicated thread that gets created will have a very short life: it is terminated as soon as the code reaches the first await of an incomplete awaitable inside the async method. You also get back a nested Task<Task>, which is tricky to handle. Using Task.Run is preferable. You can read here the reasons why.
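
The nested-task behavior can be observed with a small sketch. StartNewDemo and WorkAsync are hypothetical names, and the 500 ms delay is an arbitrary stand-in for the real send loop.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class StartNewDemo
{
    // Hypothetical stand-in for an async loop such as SendLoopAsync.
    private static async Task WorkAsync()
    {
        await Task.Delay(500).ConfigureAwait(false);
    }

    // Returns true when the outer task finished while the async work was still running.
    public static async Task<bool> OuterCompletesBeforeInnerAsync()
    {
        // StartNew does not unwrap async delegates, so the result is a Task<Task>.
        Task<Task> outer = Task.Factory.StartNew(
            () => WorkAsync(),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);

        // The outer task completes as soon as WorkAsync hits its first incomplete
        // await; its result is the inner task that represents the actual work.
        Task inner = await outer;
        bool innerStillRunning = !inner.IsCompleted;

        await inner; // only now is the work really done
        return innerStillRunning;
    }

    // Task.Run unwraps the async delegate: the returned Task covers the whole work.
    public static Task RunUnwrapped() => Task.Run(() => WorkAsync());
}
```

With StartNew, observing completion or exceptions of the loop requires awaiting the inner task (or calling Unwrap); with Task.Run a single await is enough.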

Also be aware that the Nito.AsyncEx.AsyncLock class is not optimized for memory-efficiency. Lots of allocations are happening every time the lock is acquired. If you want a low-allocation synchronization primitive that can be acquired asynchronously, your best bet currently is probably to use a Channel<object> instance, initialized with a single null value: retrieve the value to enter, store it back to release.
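
A minimal sketch of that idea follows, assuming a bounded channel with capacity 1 that holds a single null token. The ChannelLock class and its EnterAsync/Exit members are hypothetical names, not an existing API.

```csharp
#nullable enable
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical lock built on a channel that holds exactly one token.
public sealed class ChannelLock
{
    private readonly Channel<object?> _channel = Channel.CreateBounded<object?>(1);

    public ChannelLock()
    {
        _channel.Writer.TryWrite(null); // the lock starts in the released state
    }

    // Enter: take the token out of the channel, waiting if another caller holds it.
    public async ValueTask EnterAsync(CancellationToken cancellationToken = default)
    {
        await _channel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
    }

    // Exit: put the token back so the next waiter can proceed.
    public void Exit()
    {
        _channel.Writer.TryWrite(null);
    }
}
```

A caller would wrap the protected section in try/finally: `await gate.EnterAsync(token); try { /* send */ } finally { gate.Exit(); }`. Unlike the `using` pattern of AsyncLock, nothing releases the lock automatically, so a forgotten Exit blocks every later caller.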

Theodor Zoulias
  • Great explanation, thank you! I didn't understand only the first sentence, wdym, `WaitToReadAsync` is returning a `ValueTask` and `SendAsync` also a `ValueTask`? I get the rest that I got only a single call to `SendLoopAsync` . For ex. if I move only `clientWebSocket.SendAsync` to another method, it's good to make it a ValueTask, because it is going to be called everytime we want to send a message, but that's not the case here. Great example with the repeatedly parts in loops, because they are invoked many times. – nop Apr 08 '22 at 08:46
  • @nop yes, if you move the `clientWebSocket.SendAsync` to another async method, and then call that method repeatedly, it might be beneficial to return a `ValueTask` instead of `Task` from that method. – Theodor Zoulias Apr 08 '22 at 08:51
  • About `Nito.AsyncEx.AsyncLock`, take a look at https://stackoverflow.com/questions/71790119/nito-asyncex-lockasync-does-it-work-as-same-as-semaphoreslims-synchronization. I'm using it as a replacement for `private readonly SemaphoreSlim _sendSemaphore = new(1, 1)`, because it requires weird .Wait .Release in the Dispose() method. – nop Apr 08 '22 at 08:53
  • I read a lot of articles about `ValueTask` but your answer was the only one I could understand. Thank you a lot! – nop Apr 08 '22 at 08:54
  • @nop AFAIK Stephen Cleary wrote the `AsyncLock` class at a time when this functionality was not available in .NET. The `SemaphoreSlim` didn't have a `WaitAsync` method back then. Nowadays there is no reason really to prefer the `AsyncLock` over a `SemaphoreSlim`, other than the convenience of releasing the lock automatically with the `using` statement. Be aware that this convenience comes with a tiny CPU/memory overhead though. – Theodor Zoulias Apr 08 '22 at 08:59
  • Yes, the tiny overhead is because we are creating and disposing a new `SemaphoreSlim` for each "synchronization" instead of reusing simply one, but I would rather choose that than to having to manage stuff in the `Dispose` method, which I might forget for a specific SemaphoreSlim – nop Apr 08 '22 at 09:03
  • @nop it is also the allocation of the `IDisposable` object, as well as the allocation of the `Task`. You pay these costs every time you acquire the lock, even when there is no contention. On the contrary acquiring an uncontested `SemaphoreSlim` is allocation-free. – Theodor Zoulias Apr 08 '22 at 09:11

The idiomatic way to use channels doesn't need locks, semaphores or Task.Factory.StartNew. The typical way to use a channel is to have a method that accepts just a ChannelReader as input. If the method wants to use a Channel as output, it should create it itself and only return a ChannelReader that can be passed to other methods. By owning the channel the method knows when it can be closed.

In the question's case, the code is simple enough though. A simple await foreach should be enough:

private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
                                      CancellationToken cancellationToken)
{
    await foreach (var msg in reader.ReadAllAsync(cancellationToken))
    {
        await _clientWebSocket.SendAsync(msg.Data.AsMemory(),
                      msg.MessageType,
                      true, cancellationToken);
    }
}

This method doesn't need an external Task.Run or Task.Factory.StartNew to work. To run it, just call it and store its task somewhere instead of discarding it:

public MyWorker(ChannelReader<Message> reader,CancellationToken token)
{
   .....
   _loopTask=SendLoopAsync(reader,token).AsTask(); // keep a Task so it can be awaited later
}

This way, once the input channel completes, the code can await _loopTask to finish processing any pending messages.

Any blocking code should run inside it with Task.Run(), eg

private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
                                      CancellationToken cancellationToken)
{
    await foreach (var msg in reader.ReadAllAsync(cancellationToken))
    {
        // Offload the blocking work, then send the transformed message
        var new_msg=await Task.Run(()=>SomeHeavyWork(msg),cancellationToken);
        await _clientWebSocket.SendAsync(new_msg.Data.AsMemory(),
                      new_msg.MessageType,
                      true, cancellationToken);
    }
}

Concurrent Workers

This method could be used to start multiple concurrent workers too:

// Task.WhenAll needs Task instances, so convert each ValueTask with AsTask()
var tasks=Enumerable.Range(0,dop).Select(_=>SendLoopAsync(reader,token).AsTask());
_loopTask=Task.WhenAll(tasks);
...
await _loopTask;

In .NET 6, Parallel.ForEachAsync can be used to process multiple messages with less code:

private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
                                      CancellationToken cancellationToken)
{
    var options=new ParallelOptions {
        CancellationToken=cancellationToken,
        MaxDegreeOfParallelism=4
    };
    var input=reader.ReadAllAsync(cancellationToken);
    await Parallel.ForEachAsync(input,options,async (msg,token)=>{
        var new_msg=await Task.Run(()=>SomeHeavyWork(msg),token);
        await _clientWebSocket.SendAsync(new_msg.Data.AsMemory(),
                      new_msg.MessageType,
                      true, token);
    });
}

Idiomatic Channel Producers

Instead of using a class-level channel stored in a field, create the channel inside the producer method and only return its reader. This way the producer method has control of the channel's lifecycle and can close it when it's done. That's one of the reasons a Channel can be accessed only through its Reader and Writer properties.

A method can consume a ChannelReader and return another. This allows creating methods that can be chained together into a pipeline.

A simple producer can look like this:

ChannelReader<Message> Producer(CancellationToken token)
{
    var channel=Channel.CreateUnbounded<Message>();
    var writer=channel.Writer;
    // The lambda must be async to await the write
    _ = Task.Run(async ()=>{
        while(!token.IsCancellationRequested)
        {
            var msg=SomeHeavyJob();
            await writer.WriteAsync(msg);
        }
    },token)
    // Complete the channel, propagating any exception to readers
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel.Reader;
}

When cancellation is signaled, the worker finishes, or an exception is thrown, the main task exits and ContinueWith calls TryComplete on the writer, passing along any exception that may have been thrown. That's a simple non-blocking operation, so it doesn't matter what thread it runs on.

A transforming method would look like this:

ChannelReader<Msg2> Transform(ChannelReader<Msg1> input,CancellationToken token)
{
    var channel=Channel.CreateUnbounded<Msg2>();
    var writer=channel.Writer;
    // The lambda must be async to use await foreach
    _ = Task.Run(async ()=>{
        await foreach(var msg1 in input.ReadAllAsync(token))
        {
            var msg2=SomeHeavyJob(msg1);
            await writer.WriteAsync(msg2);
        }
    },token)
    // Complete the output channel once the input is exhausted or fails
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel.Reader;
}

Turning those methods into static extension methods would allow chaining them one after the other:

var task=Producer()
          .Transform()
          .Consumer();
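
As a self-contained illustration of such a chain, here is a minimal sketch that compiles on its own. The PipelineSketch class, the Produce/Transform/ConsumeAsync names, the int and string message types, and the "msg-" formatting are all assumptions made for the example; a real pipeline would use the Message types from the question.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Producer: creates and owns its channel, and completes it when done.
    public static ChannelReader<int> Produce(int count, CancellationToken token = default)
    {
        var channel = Channel.CreateUnbounded<int>();
        var writer = channel.Writer;
        _ = Task.Run(async () =>
        {
            for (var i = 1; i <= count; i++)
                await writer.WriteAsync(i, token);
        }, token)
        .ContinueWith(t => writer.TryComplete(t.Exception));
        return channel.Reader;
    }

    // Transformer: consumes one reader and returns another.
    public static ChannelReader<string> Transform(this ChannelReader<int> input,
                                                  CancellationToken token = default)
    {
        var channel = Channel.CreateUnbounded<string>();
        var writer = channel.Writer;
        _ = Task.Run(async () =>
        {
            await foreach (var n in input.ReadAllAsync(token))
                await writer.WriteAsync($"msg-{n * 2}", token);
        }, token)
        .ContinueWith(t => writer.TryComplete(t.Exception));
        return channel.Reader;
    }

    // Consumer: drains the reader; its task completes when the channel completes.
    public static async Task<List<string>> ConsumeAsync(this ChannelReader<string> input,
                                                        CancellationToken token = default)
    {
        var results = new List<string>();
        await foreach (var item in input.ReadAllAsync(token))
            results.Add(item);
        return results;
    }
}
```

Calling `await PipelineSketch.Produce(3).Transform().ConsumeAsync()` yields the three items "msg-2", "msg-4", "msg-6" in order, and the consumer's task finishes only after both upstream stages have completed their channels.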

I don't set SingleWriter because it doesn't seem to do anything yet. Searching for it in the .NET runtime repo on GitHub doesn't show any results beyond test code.

Panagiotis Kanavos
  • Thank you for your answer! What makes u think SemaphoreSlim is not required, because `SingleReader/SingleWriter` is set to `true`? https://i.imgur.com/w205cL7.png – nop Apr 11 '22 at 15:48
  • ChannelReader supports multiple readers. If, and only if, you know there will only be a single reader, you can set that option to true so the ChannelReader can disable its own concurrency synchronization. Enabling that setting and then *locking* is worse than using a ChannelReader alone – Panagiotis Kanavos Apr 11 '22 at 15:53
  • @nop on the other hand, if you use channels properly (ie the producer method owns the channel, only returns a reader), the code becomes a lot simpler *and* you avoid concurrency issues – Panagiotis Kanavos Apr 11 '22 at 15:55
  • @nop I added examples how a producer and a transforming method would look, and how they could be combined in a pipeline – Panagiotis Kanavos Apr 11 '22 at 16:10
  • Sorry for the delayed answer. I changed my code based on your answer https://www.toptal.com/developers/hastebin/ilifipajeh.csharp. However, I don't like the `Task.Run` with `ContinueWith`. What I tried to accomplish is auto reconnect. May you have a look at what I did and possible a link to the Go articles about Channels? I really want to do it properly but there is not much information. I've read everything from Stephen Toub – nop Apr 11 '22 at 19:11
  • Oh one more thing, when I'm reconnecting the socket, I know I have to reinstantiate `ClientWebSocket` because that's how that class was designed but what about the ReceiveLoop and SendLoop? Do I need to `.TryComplete` them and then re-create them or it's not necessary? – nop Apr 11 '22 at 19:30