Is it possible for multiple consumers to receive the same message? I have a single producer that produces tick data (stock market) from web sockets. I currently have a single consumer that receives 1000 messages per second, and it works great. Now I would like multiple consumers to receive the same messages using System.Threading.Channels. Here is the complete working code for a single producer/consumer:

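using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

class ConsumerOne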
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string _tag;

    public ConsumerOne(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
        _tag = tag;
    }

    public async Task StartConsuming()
    {
        await foreach (var message in _tickQueue.Reader.ReadAllAsync(
                           cancellationToken: _cancellationTokenSource.Token))
        {
            // Business logic of One
            Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
        }
    }
}

public class DummyData
{
    public long Ticks { get; set; }
    public DateTime DateTime { get; set; }
    public decimal Price { get; set; }
}

class Producer
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;

    public Producer(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
    }

    public async Task StartProducing()
    {
        Random r = new Random();
        for (int i = 0; i < 10; i++)
        {

            await _tickQueue.Writer.WriteAsync(new DummyData()
            {
                DateTime = DateTime.Now,
                Ticks = DateTime.Now.Ticks,
                Price = Convert.ToDecimal(r.NextDouble() * r.Next(100, 105))
            }, _cancellationTokenSource.Token);
            await Task.Delay(r.Next(50, 500));
        }
    }
}

internal class MultipleConsumersEg
{
    private static Channel<DummyData> tickQueue;
    private static readonly CancellationTokenSource TickQueueCancellationTokenSource = new CancellationTokenSource();
    public static async Task Main(string[] args)
    {
        tickQueue = Channel.CreateUnbounded<DummyData>();

        Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
        ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
        _ = consumerOne.StartConsuming(); // fire-and-forget: runs in the background

        _ = p.StartProducing();

        Console.ReadLine();
    }
}

The code above works for a single producer/consumer (fiddle link). Now I would like to add another consumer for a different strategy (one consumer per strategy).

class ConsumerTwo
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string _tag;

    public ConsumerTwo(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
        _tag = tag;
    }

    public async Task StartConsuming()
    {
        await foreach (var message in _tickQueue.Reader.ReadAllAsync(
                           cancellationToken: _cancellationTokenSource.Token))
        {
            // Business logic of Two
            Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
        }
    }
}

    public static async Task Main(string[] args)
    {
        tickQueue = Channel.CreateUnbounded<DummyData>();

        Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
        ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
        _ = consumerOne.StartConsuming();

        ConsumerTwo consumerTwo = new ConsumerTwo(tickQueue, TickQueueCancellationTokenSource, "TWO");
        _ = consumerTwo.StartConsuming();

        _ = p.StartProducing();

        Console.ReadLine();
    }

After adding the second consumer, the data is consumed, but each message is seen by only one of the two consumers. I want every consumer to receive all 10 messages. I may have up to 50 consumers in the future, and all of them should receive the same messages.

Output:

    from consumer TWO ==> 7.27597006121753
    from consumer TWO ==> 30.4838315240171
    from consumer TWO ==> 31.3675707908867
    from consumer TWO ==> 53.2673930636206
    from consumer ONE ==> 74.6396192795487
    from consumer TWO ==> 24.2795471970634
    from consumer ONE ==> 88.6467375550418
    from consumer ONE ==> 26.3311568478758
    from consumer TWO ==> 20.8731819843862
    from consumer ONE ==> 0.85598795659704

Both consumers should receive all messages.
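The output above follows from how a single `Channel<T>` works: readers of the same channel compete for items, so each item is delivered to exactly one of them. A minimal sketch that makes this observable, assuming `int` messages in place of `DummyData` (`CompetingConsumersDemo` and `RunAsync` are illustrative names, not part of the question):

```csharp
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompetingConsumersDemo
{
    // Writes `count` items to ONE channel that two consumers read from,
    // and returns what each consumer saw.
    public static async Task<(int[] one, int[] two)> RunAsync(int count)
    {
        var channel = Channel.CreateUnbounded<int>();
        var one = new ConcurrentQueue<int>();
        var two = new ConcurrentQueue<int>();

        // Both consumers read from the same ChannelReader, so they compete.
        async Task Consume(ConcurrentQueue<int> seen)
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
                seen.Enqueue(item);
        }

        var c1 = Consume(one);
        var c2 = Consume(two);

        for (int i = 0; i < count; i++)
            await channel.Writer.WriteAsync(i);
        channel.Writer.Complete(); // lets both ReadAllAsync loops finish

        await Task.WhenAll(c1, c2);
        return (one.ToArray(), two.ToArray());
    }
}
```

Together the two arrays contain every message exactly once; how the messages are split between the consumers is not deterministic.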

Theodor Zoulias
bommina
  • Related: [Multiple consumers without losing messages](https://stackoverflow.com/questions/72479688/multiple-consumers-without-losing-messages) – Theodor Zoulias Aug 06 '22 at 21:01
  • There's nothing built-in for this; you'll have to do the broadcast logic yourself. – Stephen Cleary Aug 06 '22 at 23:05
  • @TheodorZoulias, based on this answer, I don't think having N channels for N subscribers is an elegant approach. The producer produces 1000 messages per second, and right now a single consumer sees the data immediately, without any latency. But with 50 writers * 1000 messages per second, performance may degrade drastically. Can any other library solve this at the same high speed? – bommina Aug 07 '22 at 10:13
  • @bommina the *other* answer shows how to easily do what you want. You can create a `Broadcast` method that returns as many output channels as you want, with the characteristics you want: bounded or unbounded, discarding old items or not. `Any other libraries can solve with high same speed`: which other libraries are you talking about, and are you sure they're faster? 1000 msg/sec is slow, not fast. Channels are a pretty low-level class meant for far higher loads. They're meant to be combined to build the complex behavior you want – Panagiotis Kanavos Aug 07 '22 at 11:06
  • For example, you could emulate a Dataflow-like Broadcast with Bounded output channels with Capacity=1 and FullMode behavior that discards new or old messages. You could pass an array with capacity counts instead of a channel count to create different output channels. Or you could copy each output channel to another with different bound behavior. If you create those channels with `SingleReader`, you get a faster, simplified channel implementation. – Panagiotis Kanavos Aug 07 '22 at 11:12
  • @bommina there are other techniques too, if you change your perspective. If you want to perform the same analysis on 50 different stocks, you can create 50 instances of the processing pipeline with something as simple as LINQ's `Select`, and then copy every input message to each pipeline's input channel with even less code: an `await foreach(var msg in fixOutput.ReadAllAsync())` loop that posts the same message to each of the pipelines – Panagiotis Kanavos Aug 07 '22 at 11:16
  • @bommina the same applies if you have 50 different algorithms you want to apply to the same ticker symbol. Create a different pipeline for each algorithm and copy each incoming message to all of them – Panagiotis Kanavos Aug 07 '22 at 11:19
  • @PanagiotisKanavos comments are intended for asking for more information, or for suggesting improvements to the question, [not for answering it](https://prnt.sc/RfK7kkKxohGr "Use comments to ask for more information or suggest improvements. Avoid answering questions in comments."). Why do you always try to answer the question in the comments? – Theodor Zoulias Aug 07 '22 at 13:29
  • bommina, I would suggest measuring the overhead of having a dedicated `Channel` per consumer before searching for alternative solutions. The overhead might not be as high as you think. Is your producer really slowed down by calling `.Writer.WriteAsync` or `.Writer.TryWrite` 50,000 times per second? – Theodor Zoulias Aug 07 '22 at 14:15
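The bounded-channel idea from the comments (one channel per consumer, Capacity=1, a FullMode that discards old messages, SingleReader) can be sketched as follows. This is a minimal sketch, not code from the thread: `LatestValueChannels`, `Create` and `Broadcast` are hypothetical names, and dropping stale ticks with `BoundedChannelFullMode.DropOldest` is only acceptable if a strategy cares about the latest price rather than every tick.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;

public static class LatestValueChannels
{
    // One channel per consumer. Capacity 1 + DropOldest means a slow
    // consumer only ever sees the most recent item, never a backlog.
    public static Channel<T>[] Create<T>(int consumerCount) =>
        Enumerable.Range(0, consumerCount)
            .Select(_ => Channel.CreateBounded<T>(new BoundedChannelOptions(1)
            {
                FullMode = BoundedChannelFullMode.DropOldest,
                SingleReader = true,  // each consumer owns its channel
                SingleWriter = true   // only the broadcast loop writes
            }))
            .ToArray();

    // Offers one item to every channel. With DropOldest, TryWrite does not
    // fail when a channel is full; it replaces the buffered item instead.
    public static void Broadcast<T>(IEnumerable<Channel<T>> channels, T item)
    {
        foreach (var c in channels)
            c.Writer.TryWrite(item);
    }
}
```

Because `TryWrite` never waits in this mode, the producer is never slowed down by a slow consumer; the trade-off is that intermediate ticks are lost for that consumer.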

1 Answer


Channels are a low-level, asynchronous publisher/subscriber queue that preserves order and operations. They provide the low-level communication/queueing functionality needed to build Communicating Sequential Processes (CSP) pipelines. Other libraries seem more elegant simply because they add their own code on top of their own pub/sub queues.

The name is significant. Channels implement the communication "channel" between processes/workers, or publishers/subscribers if you prefer. They're meant to be the input and output of every pipeline function or object, not just an internal collection. When used this way, it's easy to implement some very complex behavior. The channels themselves are typically owned by the workers; they aren't some kind of global program state.

Your question isn't strictly asking for broadcasting or multicasting. Writing a broadcast function, though, is pretty simple. It could be as simple (or simplistic) as:

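// Must be declared inside a static class to be usable as an extension method
public static async Task CopyTo<T>(this ChannelReader<T> input,
        IList<ChannelWriter<T>> outputs, 
        CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token).ConfigureAwait(false))
    {
        // Write the same message to every output, one after another
        foreach(var o in outputs)
        {
            await o.WriteAsync(msg, token).ConfigureAwait(false);
        }
    }
    // The input is complete: complete the outputs so the consumers' loops end
    foreach(var o in outputs)
    {
        o.TryComplete();
    }
}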

This copies the same message to every output. It won't block as long as the output channels are unbounded, or bounded with capacities large enough that they never fill up.
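A runnable sketch of `CopyTo` in use, assuming `int` messages instead of `DummyData`. It wraps the loop in `try`/`finally` (an addition, not part of the answer's version) so the outputs are completed even if reading is cancelled; `BroadcastDemo` and `RunAsync` are illustrative names:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BroadcastDemo
{
    // Every message read from `input` is written to every output.
    public static async Task CopyTo<T>(this ChannelReader<T> input,
        IList<ChannelWriter<T>> outputs, CancellationToken token = default)
    {
        try
        {
            await foreach (var msg in input.ReadAllAsync(token).ConfigureAwait(false))
                foreach (var o in outputs)
                    await o.WriteAsync(msg, token).ConfigureAwait(false);
        }
        finally
        {
            // Complete outputs even on cancellation so consumers don't hang.
            foreach (var o in outputs) o.TryComplete();
        }
    }

    // One producer, `consumerCount` consumers; returns what each one received.
    public static async Task<List<int>[]> RunAsync(int consumerCount, int messageCount)
    {
        var source = Channel.CreateUnbounded<int>();
        var inputs = Enumerable.Range(0, consumerCount)
            .Select(_ => Channel.CreateUnbounded<int>(
                new UnboundedChannelOptions { SingleReader = true }))
            .ToArray();
        var received = inputs.Select(_ => new List<int>()).ToArray();

        // Each consumer reads only from its own channel.
        var consumers = inputs.Select(async (ch, i) =>
        {
            await foreach (var m in ch.Reader.ReadAllAsync())
                received[i].Add(m);
        }).ToArray();

        var copyTask = source.Reader.CopyTo(inputs.Select(c => c.Writer).ToList());

        for (int i = 0; i < messageCount; i++)
            await source.Writer.WriteAsync(i);
        source.Writer.Complete();

        await copyTask;
        await Task.WhenAll(consumers);
        return received;
    }
}
```

With `RunAsync(3, 10)`, each of the three lists should contain 0 through 9 in order, since a single loop writes every message to every output before reading the next one.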

It's also easy to create a RouteTo method that routes messages by tag, e.g.:

public static async Task RouteTo<T>(this ChannelReader<T> input,
        IDictionary<string,ChannelWriter<T>> outputs, 
        Func<T,string> selector, 
        CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token).ConfigureAwait(false))
    {
        // Pick the output by key; messages without a matching output are dropped
        var key=selector(msg);
        if (outputs.TryGetValue(key, out var o))
        {
            await o.WriteAsync(msg, token).ConfigureAwait(false);
        }
    }
    foreach(var o in outputs.Values)
    {
        o.TryComplete();
    }
}
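An example of calling `RouteTo`, which was requested in the comments. This sketch assumes a hypothetical `Tick` type with a `Symbol` routing key (the question's `DummyData` has no such field) and adds a `finally` block so outputs are completed on cancellation too. Only AAPL and MSFT have outputs, so other symbols are dropped:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical message type: routing needs a key, e.g. the ticker symbol.
public sealed class Tick
{
    public Tick(string symbol, decimal price) { Symbol = symbol; Price = price; }
    public string Symbol { get; }
    public decimal Price { get; }
}

public static class RoutingDemo
{
    public static async Task RouteTo<T>(this ChannelReader<T> input,
        IDictionary<string, ChannelWriter<T>> outputs,
        Func<T, string> selector, CancellationToken token = default)
    {
        try
        {
            await foreach (var msg in input.ReadAllAsync(token).ConfigureAwait(false))
            {
                // Messages whose key has no registered output are dropped.
                if (outputs.TryGetValue(selector(msg), out var o))
                    await o.WriteAsync(msg, token).ConfigureAwait(false);
            }
        }
        finally
        {
            foreach (var o in outputs.Values) o.TryComplete();
        }
    }

    private static async Task<List<decimal>> Collect(ChannelReader<Tick> reader)
    {
        var prices = new List<decimal>();
        await foreach (var t in reader.ReadAllAsync())
            prices.Add(t.Price);
        return prices;
    }

    // Calling RouteTo: one output channel per symbol of interest.
    public static async Task<Dictionary<string, List<decimal>>> RunAsync(IEnumerable<Tick> ticks)
    {
        var source = Channel.CreateUnbounded<Tick>();
        var aapl = Channel.CreateUnbounded<Tick>();
        var msft = Channel.CreateUnbounded<Tick>();
        var outputs = new Dictionary<string, ChannelWriter<Tick>>
        {
            ["AAPL"] = aapl.Writer,
            ["MSFT"] = msft.Writer,
        };

        var routeTask = source.Reader.RouteTo(outputs, tick => tick.Symbol);
        var aaplTask = Collect(aapl.Reader);
        var msftTask = Collect(msft.Reader);

        foreach (var t in ticks)
            await source.Writer.WriteAsync(t);
        source.Writer.Complete();
        await routeTask;

        var aaplPrices = await aaplTask;
        var msftPrices = await msftTask;
        return new Dictionary<string, List<decimal>>
        {
            ["AAPL"] = aaplPrices,
            ["MSFT"] = msftPrices,
        };
    }
}
```

Unlike `CopyTo`, each message goes to at most one output, so this fits per-symbol pipelines rather than per-strategy consumers.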

Error handling, cancellation and awaiting would have to be adjusted according to the application's requirements. For example, the inner loop means that if one output is a bounded channel at capacity, the other channels have to wait. This can be mitigated by starting all the writes for a message together and awaiting them with Task.WhenAll. Since WriteAsync returns a ValueTask, each call has to be converted with AsTask() first:

await Task.WhenAll(outputs.Select(o=>o.WriteAsync(msg, token).AsTask()));
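A fuller version of the `Task.WhenAll` variant might look like the sketch below. `CopyToAll` is a hypothetical name, and the `TryWrite` fast path (falling back to `WriteAsync` only for outputs that are currently full) is an addition, not part of the answer. The loop still waits for the slowest output before reading the next message; only the writes for a single message overlap:

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ConcurrentBroadcast
{
    public static async Task CopyToAll<T>(this ChannelReader<T> input,
        IReadOnlyList<ChannelWriter<T>> outputs, CancellationToken token = default)
    {
        try
        {
            var pending = new List<Task>(outputs.Count);
            await foreach (var msg in input.ReadAllAsync(token).ConfigureAwait(false))
            {
                pending.Clear();
                foreach (var o in outputs)
                {
                    // Fast path: succeeds immediately unless a bounded output is full.
                    if (!o.TryWrite(msg))
                        pending.Add(o.WriteAsync(msg, token).AsTask());
                }
                // Writes to full outputs wait together instead of one after another.
                if (pending.Count > 0)
                    await Task.WhenAll(pending).ConfigureAwait(false);
            }
        }
        finally
        {
            foreach (var o in outputs) o.TryComplete();
        }
    }
}
```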

Let's say the producer is a FIX listener class. Each message it receives should be published through a ChannelReader<> Output property:

public class FixProducer
{
   readonly Channel<DummyData> _channel;

   public ChannelReader<DummyData> Output=>_channel.Reader;

   readonly SomeFIXEngine _engine;
   public FixProducer(SomeFIXEngine engine)
   {
       _engine=engine;
       _channel=Channel.CreateUnbounded<DummyData>();
   }

   public async Task StartProducing(CancellationToken token=default)
   {
       var writer=_channel.Writer;
       for (...)
       {
           if(token.IsCancellationRequested)
           {
               break;
           }
           var data=_engine.GetSomeData();
           await writer.WriteAsync(data, token);
       }
       // Completing the writer lets every downstream reader finish
       writer.Complete();
   }
}
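`SomeFIXEngine` is a placeholder, so the class above isn't runnable as-is. The sketch below swaps in an `IAsyncEnumerable<DummyData>` as the feed (an assumption about how ticks could be exposed; `TickProducer` is a hypothetical name) while keeping the same shape: the producer owns its channel, exposes only the reader, and completes the writer when it stops, passing along any exception:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class DummyData
{
    public long Ticks { get; set; }
    public DateTime DateTime { get; set; }
    public decimal Price { get; set; }
}

public sealed class TickProducer
{
    // The producer owns its channel; only the reader side is exposed.
    private readonly Channel<DummyData> _channel = Channel.CreateUnbounded<DummyData>(
        new UnboundedChannelOptions { SingleWriter = true });

    // Hypothetical stand-in for SomeFIXEngine: any asynchronous stream of ticks.
    private readonly IAsyncEnumerable<DummyData> _feed;

    public TickProducer(IAsyncEnumerable<DummyData> feed) => _feed = feed;

    public ChannelReader<DummyData> Output => _channel.Reader;

    public async Task StartProducing(CancellationToken token = default)
    {
        try
        {
            await foreach (var tick in _feed.WithCancellation(token).ConfigureAwait(false))
                await _channel.Writer.WriteAsync(tick, token).ConfigureAwait(false);
            _channel.Writer.Complete(); // normal end: readers finish their loops
        }
        catch (Exception ex)
        {
            _channel.Writer.TryComplete(ex); // readers observe the failure too
            throw;
        }
    }
}
```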

The consumers can receive their input through their own ChannelWriter Input properties:

interface IConsumer<T>
{
    ChannelWriter<T> Input {get;}
}

class ConsumerOne:IConsumer<DummyData>
{
    private readonly Channel<DummyData> _input;
    
    public ChannelWriter<DummyData> Input=>_input.Writer;

    public ConsumerOne(...)
    {
        _input=Channel.CreateUnbounded<DummyData>();
    }

    public async Task StartConsuming(CancellationToken token=default)
    {
        await foreach (var message in _input.Reader.ReadAllAsync(token).ConfigureAwait(false))
        {
            ...
        }
    }
}

CopyTo can now be used to copy FIX messages to all consumers:

var producer=new FixProducer(...);
var consumerOne=new ConsumerOne(...);
var consumerTwo=new ConsumerTwo(...);
...

var copyTask=producer.Output.CopyTo(new[]{consumerOne.Input,consumerTwo.Input});

producer.StartProducing(...);
consumerOne.StartConsuming(...);
...

Now that channels are owned by the consumers, there's no need for a public StartConsuming method; it could be called in the constructor:

class ConsumerOne:IConsumer<DummyData>
{
    private readonly Channel<DummyData> _input;
    Task _consumeTask;
    public ChannelWriter<DummyData> Input=>_input.Writer;

    public ConsumerOne(...,CancellationToken token=default)
    {
        _input=Channel.CreateUnbounded<DummyData>();
        _consumeTask=StartConsuming(token);
    }

    async Task StartConsuming(CancellationToken token=default)
    {
        await foreach (var message in _input.Reader.ReadAllAsync(token).ConfigureAwait(false))
        {
            ...
        }
    }
    ...
}

The consumer task will keep running until the upstream producer calls Complete() on the input ChannelWriter, or until the token is cancelled.
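A self-contained sketch of a consumer that starts in its constructor. The `Completion` property is a hypothetical addition to `IConsumer<T>` so the owner can await shutdown, and `RecordingConsumer` just records prices in place of real strategy logic:

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public interface IConsumer<T>
{
    ChannelWriter<T> Input { get; }
    Task Completion { get; } // hypothetical addition: lets the owner await shutdown
}

public sealed class RecordingConsumer : IConsumer<decimal>
{
    private readonly Channel<decimal> _input =
        Channel.CreateUnbounded<decimal>(new UnboundedChannelOptions { SingleReader = true });

    public RecordingConsumer(CancellationToken token = default)
    {
        // Start consuming immediately; no public StartConsuming is needed.
        Completion = ConsumeAsync(token);
    }

    public ChannelWriter<decimal> Input => _input.Writer;
    public Task Completion { get; }
    public List<decimal> Received { get; } = new List<decimal>();

    private async Task ConsumeAsync(CancellationToken token)
    {
        await foreach (var price in _input.Reader.ReadAllAsync(token).ConfigureAwait(false))
        {
            Received.Add(price); // per-strategy business logic would go here
        }
    }
}
```

Until `Input.Complete()` is called (or the token is cancelled), `Completion` stays pending, so the owner can wire up `CopyTo` and then simply await every consumer's `Completion`.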

Panagiotis Kanavos
  • Thank you for taking the time to provide a detailed explanation; however, the solution is the same as the one at https://stackoverflow.com/questions/72479688/multiple-consumers-without-losing-messages, which also has a good explanation. It seems the solution is to create N channels and copy/rewrite each message to all of them. I would like to see how to use .RouteTo(); could you please provide an example of how to call the .RouteTo() function? – bommina Aug 07 '22 at 14:32
  • @bommina it would be better if you explained the actual problem. `create N no of channels and copy/rewrite message to all channels` that's exactly what you asked. A channel is a consumer's input buffer so N consumers mean N channels. It would be the same with any other buffer. Offering (not copying) the same immutable message to multiple consumers isn't expensive. It may *not* be needed though, if the algorithm allows it. Calculating an average only requires calculating a count and sum per symbol. That could be done using a single worker for all symbols. – Panagiotis Kanavos Aug 09 '22 at 10:28
  • @bommina or you may be able to use a few pipelines to process many more symbols in *batch* mode. As a quick&dirty way of accelerating a slow algo trading app, I created 12 pipelines/workers and fed different symbols to each one. The code was written in a way that recognized when one symbol's stream stopped and another started. That got a 12x speedup to begin with, before applying other optimizations – Panagiotis Kanavos Aug 09 '22 at 10:31
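The last two comments suggest keeping only a count and sum per symbol, and spreading symbols over a fixed number of workers. A sketch of that combination, assuming a hypothetical `SymbolTick` type and hash-based partitioning (the comment doesn't say how symbols were assigned to the 12 workers):

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class SymbolAverages
{
    // Hypothetical tick shape: the question's DummyData has no symbol field.
    public readonly struct SymbolTick
    {
        public SymbolTick(string symbol, decimal price) { Symbol = symbol; Price = price; }
        public string Symbol { get; }
        public decimal Price { get; }
    }

    // Each worker owns a channel and a (count, sum) table for its symbols only.
    private static async Task<Dictionary<string, decimal>> Worker(ChannelReader<SymbolTick> input)
    {
        var stats = new Dictionary<string, (long Count, decimal Sum)>();
        await foreach (var t in input.ReadAllAsync())
        {
            stats.TryGetValue(t.Symbol, out var s); // (0, 0) for a new symbol
            stats[t.Symbol] = (s.Count + 1, s.Sum + t.Price);
        }
        return stats.ToDictionary(kv => kv.Key, kv => kv.Value.Sum / kv.Value.Count);
    }

    public static async Task<Dictionary<string, decimal>> RunAsync(
        IEnumerable<SymbolTick> ticks, int workerCount)
    {
        var channels = Enumerable.Range(0, workerCount)
            .Select(_ => Channel.CreateUnbounded<SymbolTick>(
                new UnboundedChannelOptions { SingleReader = true, SingleWriter = true }))
            .ToArray();
        var workers = channels.Select(c => Worker(c.Reader)).ToArray();

        foreach (var t in ticks)
        {
            // The same symbol always maps to the same worker, so no locking is needed.
            int index = (t.Symbol.GetHashCode() & int.MaxValue) % workerCount;
            await channels[index].Writer.WriteAsync(t);
        }
        foreach (var c in channels) c.Writer.Complete();

        // Symbols are disjoint across workers, so the results can simply be merged.
        var results = await Task.WhenAll(workers);
        return results.SelectMany(r => r).ToDictionary(kv => kv.Key, kv => kv.Value);
    }
}
```

With `workerCount = 1` this reduces to the single-worker variant from the first comment.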