There are a couple of issues with the code, including forgetting to enumerate the producers and consumers enumerables. IEnumerable is evaluated lazily, so until you actually enumerate it with e.g. foreach or ToList, nothing is generated.
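To make the lazy evaluation concrete, here is a minimal sketch (the names created, workers and running are made up for illustration). The Select lambda that creates the tasks doesn't run until the sequence is enumerated with ToList:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var created = 0;

// Select only describes the work; the lambda below has not run yet.
IEnumerable<Task> workers = Enumerable.Range(0, 3).Select(i =>
{
    created++;   // runs only when the sequence is enumerated
    return Task.Run(() => Console.WriteLine($"Worker {i} running"));
});

var createdBefore = created;   // nothing has been enumerated so far

// ToList enumerates the sequence once, which creates and starts the tasks.
List<Task> running = workers.ToList();
await Task.WhenAll(running);

Console.WriteLine($"Created before enumeration: {createdBefore}, after: {created}");
```

Note that enumerating workers a second time would run the lambda again and start a second set of tasks, which is why the sketch materializes the sequence once and awaits the list.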
There's nothing wrong with ContinueWith when used properly either. It's definitely better and cheaper than using exceptions as control flow.
The code can be improved a lot by using some common Channel coding patterns.
- The producer owns and encapsulates the channel
- The producer exposes only Reader(s)
Plus, ContinueWith is an excellent choice to signal a ChannelWriter's completion, as we don't care at all which thread will do that. If anything, we'd prefer to use one of the "worker" threads to avoid a thread switch.
Let's say the producer function is:
// Not marked async: the method returns the Task created by Task.Run directly
Task Produce(ChannelWriter<string> writer, int producerNumber)
{
    return Task.Run(async () =>
    {
        var rnd = new Random();
        for (var i = 0; i < 10; i++)
        {
            var t = $"Message {i}";
            Console.WriteLine($"Producing {t} on producer {producerNumber}");
            // Write through the writer that was passed in
            await writer.WriteAsync(t);
            await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
        }
    });
}
Producer
The producer can be:
ChannelReader<string> ProduceData(int dop)
{
    var channel = Channel.CreateUnbounded<string>();
    var writer = channel.Writer;
    // ToList enumerates the sequence, which starts the producer tasks
    var tasks = Enumerable.Range(0, dop)
        .Select(producerNumber => Produce(writer, producerNumber))
        .ToList();
    _ = Task.WhenAll(tasks).ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}
Completion and error propagation
Notice the line:
_ = Task.WhenAll(tasks).ContinueWith(t => writer.TryComplete(t.Exception));
This says that as soon as the producers complete, the writer itself should complete with any exception that may have been raised. It doesn't really matter what thread the continuation runs on, as it doesn't do anything other than call TryComplete.
More importantly, t=>writer.TryComplete(t.Exception) propagates the worker exception(s) to downstream consumers. Otherwise the consumers would never know something went wrong. If you had a database consumer, you'd want it to avoid finalizing any changes if the source aborted.
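The propagation can be seen in a small sketch. Everything here is illustrative: the failing producer, the message text and the variable names are assumptions, not part of the original code. The consumer first receives the message that was already written and only then sees the failure:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();
var writer = channel.Writer;

// A hypothetical producer that fails after writing one message.
var producer = Task.Run(async () =>
{
    await writer.WriteAsync("Message 0");
    throw new InvalidOperationException("Source aborted");
});

// Complete the writer with whatever exception the producer raised (null on success).
_ = producer.ContinueWith(t => writer.TryComplete(t.Exception));

var received = new List<string>();
Exception? observed = null;
try
{
    // Buffered messages are delivered first; the error surfaces once the channel is drained.
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        received.Add(item);
    }
}
catch (Exception ex)
{
    observed = ex;
}

Console.WriteLine($"Received {received.Count} message(s); consumer saw: {observed?.GetType().Name}");
```

Without the TryComplete(t.Exception) call, the writer would never be completed in this scenario and the await foreach would wait forever.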
Consumer
The consumer method can be:
async Task Consume(ChannelReader<string> reader, int dop, CancellationToken token=default)
{
    var tasks = Enumerable
        .Range(1, dop)
        .Select(consumerNumber =>
            Task.Run(async () =>
            {
                await foreach(var item in reader.ReadAllAsync(token))
                {
                    Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
                }
            }));
    await Task.WhenAll(tasks);
}
In this case, await Task.WhenAll(tasks); enumerates the worker tasks, thus starting them.
Nothing else is needed to process all generated messages. When all producers finish, the channel's Reader is completed. When that happens, ReadAllAsync keeps offering any remaining messages to the consumers and then exits.
Composition
Combining both methods is as easy as:
var reader = ProduceData(10);
// Consume has no default for dop, so a value is needed; 3 is an arbitrary choice
await Consume(reader, 3);
General Pattern
This is a general pattern for pipeline stages using Channels - read the input from a ChannelReader, write it to an internal Channel and return only the owned channel's Reader. This way the stage owns the channel which makes completion and error handling a lot easier:
static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin> reader, int dop=1, CancellationToken token=default)
{
    var channel = Channel.CreateUnbounded<TOut>();
    var writer = channel.Writer;
    // The Select lambda is not async: it must return the Task.Run task itself,
    // otherwise WhenAll would only wait for already-completed wrapper tasks
    var tasks = Enumerable.Range(0, dop)
        .Select(i => Task.Run(async () =>
        {
            await foreach(var item in reader.ReadAllAsync(token))
            {
                try
                {
                    ...
                    await writer.WriteAsync(msg);
                }
                catch(Exception exc)
                {
                    //Handle the exception and keep processing messages
                }
            }
        }, token));
    _ = Task.WhenAll(tasks)
        .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}
This allows chaining multiple "stages" together to form a pipeline:
var finalReader = Producer(...)
    .Crunch1()
    .Crunch2(10)
    .Crunch3();
await foreach(var result in finalReader.ReadAllAsync())
{
    ...
}
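As a runnable illustration of such a chain, here is a minimal sketch. Source and Transform are hypothetical stages invented for this example, and they are written as plain local functions rather than extension methods so the whole thing fits in a single top-level program. Each stage owns its channel and returns only the Reader:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical source stage: emits the given strings, then completes its channel.
ChannelReader<string> Source(params string[] items)
{
    var channel = Channel.CreateUnbounded<string>();
    var writer = channel.Writer;
    var task = Task.Run(async () =>
    {
        foreach (var item in items)
        {
            await writer.WriteAsync(item);
        }
    });
    _ = task.ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

// Hypothetical transform stage: dop workers read the input, apply the delegate
// and write the result to the channel owned by this stage.
ChannelReader<TOut> Transform<TIn, TOut>(
    ChannelReader<TIn> reader, Func<TIn, TOut> transform,
    int dop = 1, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<TOut>();
    var writer = channel.Writer;
    var tasks = Enumerable.Range(0, dop)
        .Select(i => Task.Run(async () =>
        {
            await foreach (var item in reader.ReadAllAsync(token))
            {
                await writer.WriteAsync(transform(item), token);
            }
        }, token))
        .ToList();   // enumerate once so the workers actually start
    _ = Task.WhenAll(tasks).ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

// Chain: source -> trim (1 worker) -> length (2 workers).
var source = Source(" a ", "bb", " ccc ");
var lengths = Transform(Transform(source, s => s.Trim()), s => s.Length, dop: 2);

var total = 0;
await foreach (var length in lengths.ReadAllAsync())
{
    total += length;
}
Console.WriteLine($"Total length: {total}");
```

With more than one worker the order of the results is not guaranteed, so the sketch only aggregates them.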
Producer and consumer methods can be written in the same way, allowing, e.g., the creation of a data import pipeline:
var importTask = ReadFiles(somePath)
    .ParseCsv<string,Record[]>(10)
    .ImportToDb<Record>(connectionString);
await importTask;
With ReadFiles defined as:
static ChannelReader<string> ReadFiles(string folder)
{
    var channel = Channel.CreateUnbounded<string>();
    var writer = channel.Writer;
    var task = Task.Run(async () => {
        foreach(var path in Directory.EnumerateFiles(folder, "*.csv"))
        {
            await writer.WriteAsync(path);
        }
    });
    _ = task.ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}
Update for .NET 6 Parallel.ForEachAsync
Now that .NET 6 is supported in production, one could use Parallel.ForEachAsync to simplify a concurrent consumer to:
static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin> reader,
    int dop=1, CancellationToken token=default)
{
    var channel = Channel.CreateUnbounded<TOut>();
    var writer = channel.Writer;
    // Named options so it doesn't clash with the dop parameter
    var options = new ParallelOptions {
        MaxDegreeOfParallelism = dop,
        CancellationToken = token
    };
    var task = Parallel.ForEachAsync(
        reader.ReadAllAsync(token),
        options,
        // The body receives both the item and a CancellationToken
        async (item, ct) => {
            try
            {
                ...
                await writer.WriteAsync(msg, ct);
            }
            catch(Exception exc)
            {
                //Handle the exception and keep processing messages
            }
        });
    _ = task.ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}
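For a concrete feel of how this behaves, here is a small sketch that assumes .NET 6 or later. The input values, the squaring step and the variable names are made up for illustration. Parallel.ForEachAsync pulls items from ReadAllAsync with at most two concurrent workers and writes the results to an output channel:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Input channel pre-filled with 1..5 and then completed.
var input = Channel.CreateUnbounded<int>();
for (var i = 1; i <= 5; i++)
{
    input.Writer.TryWrite(i);
}
input.Writer.Complete();

var output = Channel.CreateUnbounded<int>();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

// The body takes the item and a CancellationToken and returns a ValueTask.
var work = Parallel.ForEachAsync(
    input.Reader.ReadAllAsync(),
    options,
    async (item, ct) =>
    {
        await output.Writer.WriteAsync(item * item, ct);
    });

// Complete the output channel when all items are processed, passing on any error.
_ = work.ContinueWith(t => output.Writer.TryComplete(t.Exception));

var sum = 0;
await foreach (var square in output.Reader.ReadAllAsync())
{
    sum += square;
}
Console.WriteLine($"Sum of squares: {sum}");
```

The results may arrive in any order, so the sketch sums them instead of relying on their sequence.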