
I'm working on a project with a background service that consumes messages from a RabbitMQ queue.

I have a background service that uses a background task queue like this and here to process tasks in parallel.

I would like to buffer the consumed messages, group them by id, and send them to another RabbitMQ queue after a specified interval or once the buffer reaches a specified size.

I already posted a question here, and the suggested solution was to implement Channel<T>.

But a worker can't produce and consume on the same thread.

So I thought about using 2 workers: one that consumes messages from RabbitMQ and stores them in the Channel (write), and another that reads the Channel and groups the items to send them to another queue.

Is this the best way to achieve this?

Theodor Zoulias
Julien Martin
  • "and send them in another RabbitMq queue after an specified interval or size of buffer", why don't you just use Rx.Net or TPL Dataflow? With Rx, you simply use the `IObservable.Buffer(TimeSpan, int)` method to transform your input into an `IObservable<IList<T>>`. – Aron Jul 06 '22 at 12:27
  • A BackgroundService is not a worker, it's just a class. It's not used for parallel operations, it's used to execute something in the background. You can start multiple tasks inside that class with `Task.Run` if you have to. One method could read messages from RabbitMq and post them somewhere and the other could process them. That somewhere could be a Channel, or it could be a Dataflow pipeline. – Panagiotis Kanavos Jul 06 '22 at 12:28
  • What does your code look like? What kind of processing do you want to perform? `Channel` is a great asynchronous queue and *doesn't* have memory leaks, but it's just a queue. A DataFlow ActionBlock though combines both an input queue and workers. Multiple blocks like TransformBlock can be linked into a pipeline. DataFlow classes were built to process streams of events or messages, which seems to be what you want here. – Panagiotis Kanavos Jul 06 '22 at 12:30
  • Actually, reading your previous question, it sounds like you want to do something with Rx or Dataflow instead of creating your own framework. These are solved problems. – Aron Jul 06 '22 at 12:31
  • @PanagiotisKanavos *"Channel is a great asynchronous queue and doesn't have memory leaks, but it's just a queue."* -- It seems that you have changed your mind since Nov 2019, when [you wrote](https://stackoverflow.com/questions/58807485/is-it-possible-and-or-advisable-to-use-multiple-system-threading-channels-in-one/58954367#58954367): *"Channels are not just async queues."* You might want to update that older answer with your current perspective. – Theodor Zoulias Jul 06 '22 at 12:59
  • @PanagiotisKanavos _What kind of processing do you want to perform?_ I want to group the messages consumed from RabbitMq by id and send them to another queue. Example: Id: 1, value: "AA"; Id: 2, value: "BB"; Id: 1, value: "CC"; and after grouping: Id: 1, value: "AA, CC"; Id: 2, value: "BB". – Julien Martin Jul 06 '22 at 13:05
  • @JulienMartin in an "infinite" stream of messages you can't just group by a property. You can never know whether you've received all messages with `Id=1`. You'll have to specify a limit or end condition, eg X items received, or T period elapsed, or `ID=1 and Status=End`. It's easier if you want to simply route to a different queue by some property – Panagiotis Kanavos Jul 06 '22 at 14:01
  • @PanagiotisKanavos yes, you are right about _You'll have to specify a limit or end condition, eg X items received, or T period elapsed_; the plan is to specify a limit (e.g. 100 messages or X period elapsed). Would the solution be to have another task in my background service that uses a TransformBlock, for example? – Julien Martin Jul 06 '22 at 14:09
  • @JulienMartin there's a BatchBlock that buffers N items and emits an array once it receives enough items. Rx.NET offers buffering based on both count and timeout. – Panagiotis Kanavos Jul 06 '22 at 14:11

2 Answers


Things are far simpler. And there's no leak, unless you seriously misuse Channels.

A BackgroundService or an IHostedService isn't a worker or a Task, it's a class created by the Host when it starts. The host calls StartAsync on it when the application starts, and StopAsync at the end. The BackgroundService base class simplifies things a bit, so you can put all your code in an overridden ExecuteAsync without having to handle Start and Stop. You can start as many tasks as you want inside StartAsync or ExecuteAsync. You don't need to create multiple background/hosted services.
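A minimal sketch of that idea, using only System.Threading.Channels (.NET Core 3.0 or later): the method below is roughly what you might put inside ExecuteAsync. It starts a producer task and a consumer task connected by a Channel<T> and awaits both, so one service hosts both "workers". The message source is simulated with an IEnumerable, and `Message`, `TwoTaskPipeline` and `RunAsync` are hypothetical names; in the real service the producer side would be the RabbitMQ event handler.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical message type, standing in for the deserialized RabbitMQ payload
public record Message(int Id, string Value);

public static class TwoTaskPipeline
{
    // Roughly what could go inside BackgroundService.ExecuteAsync:
    // one service, two tasks, one Channel between them.
    public static async Task<List<Message>> RunAsync(
        IEnumerable<Message> source, CancellationToken stoppingToken)
    {
        var channel = Channel.CreateUnbounded<Message>();
        var received = new List<Message>();

        // Producer: in the real service this would be the RabbitMQ handler
        Task producer = Task.Run(async () =>
        {
            try
            {
                foreach (var msg in source)
                    await channel.Writer.WriteAsync(msg, stoppingToken);
            }
            finally
            {
                // Completing the writer lets the consumer's loop finish
                channel.Writer.Complete();
            }
        }, stoppingToken);

        // Consumer: reads until the writer completes and the channel is empty
        Task consumer = Task.Run(async () =>
        {
            await foreach (var msg in channel.Reader.ReadAllAsync(stoppingToken))
                received.Add(msg); // process or buffer the message here
        }, stoppingToken);

        await Task.WhenAll(producer, consumer);
        return received;
    }
}
```

With a single writer and a single reader the channel preserves order, and the consumer never runs on the producer's thread, which is the separation the question was after.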

In a hosted service, StartAsync(CancellationToken) can create your client, connect to RabbitMQ, and register an event handler that posts each message to whatever you use to process the messages:

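// Members of a class that implements IHostedService and IDisposable
EventingBasicConsumer? _consumer;

void OnReceived(object sender, BasicDeliverEventArgs ea)
{
   var body = ea.Body.ToArray();
   // Process that message, e.g. post it to a Channel or a Dataflow block
}

public Task StartAsync(CancellationToken cancellationToken)
{
   // StartRabbitAndCreateConsumer is a placeholder for your own code that
   // connects to RabbitMQ and creates the consumer
   _consumer = StartRabbitAndCreateConsumer();
   _consumer.Received += OnReceived;
   return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
    if (_consumer != null)
    {
        _consumer.Received -= OnReceived;
    }
    return Task.CompletedTask;
}

public void Dispose()
{
    // Placeholder: close the RabbitMQ channel and connection
    CleanupRabbitMqEtc();
}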

There are many ways to construct pipelines that process these events:

  • Channels are "just" a thread-safe, asynchronous (i.e. non-blocking), order-preserving, multi-writer, multi-reader, bounded or unbounded queue, with configurable overflow behavior for bounded channels (e.g. wait, or discard the oldest or newest message). They're quite similar to Go's channels and used in similar ways to construct pipelines of processing tasks. You have to write the code that reads from a channel, processes the message and writes to the next one, typically using tasks.
  • The Dataflow library is a higher-level library that provides blocks that incorporate both input/output buffers and processing. Blocks use worker tasks to process messages from their input buffers and publish the results to their output buffers. Blocks can be linked to construct a processing pipeline or mesh.
  • Reactive Extensions are specifically built for event stream processing using LINQ-like operators over IObservable<T> sources, the same way LINQ works over IEnumerable<T>. They can be used to create pipelines too, but they also contain time-based operators in addition to message-based ones.
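With the Channel option you write the grouping yourself. A minimal sketch of the reading side, under these assumptions: a hypothetical `Message` record whose `Id` is the grouping key, and a hypothetical `flush` callback standing in for publishing to the second RabbitMQ queue. Each group has its own time window, starting with its first message, and is flushed when it reaches `maxCount` items or when `maxAge` has elapsed, whichever comes first.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical message type; Id is the grouping key
public record Message(int Id, string Value);

public static class GroupingConsumer
{
    // Groups messages by Id and hands a group to `flush` when it holds
    // `maxCount` items or when `maxAge` has elapsed since its first item.
    // Whatever is left is flushed when the writer completes the channel.
    public static async Task RunAsync(
        ChannelReader<Message> reader, int maxCount, TimeSpan maxAge,
        Action<int, List<Message>> flush, CancellationToken ct = default)
    {
        var groups = new Dictionary<int, (List<Message> Items, Stopwatch Age)>();
        Task<bool>? waitTask = null;

        void EmitGroup(int id)
        {
            flush(id, groups[id].Items);
            groups.Remove(id);
        }

        while (true)
        {
            waitTask ??= reader.WaitToReadAsync(ct).AsTask();

            if (groups.Count == 0)
            {
                await waitTask; // nothing buffered: just wait for data
            }
            else
            {
                // Wait for more data, or until the oldest group expires
                TimeSpan remaining = maxAge - groups.Values.Max(g => g.Age.Elapsed);
                if (remaining > TimeSpan.Zero)
                {
                    using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
                    await Task.WhenAny(waitTask, Task.Delay(remaining, delayCts.Token));
                    delayCts.Cancel(); // stop the timer if data arrived first
                }
            }
            ct.ThrowIfCancellationRequested();

            if (waitTask.IsCompleted)
            {
                bool more = await waitTask;
                waitTask = null;
                if (!more)
                {
                    // Writer completed and channel drained: flush the rest
                    foreach (int id in groups.Keys.ToList()) EmitGroup(id);
                    return;
                }
                // Drain everything that is available right now
                while (reader.TryRead(out var msg))
                {
                    if (!groups.TryGetValue(msg.Id, out var group))
                    {
                        group = (new List<Message>(), Stopwatch.StartNew());
                        groups[msg.Id] = group;
                    }
                    group.Items.Add(msg);
                    if (group.Items.Count >= maxCount) EmitGroup(msg.Id);
                }
            }

            // Flush groups whose time window has elapsed
            foreach (int id in groups.Where(kv => kv.Value.Age.Elapsed >= maxAge)
                                     .Select(kv => kv.Key).ToList())
                EmitGroup(id);
        }
    }
}
```

Only the single reader touches the dictionary, so no locking is needed. The callback runs on the reader's task, so a slow publish delays reading while the channel absorbs the backlog.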

With Rx.NET, grouping by key and buffering by count and timeout could be done with a query similar to this one, borrowed from this question:

eventSource
    .GroupBy(e => e.GroupingKey)
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
    .Where(list => list.Count > 0)   // time-based buffers can be empty
    .Subscribe(list => SendToStorage(list));

Since you care about the grouping key (the ID), the query could be adjusted to:

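eventSource
    .GroupBy(e => e.ID)
    .SelectMany(group => group
        .Buffer(TimeSpan.FromSeconds(60), 100)
        // Time-based buffers are emitted even when empty, so skip those
        .Where(items => items.Count > 0)
        .Select(items => new { ID = group.Key, Items = items }))
    .Subscribe(batch => SendToQueue(batch.ID, batch.Items));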

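A gotcha with Rx.NET is that, unlike the other options, it's optimized for event stream processing, so it doesn't introduce any concurrency by default: each notification is processed on the thread that produced it. You can specify that the subscription, the notifications and even individual steps will run on different threads though. .SubscribeOn(...) only controls where the subscription itself happens; .ObserveOn(TaskPoolScheduler.Default) makes the downstream operators and the subscriber run on thread-pool threads:

eventSource
    .GroupBy(e => e.ID)
    .SelectMany(group => group
        .Buffer(TimeSpan.FromSeconds(60), 100)
        .Where(items => items.Count > 0)
        .Select(items => new { ID = group.Key, Items = items }))
    .ObserveOn(TaskPoolScheduler.Default)   // System.Reactive.Concurrency
    .Subscribe(batch => SendToQueue(batch.ID, batch.Items));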

Creating an observable from an event is already supported, but someone asked for explicit Rx.NET support in RabbitMQ and even provided an implementation. Unfortunately, the request was rejected: some of the commenters wanted Rx.NET operators, others wanted Dataflow operators, so it was decided not to include a specific implementation.

Using the implementation in that issue:

public IObservable<DisposableValue<T>> Receive<T>(string exchangeName, string routingKey)
{
    if (exchangeName == null)
    {
        throw new ArgumentNullException(nameof(exchangeName));
    }

    if (routingKey == null)
    {
        throw new ArgumentNullException(nameof(routingKey));
    }

    // Declare a server-named queue and bind it to the exchange
    var queueName = this.model.QueueDeclare().QueueName;
    this.model.QueueBind(queueName, exchangeName, routingKey);

    var consumer = new EventingBasicConsumer(this.model);

    var observable = Observable
        .FromEventPattern<BasicDeliverEventArgs>(
            x => consumer.Received += x,
            x => consumer.Received -= x)
        // Deserialize<T> is your own helper; disposing the wrapper acks the message
        .Select(x => new DisposableValue<T>(
            Deserialize<T>(x.EventArgs.Body),
            () => this.model.BasicAck(x.EventArgs.DeliveryTag, false)));

    // autoAck: false, so each message must be acked via DisposableValue.Dispose
    this.model.BasicConsume(queueName, false, consumer);

    return observable;
}

public class DisposableValue<T> : IDisposable
{
    private readonly Action disposeAction;

    public DisposableValue(T value, Action disposeAction)
    {
        this.disposeAction = disposeAction ??
            throw new ArgumentNullException(nameof(disposeAction));
        this.Value = value;
    }

    public T Value { get; }

    public void Dispose() => this.disposeAction();
}

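You could create an Rx.NET subscription in StartAsync and dispose it in StopAsync:

IDisposable? _sub;

public Task StartAsync(CancellationToken cancellationToken)
{
    _sub = Receive<Foo>("Exchange", "RoutingKey")
            .GroupBy(msg => msg.Value.ID)
            .SelectMany(group => group
                .Buffer(TimeSpan.FromSeconds(60), 100)
                .Where(items => items.Count > 0)
                .Select(items => new { ID = group.Key, Items = items }))
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(batch =>
            {
                SendToQueue(batch.ID, batch.Items.Select(msg => msg.Value).ToList());
                // Disposing each DisposableValue acks its RabbitMQ message
                foreach (var msg in batch.Items)
                {
                    msg.Dispose();
                }
            });
    return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
    _sub?.Dispose();
    return Task.CompletedTask;
}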
Panagiotis Kanavos
  • Thank you for the explanation! I just implemented BatchBlock to learn how it works. I'm going to reread your answer and implement Rx.NET. I will come back to tell you whether I succeeded. – Julien Martin Jul 06 '22 at 15:58
  • @JulienMartin I actually started writing an answer for BatchBlock before you mentioned time batching. Then I found the linked question, which answered your question exactly, so I decided to post about Rx.NET first. – Panagiotis Kanavos Jul 07 '22 at 07:13
  • About BatchBlock: we can call `TriggerBatch` (e.g. from a timer) to initiate a batching operation when the number of items is less than `BatchSize`. I implemented Rx.NET, and with your solution I can group my data by id. – Julien Martin Jul 20 '22 at 14:28
  • Didn't know about this. This was introduced in .NET Core and I missed it. – Panagiotis Kanavos Jul 20 '22 at 14:34
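The BatchBlock approach from these comments can be sketched as follows. Assumptions: a hypothetical `Message` record, and a hypothetical `send` callback standing in for publishing to the second queue. A `BatchBlock<T>` emits an array every `batchSize` items, a `System.Threading.Timer` calls `TriggerBatch` periodically to push out partial batches, and an `ActionBlock` groups each batch by Id. It needs System.Threading.Tasks.Dataflow, available as a NuGet package if your target framework doesn't already include it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical message type; Id is the grouping key
public record Message(int Id, string Value);

public static class BatchPipeline
{
    // BatchBlock -> ActionBlock. A batch is emitted every `batchSize` items,
    // or earlier when the timer calls TriggerBatch. Each batch is grouped
    // by Id before the groups are handed to `send`.
    public static (BatchBlock<Message> Input, Task Completion, Timer Timer) Create(
        int batchSize, TimeSpan interval, Action<int, List<string>> send)
    {
        var batcher = new BatchBlock<Message>(batchSize);
        var sender = new ActionBlock<Message[]>(batch =>
        {
            foreach (var group in batch.GroupBy(m => m.Id))
                send(group.Key, group.Select(m => m.Value).ToList());
        });
        batcher.LinkTo(sender, new DataflowLinkOptions { PropagateCompletion = true });

        // Periodically force out a partial batch, so messages don't wait
        // indefinitely for the batch to fill up
        var timer = new Timer(_ => batcher.TriggerBatch(), null, interval, interval);
        return (batcher, sender.Completion, timer);
    }
}
```

Unlike the per-group windows of the Rx query, grouping here happens per batch, so messages with the same Id can end up in different batches. To shut down, call `Input.Complete()`, await `Completion`, and dispose the timer.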

It seems that you are searching for a LINQ operator like this one:

/// <summary>
/// Groups the elements of an asynchronous sequence according to a specified
/// key selector function and comparer. The groupTimeSpan and groupCount parameters
/// are used to control the lifetime of groups. A group is emitted when the
/// specified time-span has elapsed after receiving the first element of the group,
/// or when a group contains the specified number of elements. Multiple groups
/// with the same key can be emitted by the resulting sequence.
/// </summary>
public static IAsyncEnumerable<IGrouping<TKey, TSource>>
    GroupByUntil<TSource, TKey>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        TimeSpan groupTimeSpan,
        int groupCount,
        IEqualityComparer<TKey> keyComparer = null);

You could then use it to consume a Channel<Message> in groups of messages with the same Id like this:

var groupedMessages = channel.Reader.ReadAllAsync()
    .GroupByUntil(msg => msg.Id, TimeSpan.FromSeconds(15), groupCount: 50);
await foreach (IGrouping<int, Message> messages in groupedMessages)
{
    // Process group of messages
}

The problem is that the GroupByUntil operator for asynchronous enumerable sequences does not exist. The System.Linq.Async package contains functionality that is nowhere near this level of sophistication. That's something that you'd expect to find in the System.Reactive package instead. This is the project that reached nearly 100% maturity, before the whole repository stopped evolving for unknown reasons a couple of years ago. Currently a GroupByUntil operator does exist for observable sequences (System.Reactive), with this signature:

// Groups the elements of an observable sequence according to a specified key
// selector function and comparer. A duration selector function is used to control
// the lifetime of groups. When a group expires, it receives an OnCompleted
// notification. When a new element with the same key value as a reclaimed group
// occurs, the group will be reborn with a new lifetime request.
public static IObservable<IGroupedObservable<TKey, TSource>>
    GroupByUntil<TSource, TKey, TDuration>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector,
    Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector);

It is tempting to use this operator, in combination with the ToObservable/ToAsyncEnumerable converters (System.Linq.Async), in order to implement the desired operator, but there is a problem. Let's first see the implementation, and talk about the problem later:

// Caution: Hidden queue
public static IAsyncEnumerable<IGrouping<TKey, TSource>>
    GroupByUntil<TSource, TKey>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        TimeSpan groupTimeSpan,
        int groupCount,
        IEqualityComparer<TKey> keyComparer = null)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(keySelector);
    if (groupTimeSpan < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(groupTimeSpan));
    if (groupCount < 1) throw new ArgumentOutOfRangeException(nameof(groupCount));
    keyComparer ??= EqualityComparer<TKey>.Default;
    return source
        .ToObservable()
        .GroupByUntil(keySelector, g => g.Skip(groupCount - 1).Select(_ => 0L)
            .Amb(Observable.Timer(groupTimeSpan)), keyComparer)
        .SelectMany(g => g.ToList().SelectMany(x => x.GroupBy(_ => g.Key, keyComparer)))
        .ToAsyncEnumerable();
}

The problem is that the enumeration of the source sequence is not driven by the enumeration of the resulting sequence. Instead, the source sequence is enumerated by a background consumer at the maximum speed possible, and the consumed messages are buffered in a hidden queue. You have no control over the size of this queue, and you have no way to know its current size (at least not directly). This is what happens with an enumerable sequence when you attach the ToObservable converter to it.

How big of a problem is this? It depends on the total number of messages contained in the source sequence, on the frequency at which these messages are emitted, and on the amount of work that the consumer of the resulting sequence has to do after consuming each group. In the extreme case of an infinite sequence that emits at a faster pace than the pace of the consumer, the hidden queue will grow larger and larger, the latency between consuming a message from the source and processing it will get longer and longer, and sooner or later the application will crash with an OutOfMemoryException.
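A rough back-of-the-envelope model of that extreme case, with hypothetical rates: if the source emits $\lambda_{in}$ messages per second and the consumer handles $\lambda_{out} < \lambda_{in}$, the hidden queue grows linearly, and so does the wait of the newest message:

$$
Q(t) \approx (\lambda_{in} - \lambda_{out})\,t, \qquad \text{latency}(t) \approx \frac{Q(t)}{\lambda_{out}}
$$

For example, with $\lambda_{in} = 1000$ msg/s and $\lambda_{out} = 800$ msg/s, after one hour $Q \approx 200 \times 3600 = 720{,}000$ buffered messages, and a newly arrived message waits about $720{,}000 / 800 = 900$ s (15 minutes) before it is processed.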

Unfortunately, implementing the GroupByUntil operator properly is not trivial. You can see here an example of what it takes to properly implement a LINQ operator for IAsyncEnumerable<T> sequences (a Buffer operator in that case). I won't attempt to implement GroupByUntil here because it's a major undertaking, and you might not need it. Maybe your scenario is not affected too much by the shortcomings of the ToObservable/ToAsyncEnumerable combination. If it is, you might try to implement it yourself, and if you get stuck you could post a new question about your troubles, and we might be able to help.

Theodor Zoulias
  • Many thanks for taking the time to answer my initial question. I didn't have time this weekend, but I've begun implementing Rx.NET to solve my problem. If I have any trouble, I will come back to let you know. Thanks again. – Julien Martin Jul 08 '22 at 14:42
  • The [reactive repository](https://github.com/dotnet/reactive) stopped evolving a couple of years ago, but the community is still active, isn't it? – Julien Martin Jul 20 '22 at 14:32
  • @JulienMartin it's less active than it was before. – Theodor Zoulias Jul 20 '22 at 15:15
  • @JulienMartin regarding the current state of the dotnet/reactive repository: [Are these libraries still under development?](https://github.com/dotnet/reactive/issues/1822) – Theodor Zoulias Oct 22 '22 at 09:24