What you ask is possible, just not in this way.
A Channel is a single asynchronous queue, somewhat like a ConcurrentQueue with an async interface (plus ordering guarantees, backpressure and a few other features). Just like a ConcurrentQueue, when multiple consumers read from the same queue, each message is delivered to only one of them. To send the same message to multiple consumers you'll have to broadcast it.
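To make the competing-consumer behaviour concrete, here is a minimal sketch. The names `StartConsumer` and `seen` are made up for illustration, and the program assumes top-level statements (C# 9 or later). Two consumers read from the same reader, and between them they receive each message only once:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// Records which consumer received each message.
var seen = new ConcurrentBag<(int Consumer, int Message)>();

// Hypothetical helper: starts a consumer that reads from the shared reader.
Task StartConsumer(int id) => Task.Run(async () =>
{
    await foreach (var msg in channel.Reader.ReadAllAsync())
    {
        seen.Add((id, msg));
    }
});

// Two competing consumers on the SAME channel.
var consumers = new[] { StartConsumer(0), StartConsumer(1) };

for (var i = 0; i < 10; i++)
{
    await channel.Writer.WriteAsync(i);
}
channel.Writer.Complete();
await Task.WhenAll(consumers);

// Ten messages were written, so ten deliveries are recorded in total,
// not ten per consumer.
Console.WriteLine($"Messages received: {seen.Count}");
Console.WriteLine($"Distinct messages: {seen.Select(s => s.Message).Distinct().Count()}");
```

How the ten messages are split between the two consumers depends on scheduling and is not guaranteed; only the total is.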
Common Channel Pattern
A common pattern with channels is for each processing method to consume only a ChannelReader passed as input, create and own its own channel, and return that channel's reader as its output. This is very common in Go (blog post and talk), where channels are used extensively for producer/consumer communication and pipelines. If you replace <-chan int with ChannelReader<int> you'll see that almost all methods receive a ChannelReader and return a new one.
Because the worker creates the output channel itself, it has full control over the channel's lifetime. When the input completes or the work gets cancelled, completion is propagated to the consumers:
ChannelReader<string> Worker(ChannelReader<int> input,
                             CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
            await foreach(var msg in input.ReadAllAsync(token))
            {
                await writer.WriteAsync(msg.ToString(),token);
            }
        },token)
        .ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel.Reader;
}
This boilerplate code can be generalized if the actual processing code is passed as a Func<TIn,CancellationToken,TOut>, or as a Func<TIn,CancellationToken,Task<TOut>> for asynchronous methods:
ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
                                   Func<TIn,CancellationToken,Task<TOut>> func,
                                   CancellationToken token=default)
{
    //The output channel carries TOut, not string
    var channel=Channel.CreateUnbounded<TOut>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
            await foreach(var msg in input.ReadAllAsync(token))
            {
                var result=await func(msg,token);
                await writer.WriteAsync(result,token);
            }
        },token)
        .ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel.Reader;
}
ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
                                   Func<TIn,CancellationToken,TOut> func,
                                   CancellationToken token=default)
{
    ...
    var result=func(msg,token);
    await writer.WriteAsync(result,token);
    ...
}
This can be used to create any processing block, e.g.:
ChannelReader<int> step1Out=.....;
ChannelReader<int> step2Out=Work<int,int>(step1Out,async (i,token)=>{
    await Task.Delay(i*1000,token);
    return i;
});
ChannelReader<string> step3Out=Work<int,string>(step2Out,(i,token)=>{
    var line=$"Data is {i}";
    Console.WriteLine(line);
    return line;
});
A method that doesn't produce any output can be simpler, although it is still asynchronous:
async Task Consume<TIn>(ChannelReader<TIn> input,
                        Action<TIn,CancellationToken> act,
                        CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        act(msg,token);
    }
}
...
await Consume(step2Out,(i,token)=>Console.WriteLine($"Data is {i}"));
Broadcasting
This simple pattern can be adapted to broadcast the same message to N consumers, by creating N channels and returning their readers:
IList<ChannelReader<T>> Broadcast<T>(ChannelReader<T> input, int count, CancellationToken token=default)
{
    var channels=Enumerable.Range(0,count)
                           .Select(_=> Channel.CreateUnbounded<T>())
                           .ToList();
    var writers=channels.Select(c=>c.Writer).ToList();
    _ = Task.Run(async ()=>{
            await foreach(var msg in input.ReadAllAsync(token))
            {
                //Offer the message to all output channels
                foreach(var w in writers)
                {
                    await w.WriteAsync(msg,token);
                }
            }
        },token)
        .ContinueWith(t=>{
            //Complete every output channel, propagating any error
            foreach(var w in writers)
            {
                w.TryComplete(t.Exception);
            }
        });
    return channels.Select(c=>c.Reader).ToList();
}
This way, one can broadcast the same message to multiple consumers:
var broadcast=Broadcast<int>(step1Out,2);
var reader1=Consume(broadcast[0],(i,token)=>Console.WriteLine("Reader 0: {0}",i));
var reader2=Consume(broadcast[1],(i,token)=>Console.WriteLine("Reader 1: {0}",i));
Or even
var readers=broadcast.Select((b,idx)=>Consume(b,
                    (i,token)=>Console.WriteLine($"Reader {idx}: {i}")))
                .ToList();
await Task.WhenAll(readers);
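For reference, the pieces can be combined into a small end-to-end program. This is only a sketch under a few assumptions: `Produce` is a hypothetical helper standing in for whatever creates step1Out, the `received` lists exist only to make the result visible, and the program uses top-level statements (C# 9 or later):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical source step: emits 0..count-1, then completes its channel.
static ChannelReader<int> Produce(int count, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<int>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
        {
            for (var i = 0; i < count; i++)
            {
                await writer.WriteAsync(i, token);
            }
        }, token)
        .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

// Copies every input message to each of the `count` output channels.
static IList<ChannelReader<T>> Broadcast<T>(ChannelReader<T> input, int count,
                                            CancellationToken token = default)
{
    var channels = Enumerable.Range(0, count)
                             .Select(_ => Channel.CreateUnbounded<T>())
                             .ToList();
    var writers = channels.Select(c => c.Writer).ToList();
    _ = Task.Run(async () =>
        {
            await foreach (var msg in input.ReadAllAsync(token))
            {
                foreach (var w in writers)
                {
                    await w.WriteAsync(msg, token);
                }
            }
        }, token)
        .ContinueWith(t =>
        {
            // Complete every output channel, propagating any error.
            foreach (var w in writers)
            {
                w.TryComplete(t.Exception);
            }
        });
    return channels.Select(c => c.Reader).ToList();
}

// Runs `act` for every message until the input completes.
static async Task Consume<TIn>(ChannelReader<TIn> input,
                               Action<TIn, CancellationToken> act,
                               CancellationToken token = default)
{
    await foreach (var msg in input.ReadAllAsync(token))
    {
        act(msg, token);
    }
}

var broadcast = Broadcast(Produce(5), 2);

// One list per consumer; each list is only touched by its own consumer.
var received = new[] { new List<int>(), new List<int>() };
var readers = broadcast
    .Select((b, idx) => Consume(b, (i, token) => received[idx].Add(i)))
    .ToList();
await Task.WhenAll(readers);

for (var idx = 0; idx < received.Length; idx++)
{
    Console.WriteLine($"Reader {idx}: {string.Join(",", received[idx])}");
}
// Reader 0: 0,1,2,3,4
// Reader 1: 0,1,2,3,4
```

Because the output channels are unbounded, a slow consumer never blocks the others; the trade-off is that its channel can grow without limit.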