Is it possible for multiple consumers to receive the same message? I have a single producer which produces tick data (stock market) from web sockets. I currently have a single consumer that receives 1000 messages per second, and it works great. But now I would like to have multiple consumers that each receive the same message, using System.Threading.Channels. Below is the complete working code for a single producer/consumer.
/// <summary>
/// Reads <see cref="DummyData"/> messages from a channel and prints each price to the console,
/// prefixed with this consumer's tag.
/// </summary>
class ConsumerOne
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string _tag;

    /// <summary>Stores the channel to read from, the cancellation source, and the display tag.</summary>
    /// <param name="tickQueue">Channel whose reader side this consumer drains.</param>
    /// <param name="cancellationTokenSource">Source of the token passed to the read loop.</param>
    /// <param name="tag">Label included in every line this consumer prints.</param>
    public ConsumerOne(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
        _tag = tag;
    }

    /// <summary>
    /// Asynchronously enumerates every message available on the channel reader and handles each
    /// one in turn. The enumeration is given the token of the injected cancellation source.
    /// </summary>
    /// <returns>A task representing the read loop.</returns>
    public async Task StartConsuming()
    {
        ChannelReader<DummyData> reader = _tickQueue.Reader;
        CancellationToken stopToken = _cancellationTokenSource.Token;

        await foreach (DummyData tick in reader.ReadAllAsync(stopToken))
        {
            Handle(tick);
        }
    }

    // Business logic of One: currently just reports the price of the received message.
    private void Handle(DummyData tick)
    {
        Console.WriteLine($"from consumer {_tag} ==> {tick.Price}");
    }
}
/// <summary>
/// Message type carried through the channel: a timestamp (in two representations) and a price.
/// </summary>
public class DummyData
{
    /// <summary>Tick count associated with the message.</summary>
    public long Ticks { get; set; }

    /// <summary>Date and time associated with the message.</summary>
    public DateTime DateTime { get; set; }

    /// <summary>Price value of the message.</summary>
    public decimal Price { get; set; }
}
/// <summary>
/// Writes a fixed number of randomly priced <see cref="DummyData"/> messages to a channel,
/// pausing for a random interval after each one.
/// </summary>
class Producer
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;

    /// <summary>Stores the channel to write to and the cancellation source.</summary>
    /// <param name="tickQueue">Channel whose writer side receives the produced messages.</param>
    /// <param name="cancellationTokenSource">Source of the token used for writes and delays.</param>
    public Producer(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
    }

    /// <summary>
    /// Produces 10 messages, waiting 50-499 ms after each write. Both the write and the wait
    /// observe the injected cancellation source, so cancellation interrupts a pending delay
    /// instead of being noticed only at the next write.
    /// </summary>
    /// <returns>A task that completes once all messages have been written.</returns>
    /// <exception cref="OperationCanceledException">The cancellation source was cancelled.</exception>
    public async Task StartProducing()
    {
        CancellationToken token = _cancellationTokenSource.Token;
        Random r = new Random();
        for (int i = 0; i < 10; i++)
        {
            // Sample the clock once so DateTime and Ticks on a message describe the same instant.
            DateTime now = DateTime.Now;

            await _tickQueue.Writer.WriteAsync(new DummyData()
            {
                DateTime = now,
                Ticks = now.Ticks,
                Price = Convert.ToDecimal(r.NextDouble() * r.Next(100, 105))
            }, token);

            await Task.Delay(r.Next(50, 500), token);
        }

        // NOTE(review): the writer is never completed, so readers using ReadAllAsync will not
        // finish on their own after the last message; whoever owns the channel may want to
        // complete it once no more messages will be written.
    }
}
/// <summary>
/// Console entry point that wires one producer and one consumer to a shared unbounded channel.
/// </summary>
internal class MultipleConsumersEg
{
    private static Channel<DummyData> tickQueue;
    private static readonly CancellationTokenSource TickQueueCancellationTokenSource = new CancellationTokenSource();

    /// <summary>
    /// Starts the consumer and the producer, waits for the user to press Enter, then cancels
    /// the shared token and awaits both tasks so that any failure in them is observed.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static async Task Main(string[] args)
    {
        tickQueue = Channel.CreateUnbounded<DummyData>();
        Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
        ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");

        // Keep the tasks instead of discarding them; a discarded task hides its exceptions.
        Task consuming = consumerOne.StartConsuming();
        Task producing = p.StartProducing();

        Console.ReadLine();

        // Ask both sides to stop, then wait for them to finish.
        TickQueueCancellationTokenSource.Cancel();
        try
        {
            await Task.WhenAll(consuming, producing);
        }
        catch (OperationCanceledException)
        {
            // Expected: cancelling the token is how the read loop is ended here.
        }
    }
}
The above code snippets work for a single producer/consumer (fiddle link). Now I would like to add another consumer for a different strategy (one consumer per strategy).
/// <summary>
/// Reads <see cref="DummyData"/> messages from a channel and prints each price to the console,
/// prefixed with this consumer's tag.
/// </summary>
class ConsumerTwo
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string _tag;

    /// <summary>Stores the channel to read from, the cancellation source, and the display tag.</summary>
    /// <param name="tickQueue">Channel whose reader side this consumer drains.</param>
    /// <param name="cancellationTokenSource">Source of the token passed to the read loop.</param>
    /// <param name="tag">Label included in every line this consumer prints.</param>
    public ConsumerTwo(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
        _tag = tag;
    }

    /// <summary>
    /// Asynchronously enumerates every message available on the channel reader and handles each
    /// one in turn. The enumeration is given the token of the injected cancellation source.
    /// </summary>
    /// <returns>A task representing the read loop.</returns>
    public async Task StartConsuming()
    {
        ChannelReader<DummyData> reader = _tickQueue.Reader;
        CancellationToken stopToken = _cancellationTokenSource.Token;

        await foreach (DummyData tick in reader.ReadAllAsync(stopToken))
        {
            Handle(tick);
        }
    }

    // Business logic of Two: currently just reports the price of the received message.
    private void Handle(DummyData tick)
    {
        Console.WriteLine($"from consumer {_tag} ==> {tick.Price}");
    }
}
/// <summary>
/// Starts two consumers and one producer on the same channel, waits for the user to press
/// Enter, then cancels the shared token and awaits all tasks so that failures are observed.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
public static async Task Main(string[] args)
{
    tickQueue = Channel.CreateUnbounded<DummyData>();
    Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);

    // Both consumers read from the same channel. A channel hands each item to a single
    // reader, so the two consumers split the messages between them rather than each
    // seeing every message.
    ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
    Task consumingOne = consumerOne.StartConsuming();
    ConsumerTwo consumerTwo = new ConsumerTwo(tickQueue, TickQueueCancellationTokenSource, "TWO");
    Task consumingTwo = consumerTwo.StartConsuming();

    // Keep the tasks instead of discarding them; a discarded task hides its exceptions.
    Task producing = p.StartProducing();

    Console.ReadLine();

    // Ask every participant to stop, then wait for them to finish.
    TickQueueCancellationTokenSource.Cancel();
    try
    {
        await Task.WhenAll(consumingOne, consumingTwo, producing);
    }
    catch (OperationCanceledException)
    {
        // Expected: cancelling the token is how the read loops are ended here.
    }
}
After adding the second consumer, data is still consumed, but the same message is never seen by both consumers: each message goes to only one of them. I want every consumer to receive all 10 messages. I may have up to 50 consumers in the future, and all of them should receive the same messages.
Output:
from consumer TWO ==> 7.27597006121753
from consumer TWO ==> 30.4838315240171
from consumer TWO ==> 31.3675707908867
from consumer TWO ==> 53.2673930636206
from consumer ONE ==> 74.6396192795487
from consumer TWO ==> 24.2795471970634
from consumer ONE ==> 88.6467375550418
from consumer ONE ==> 26.3311568478758
from consumer TWO ==> 20.8731819843862
from consumer ONE ==> 0.85598795659704
Both consumers should receive all of the messages.