
I have a firehose of messages coming through a Redis pub/sub, and I need to distribute them to a number of websocket connections. Basically, whenever a message arrives from Redis, it needs to be sent to all websocket connections.

I want multiple consumers, and each of them should get all the messages.

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = false
});
var cts = new CancellationTokenSource();


var producer = Task.Run(async () =>
{
    int i = 0;
    while (!cts.IsCancellationRequested)
    {
        channel.Writer.TryWrite(i++);

        await Task.Delay(TimeSpan.FromMilliseconds(250));
    }
});

var readerOneTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader one: {i}");
    }
});

var readerTwoTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader two: {i}");
    }
});

cts.CancelAfter(TimeSpan.FromSeconds(5));

Console.ReadLine();
nop
  • 2
    You cannot do that using channels, see https://stackoverflow.com/questions/61958526/does-channelt-support-multiple-channelreaders-and-channelwriters-or-only-one – Peter Bons Jun 02 '22 at 17:08
  • @PeterBons you can, just not by trying to read multiple times. A channel is a single queue, so reading from it consumes a message no matter what reader wrapper is used. Just like any other kind of queue, if you want multiple consumers to receive the same message you need to broadcast it – Panagiotis Kanavos Jun 03 '22 at 10:56
  • @nop it's not too hard to write lock-free code that broadcasts the message to multiple output channels. The trick is that each worker/block gets its own channel, the same way each dataflow block gets its own queue – Panagiotis Kanavos Jun 03 '22 at 11:21
  • @PanagiotisKanavos Yeah I should have added *using a single channel* ;-) – Peter Bons Jun 03 '22 at 11:54
  • @nop I posted an answer that shows how to broadcast messages to multiple consumers in an idiomatic way – Panagiotis Kanavos Jun 03 '22 at 11:55

2 Answers


A single Channel<T> cannot broadcast messages to multiple consumers. Every time a message is read from the channel, the message is consumed, and no other consumer is going to get it. If you want to broadcast all messages to all consumers, you'll have to create one dedicated Channel<T> per consumer.

You might find this question interesting: Factory for IAsyncEnumerable or IAsyncEnumerator. It shows various ways to implement a source/controller for an IAsyncEnumerable<T> sequence, including channels and Rx subjects.


Update: Below is a demo of how you could use multiple channels, in order to propagate all the messages to all the consumers.

List<Channel<int>> channels = new();

async Task CreateConsumer(Func<Channel<int>, Task> body)
{
    var channel = Channel.CreateUnbounded<int>();
    lock (channels) channels.Add(channel);
    try
    {
        await Task.Run(() => body(channel)).ConfigureAwait(false);
    }
    finally
    {
        lock (channels) channels.Remove(channel);
    }
}

Task consumer1 = CreateConsumer(async channel =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Consumer one: {i}");
    }
});

Task consumer2 = CreateConsumer(async channel =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Consumer two: {i}");
    }
});

using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(3000));
Task producer = Task.Run(async () =>
{
    int i = 0;
    while (true)
    {
        i++;
        lock (channels) channels.ForEach(channel => channel.Writer.TryWrite(i));
        try { await Task.Delay(TimeSpan.FromMilliseconds(250), cts.Token); }
        catch (OperationCanceledException) { break; }
    }
});

try { producer.Wait(); } catch { }
lock (channels) channels.ForEach(channel => channel.Writer.Complete());
Task.WaitAll(producer, consumer1, consumer2);

Try it on Fiddle.

The CreateConsumer is an asynchronous method that is responsible for creating the channel and adding it in the list. It is also responsible for removing the channel from the list when the consumer completes. This is important, otherwise in case a consumer fails the producer would continue pushing messages in the dead channel, resulting in a memory leak.

The "body" of the consumer, that can be different for each consumer, is passed as an asynchronous lambda to the CreateConsumer method.

It is important that all consumers are started, and their channels are created, before starting the producer. That's why the CreateConsumer method is not wrapped in a Task.Run. This way the code inside the CreateConsumer until the first await runs synchronously on the same thread that invoked the CreateConsumer.

Every access to the list with the channels is protected with a lock, because it is possible that multiple threads might attempt to read/modify the list at the same time.

Theodor Zoulias
  • Thank you for your answer! In that case, channels are not a good solution, even if there is a workaround. Is it doable with Rx.NET? – nop Jun 02 '22 at 17:18
  • 1
    @nop yes. Observable sequences can propagate messages to multiple observers (subscribers) by design. – Theodor Zoulias Jun 02 '22 at 17:20
  • Is there a Rx solution that you could redirect me to? Or you could answer here: https://stackoverflow.com/questions/72479643/rx-multiple-consumers-without-losing-messages – nop Jun 02 '22 at 17:22
  • 2
    @nop the issue with the observable sequences is that the messages are delivered synchronously. So the producer must wait until each consumer takes its sweet time processing the message. Everything happens on a single thread. With the channels you can isolate the producer from the consumers. Each consumer can consume its messages at its own pace. This assumes of course that you have created one channel per consumer, in order to deliver all messages to all consumers. – Theodor Zoulias Jun 02 '22 at 17:30
  • 1
    I believe there was a way to parallelize them with `.ObserveOn`, take a look here: https://github.com/Marfusios/binance-client-websocket#parallel-subscriptions – nop Jun 02 '22 at 17:38
  • 2
    @nop yes, it's possible. I was not sure if the messages will be processed sequentially or in parallel per subscriber, but I just tested it and each subscriber processes one message at a time. Which means that the `.ObserveOn(TaskPoolScheduler.Default)` creates an invisible queue behind the scenes. It's unlikely though that this queue will be as efficient as a channel. – Theodor Zoulias Jun 02 '22 at 17:51
  • You could show your tests on the Rx question topic link. About the channels, how do I create two subscribers, here is what I did but one subscriber https://pastebin.com/fgjNqMQQ? – nop Jun 02 '22 at 17:55
  • 2
    @nop you can just create more tasks like the `_ = Task.Run(async () =>`, with each task doing its own `await foreach` loop internally. Each task is a subscriber, and will get all the messages. – Theodor Zoulias Jun 02 '22 at 17:58
  • Actually nothing is being printed on the screen, which means something is wrong (what I did with the channels). I expect both subscribers to be printing 4, 6, 8, but nothing's happening https://pastebin.com/1YgZ4zV2 – nop Jun 02 '22 at 18:01
  • Oh, the issue is that the channels are created right after I write the numbers. If I put a `Task.Delay(5000)` before `YieldReturn(4)...`, it works. How do I actually fix that one? – nop Jun 02 '22 at 18:08
  • 1
    @nop yeah. There is a race condition, with the producer starting emitting values before the subscribers have subscribed. A small `Thread.Sleep` delay should make the example work ([demo](https://dotnetfiddle.net/UfcYeo)), but of course this is not ideal. Actually you found a bug in my `AsyncEnumerableSource` implementation. This class should remember a `Complete` or `Fault` call, and propagate it to future subscribers. – Theodor Zoulias Jun 02 '22 at 18:16

What you ask is possible, just not in this way.

A Channel is a single asynchronous queue, kind of like a ConcurrentQueue with an async interface (plus ordering guarantees, backpressure and some other features). Just like with a ConcurrentQueue, when multiple consumers read from the queue, each message is delivered to only one of them, and the next consumer to read gets the next message. To send the same message to multiple consumers you'll have to broadcast it.
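
To make the competing-consumer behavior concrete, here is a minimal sketch (not taken from the question's code) in which two readers share one channel. The `Read` helper, the `seenByOne`/`seenByTwo` bags and the ten test messages are assumptions made up for this illustration. How the messages are split between the two readers is not deterministic, but the total never exceeds the number of messages written.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// Each reader records the messages it received (hypothetical helper state).
var seenByOne = new ConcurrentBag<int>();
var seenByTwo = new ConcurrentBag<int>();

// Hypothetical helper: drains the shared channel into the given bag.
async Task Read(ConcurrentBag<int> seen)
{
    await foreach (var i in channel.Reader.ReadAllAsync())
        seen.Add(i);
}

// Both readers consume from the SAME channel.
var readerOne = Read(seenByOne);
var readerTwo = Read(seenByTwo);

for (int i = 0; i < 10; i++)
    channel.Writer.TryWrite(i);
channel.Writer.Complete();

await Task.WhenAll(readerOne, readerTwo);

// Every message was consumed exactly once overall, not once per reader.
Console.WriteLine($"Reader one saw {seenByOne.Count}, reader two saw {seenByTwo.Count}");
Console.WriteLine($"Total: {seenByOne.Count + seenByTwo.Count}");
```

The two counts always add up to 10: the readers compete for messages instead of each receiving a copy.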

Common Channel Pattern

A common pattern with channels is for each processing method to consume only a ChannelReader passed as input, create and own its own channel, and return that channel's reader as its output. This is very common in Go ( blog post and talk ), where channels are used extensively for producer/consumer communication and pipelines. If you replace <-chan int with ChannelReader you'll realize almost all methods receive a ChannelReader and return a new one.

This way the processing method can control the lifetime of the channel. When the input completes or the work gets cancelled, completion is propagated to the consumers. Since the output channel was created by the worker itself, the method has full control over its lifetime:

ChannelReader<string> Worker(ChannelReader<int> input,
                             CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;
   
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(msg.ToString(),token);
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel.Reader;
}

This boilerplate code can be generalized if the actual code is passed as a Func<TIn,TOut> or Func<TIn,Task<TOut>> for asynchronous methods:

ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
                             Func<TIn,CancellationToken,Task<TOut>> func,  
                             CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<TOut>();
    var writer=channel.Writer;
   
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            var result=await func(msg,token);
            await writer.WriteAsync(result,token);
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel.Reader;
}

ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
                             Func<TIn,CancellationToken,TOut> func,  
                             CancellationToken token=default)
{
...
            var result=func(msg,token);
            await writer.WriteAsync(result,token);
...
}

This can be used to create any processing block, eg :

ChannelReader<int> step1Out=.....;
ChannelReader<int> step2Out=Work<int,int>(step1Out,async (i,token)=>{
    await Task.Delay(i*1000,token);
    return i;
});
ChannelReader<string> step3Out=Work<int,string>(step2Out,(i,token)=>{
    var line=$"Data is {i}";
    Console.WriteLine(line);
    return line;
});

A method that doesn't produce any output can be simpler but asynchronous :

async Task Consume<TIn>(ChannelReader<TIn> input,
                             Action<TIn,CancellationToken> act,  
                             CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        act(msg,token);
    }
}

...

await Consume(step2Out,(i,token)=>Console.WriteLine($"Data is {i}"));

Broadcasting

This simple pattern can be adapted to broadcast the same message to N consumers, by creating N channels and returning their readers:

IList<ChannelReader<T>> Broadcast<T>(ChannelReader<T> input, int count, CancellationToken token=default)
{
    var channels=Enumerable.Range(0,count)
                           .Select(_=> Channel.CreateUnbounded<T>())
                           .ToList();
    var writers=channels.Select(c=>c.Writer).ToList();
   
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {            
            //Offer the message to all output channels
            foreach(var w in writers)
            {
                await w.WriteAsync(msg,token);
            }
        }
    },token)
    .ContinueWith(t=>{
            foreach(var w in writers)
            {
                w.TryComplete(t.Exception);
            }
    });

    return channels.Select(c=>c.Reader).ToList();
}

This way, one can broadcast the same message to multiple consumers:

var broadcast=Broadcast<int>(step1Out,2);
var reader1=Consume(broadcast[0],(i,token)=>Console.WriteLine("Reader 0: {0}",i));
var reader2=Consume(broadcast[1],(i,token)=>Console.WriteLine("Reader 1: {0}",i));

Or even

var readers=broadcast.Select((b,idx)=>Consume(b,
                         (i,token)=>Console.WriteLine($"Reader {idx}: {i}")))
                     .ToList();
await Task.WhenAll(readers);
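
Putting the pieces together, the following is a minimal self-contained sketch of the broadcast pattern as a console program. `Broadcast` and `Consume` follow the shapes shown above (with `Consume` simplified to take an `Action<T>`). The `source` channel standing in for the Redis subscription, the `received` lists and the five test messages are assumptions made for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Stand-in for the real message source, e.g. a Redis subscription (assumption).
var source = Channel.CreateUnbounded<int>();

// One output channel per consumer.
var outputs = Broadcast(source.Reader, 2);

// Each consumer collects what it receives so the result can be inspected.
var received = outputs.Select(_ => new List<int>()).ToList();
var consumers = outputs
    .Select((reader, idx) => Consume(reader, msg => received[idx].Add(msg)))
    .ToList();

for (int i = 1; i <= 5; i++)
    await source.Writer.WriteAsync(i);
source.Writer.Complete(); // completion flows through Broadcast to every consumer

await Task.WhenAll(consumers);

for (int idx = 0; idx < received.Count; idx++)
    Console.WriteLine($"Reader {idx}: {string.Join(", ", received[idx])}");

static IList<ChannelReader<T>> Broadcast<T>(ChannelReader<T> input, int count,
                                            CancellationToken token = default)
{
    var channels = Enumerable.Range(0, count)
                             .Select(_ => Channel.CreateUnbounded<T>())
                             .ToList();
    var writers = channels.Select(c => c.Writer).ToList();

    _ = Task.Run(async () =>
    {
        await foreach (var msg in input.ReadAllAsync(token))
        {
            // Offer the message to all output channels, in order.
            foreach (var w in writers)
                await w.WriteAsync(msg, token);
        }
    }, token)
    .ContinueWith(t =>
    {
        // Propagate completion (or the failure) to every output channel.
        foreach (var w in writers)
            w.TryComplete(t.Exception);
    });

    return channels.Select(c => c.Reader).ToList();
}

static async Task Consume<T>(ChannelReader<T> input, Action<T> act,
                             CancellationToken token = default)
{
    await foreach (var msg in input.ReadAllAsync(token))
        act(msg);
}
```

Both readers end up with the full sequence 1, 2, 3, 4, 5, because each one drains its own channel. Note that the output channels here are unbounded, so a consumer that falls behind will buffer messages without limit.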
Panagiotis Kanavos
  • 1
    @Nop I added a link to [Go channel patterns](https://go.dev/blog/pipelines) and a more detailed [talk](https://talks.golang.org/2012/concurrency.slide#1). In almost all cases each function accepts a ChannelReader and returns a new one – Panagiotis Kanavos Jun 03 '22 at 12:39