The idiomatic way to use channels doesn't need locks, semaphores or Task.Factory.StartNew
. The typical way to use a channel is to have a method that accepts just a ChannelReader as input. If the method wants to use a Channel as output, it should create it itself and only return a ChannelReader
that can be passed to other methods. By owning the channel the method knows when it can be closed.
In the questions case, the code is simple enough though. A simple await foreach
should be enough:
private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
CancellationToken cancellationToken)
{
await foreach (var msg in reader.ReadAllAsync(cancellationToken))
{
await _clientWebSocket.SendAsync(message.Data.AsMemory(),
message.MessageType,
true, cancellationToken);
}
}
This method doesn't need an external Task.Run
or Task.Factory.New
to work. To run it, just call it and store its task somewhere, but not discarded:
public MyWorker(ChannelReader<Message> reader,CancellationToken token)
{
.....
_loopTask=SendLoopAsync(reader,token);
}
This way, once the input channel completes, the code can await for _loopTask
to finish processing any pending messages.
Any blocking code should run inside it with Task.Run()
, eg
private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
CancellationToken cancellationToken)
{
await foreach (var msg in reader.ReadAllAsync(cancellationToken))
{
var new_msg=await Task.Run(()=>SomeHeavyWork(msg),cancellationToken);
await _clientWebSocket.SendAsync(message.Data.AsMemory(),
message.MessageType,
true, cancellationToken);
}
}
Concurrent Workers
This method could be used to start multiple concurrent workers too :
var tasks=Enumerable.Range(0,dop).Select(_=>SendLoopAsync(reader,token));
_loopTask=Task.WhenAll(tasks);
...
await _loopTask;
In .NET 6, Parallel.ForEachAsync
can be used to process multiple messages with less code:
private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
CancellationToken cancellationToken)
{
var options=new ParallelOptions {
CancellationToke=cancellationToken,
MaxDegreeOfParallellism=4
};
var input=reader.ReadAllAsync(cancellationToken);
await Parallel.ForEachAsync(input,options,async (msg,token)=>{
var new_msg=await Task.Run(()=>SomeHeavyWork(msg),token);
await _clientWebSocket.SendAsync(message.Data.AsMemory(),
message.MessageType,
true, token);
});
}
Idiomatic Channel Producers
Instead of using a class-level channel stored in a field, create the channel inside the producer method and only return its reader. This way the producer method has control of the channel's lifecycle and can close it when it's done. That's one of the reasons a Channel
can only be accessed accessed only through its Reader and Writer classes.
A method can consume a ChannelReader and return another. This allows creating methods that can be chained together into a pipeline.
A simple producer can look like this:
ChannelReader<Message> Producer(CancellationToke token)
{
var channel=Channel.CreateUnbounded<Message>();
var writer=channel.Writer;
_ = Task.Run(()=>{
while(!token.IsCancellationRequested)
{
var msg=SomeHeavyJob();
await writer.SendAsync(msg);
},token)
.ContinueWith(t=>writer.TryComplete(t));
return channel.Reader;
}
When cancellation is signaled, the worker exits or an exception is thrown, the main task exists and ContinueWith
calls TryComplete
on the writer with any exception that may have been thrown. That's a simple non-blocking operation so it doesn't matter what thread it runs on.
A transforming method would look like this:
ChannelReader<Msg2> Transform(ChannelReader<Msg1> input,CancellationToke token)
{
var channel=Channel.CreateUnbounded<Msg2>();
var writer=channel.Writer;
_ = Task.Run(()=>{
await foreach(var msg1 in input.ReadAllAsync(token))
{
var msg2=SomeHeavyJob(msg1);
await writer.SendAsync(msg2);
},token)
.ContinueWith(t=>writer.TryComplete(t));
return channel.Reader;
}
Turning those methods into static extension methods would allow chaining them one after the other :
var task=Producer()
.Transformer()
.Consumer();
I don't set SingleWriter
because this doesn't seem to be doing anything yet. Searching for this in the .NET runtime repo on Github doesn't show any results beyond test code.