I want to write an application that reads items from a queue and processes them. Processing is expensive, so I want to do it in batches of a specified size. I'd also like a "timeout" mechanism that processes the pending items once a specified timespan has elapsed since the last processing, even if the batch is still small.
Here's what I have at the moment:
    public async Task Run(CancellationToken cancellationToken)
    {
        var stream = GetItemsFromQueue(cancellationToken)
            .Buffer(100) // 100 items batches
            .WithCancellation(cancellationToken);
        await foreach (var elements in stream)
        {
            // expensive processing...
        }
    }

    private async IAsyncEnumerable<Item> GetItemsFromQueue([EnumeratorCancellation] CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            yield return await _queue.DequeueAsync(cancellationToken);
        }
    }
As you can see, I've managed to batch up to a fixed number of items (100), but I'm missing the "timeout" functionality. How could I add it?
As pointed out by @TheodorZoulias, it's worth mentioning that I'm interested in emitting a batch X time (the timeout) after receiving the first item in that batch, or earlier if the batch is full.
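To make those semantics concrete, here is a rough, untested sketch of the kind of operator I have in mind. `BufferWithTimeout` is a hypothetical name, not an existing method of System.Interactive.Async or any other library. It assumes the source honors its cancellation token (as `GetItemsFromQueue` does), and it simply drops a partially filled batch on cancellation, which may or may not be what you'd want.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableBatchingSketch
{
    // Hypothetical helper: emits a batch when it reaches `count` items, or
    // `timeout` after the FIRST item of the batch arrived, whichever comes first.
    public static async IAsyncEnumerable<IReadOnlyList<T>> BufferWithTimeout<T>(
        this IAsyncEnumerable<T> source,
        int count,
        TimeSpan timeout,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Linked source so a pending MoveNextAsync can be cancelled on early exit.
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var enumerator = source.GetAsyncEnumerator(linkedCts.Token);
        Task<bool> moveNext = null; // in-flight MoveNextAsync, kept across timeouts
        try
        {
            var batch = new List<T>(count);
            Task timer = null; // deadline for the current batch
            while (true)
            {
                moveNext ??= enumerator.MoveNextAsync().AsTask();
                if (batch.Count > 0)
                {
                    // Wait for either the next item or the batch deadline.
                    await Task.WhenAny(moveNext, timer);
                    // On cancellation the buffered items are dropped (assumption).
                    linkedCts.Token.ThrowIfCancellationRequested();
                    if (!moveNext.IsCompleted)
                    {
                        // Deadline elapsed first: emit the partial batch and
                        // keep waiting on the same MoveNextAsync task.
                        yield return batch;
                        batch = new List<T>(count);
                        continue;
                    }
                }
                bool hasItem = await moveNext; // rethrows source errors/cancellation
                moveNext = null;
                if (!hasItem) break;

                // The first item of a batch starts that batch's timer.
                if (batch.Count == 0) timer = Task.Delay(timeout, linkedCts.Token);
                batch.Add(enumerator.Current);
                if (batch.Count == count)
                {
                    yield return batch;
                    batch = new List<T>(count);
                }
            }
            if (batch.Count > 0) yield return batch; // flush the remainder
        }
        finally
        {
            // Stop timers and any pending MoveNextAsync before disposing,
            // because disposing an enumerator mid-MoveNextAsync is not supported.
            linkedCts.Cancel();
            if (moveNext != null)
            {
                try { await moveNext; } catch { /* ignored during cleanup */ }
            }
            await enumerator.DisposeAsync();
        }
    }
}
```

With something like this, `.Buffer(100)` in `Run` would become, for example, `.BufferWithTimeout(100, TimeSpan.FromSeconds(5))`, where the 5 seconds is just an illustrative value.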