
I want to write an application where I'm reading items from a queue and processing them. Processing these items is expensive, so I want to do it in batches of a specified size. Moreover, I'd like a "timeout" mechanism that processes the items once a specified timespan has elapsed since the last processing, even if the batch is not yet full.

Here's what I have at the moment:

public async Task Run(CancellationToken cancellationToken)
{
    var stream = GetItemsFromQueue(cancellationToken)
        .Buffer(100) // batches of 100 items
        .WithCancellation(cancellationToken);
    
    await foreach (var elements in stream)
    {
        // expensive processing...
    }
}

private async IAsyncEnumerable<Item> GetItemsFromQueue([EnumeratorCancellation] CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        yield return await _queue.DequeueAsync(cancellationToken);
    }
}

As you can see, I've managed to batch items up to a specified size (100). However, I'm missing the "timeout" functionality. How could I add it?

As pointed out by @TheodorZoulias, it's worth mentioning that I'm interested in emitting a batch X amount of time (the timeout) after receiving the first item in the batch, or earlier if the batch is full.
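For reference, here's a rough sketch of the behavior I'm after, as a hypothetical `BufferWithTimeout` extension for `IAsyncEnumerable<T>` (the name and shape are my own invention, and this is untested):

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableBatchExtensions
{
    // Hypothetical extension: emits a batch when it reaches maxSize, or when
    // `timeout` has elapsed since the first item of the current batch arrived.
    public static async IAsyncEnumerable<IReadOnlyList<T>> BufferWithTimeout<T>(
        this IAsyncEnumerable<T> source,
        int maxSize,
        TimeSpan timeout,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
        var batch = new List<T>(maxSize);
        Task<bool>? moveNext = null;
        Task? timer = null; // started when the first item of a batch arrives

        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            moveNext ??= enumerator.MoveNextAsync().AsTask();
            Task completed = timer is null
                ? moveNext
                : await Task.WhenAny(moveNext, timer);

            if (completed == moveNext)
            {
                if (!await moveNext) break; // source completed
                moveNext = null;
                batch.Add(enumerator.Current);
                timer ??= Task.Delay(timeout, cancellationToken);
                if (batch.Count < maxSize) continue;
            }
            // Either the batch is full, or the timeout fired with a partial batch.
            yield return batch;
            batch = new List<T>(maxSize);
            timer = null;
        }

        if (batch.Count > 0) yield return batch; // flush the remainder
    }
}
```

With something like this, my `Run` method could simply replace `.Buffer(100)` with `.BufferWithTimeout(100, TimeSpan.FromSeconds(30))`.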

mnj
  • A `CancellationTokenSource` can be constructed with a `TimeSpan`, and then you can create a combined token of that token and your `cancellationToken`. Would that achieve what you want? – ProgrammingLlama Mar 29 '23 at 06:00
  • Related: [How to batch an IAsyncEnumerable, enforcing a maximum interval policy between consecutive batches?](https://stackoverflow.com/questions/67661709/how-to-batch-an-iasyncenumerablet-enforcing-a-maximum-interval-policy-between) – Theodor Zoulias Mar 29 '23 at 06:59
  • There are two different timeout policies one could think of: 1) Emit a batch every X time, even if the batch is empty, or earlier if the batch is full. 2) Emit a batch X time after receiving the first item in the batch, or earlier if the batch is full. The first policy emits buffers more frequently. The second policy ensures that no item will be buffered for more than X time. Do you have any preference between these two policies? – Theodor Zoulias Mar 29 '23 at 07:07
  • @TheodorZoulias I definitely prefer the second approach. Thanks for the example, the one thing that I don't like about it is how imperative it is. I was wondering if it could be implemented using some existing libraries/code. I'll have a look at rx.net to see if something useful exists there. – mnj Mar 29 '23 at 09:55
  • There is an implementation of the policy (2) on [this question](https://stackoverflow.com/questions/72313288/how-to-batch-a-channelreadert-enforcing-a-maximum-interval-policy-between-con "How to batch a ChannelReader, enforcing a maximum interval policy between consuming and processing any individual item?"), but it targets the `ChannelReader` abstract class, not the `IAsyncEnumerable` interface. I am not aware of any NuGet package that offers this functionality out of the box. You could consider editing the question, and specifying that the policy (2) is the policy that you prefer. – Theodor Zoulias Mar 29 '23 at 10:35
  • @mnj what is `_queue`? Reactive Extensions already contains buffering extensions like the one you want. The [sliding window block](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-custom-dataflow-block-type) example shows another possibility - collect incoming items and emit them when you either get enough items or a timeout. If you use a Channel it's easier and *definitely* doesn't require complex code – Panagiotis Kanavos Mar 29 '23 at 11:21
  • 1
    As a side note, instead of `while (!cancellationToken.IsCancellationRequested)` it's better to do `while (true) { cancellationToken.ThrowIfCancellationRequested();`. Cancellation in .NET is propagated as an exception. – Theodor Zoulias Mar 29 '23 at 11:36
  • Are you using Visual Studio's [AsyncQueue](https://learn.microsoft.com/en-us/dotnet/api/microsoft.visualstudio.threading.asyncqueue-1?view=visualstudiosdk-2022)? .NET itself has no class with `DequeueAsync`. The general-purpose asynchronous collection is `Channel`. You can use your code and System.Linq.Async's `ToObservable` so you can use Rx's `Buffer` with a timespan, eg `GetItems(..).ToObservable().Buffer(100,TimeSpan.FromSeconds(100))`. Or you could modify System.Linq.Async's `Buffer` to emit the data it gathered when a timer fires, the same way Rx's buffer does – Panagiotis Kanavos Mar 29 '23 at 11:37
  • @PanagiotisKanavos Right, I also just found out about Buffer in RxNet in https://www.fuget.org/packages/System.Reactive/5.0.0/lib/netstandard2.0/System.Reactive.dll/System.Reactive.Linq/Observable. I'm going to try to use it. The queue I use is just my own interface – mnj Mar 29 '23 at 12:48
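For completeness, the Rx route suggested in the comments could look roughly like the sketch below. Note that `Observable.Buffer(timeSpan, count)` implements policy (1) rather than policy (2): its timer runs per buffer window, not from the first item, so it can emit empty buffers during quiet periods, which need to be filtered out. The timeout value is an arbitrary example, and this is untested:

```csharp
using System;
using System.Reactive.Linq; // System.Reactive package
using System.Threading;
using System.Threading.Tasks;

public async Task Run(CancellationToken cancellationToken)
{
    var stream = GetItemsFromQueue(cancellationToken)
        .ToObservable()                        // System.Linq.Async
        .Buffer(TimeSpan.FromSeconds(30), 100) // timeout or 100 items, whichever first
        .Where(batch => batch.Count > 0)       // Buffer emits empty lists on quiet periods
        .ToAsyncEnumerable();                  // back to IAsyncEnumerable

    await foreach (var batch in stream.WithCancellation(cancellationToken))
    {
        // expensive processing...
    }
}
```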

0 Answers