I have an asynchronous sequence (stream) of messages that sometimes arrive in bursts and sometimes sporadically, and I would like to process them in batches of 10 messages. I also want to enforce an upper limit on the latency between receiving a message and processing it, so a batch with fewer than 10 messages should also be processed if 5 seconds have passed since the first message of the batch was received. I found that I can solve the first part of the problem by using the Buffer operator from the System.Interactive.Async package:
IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
    // Process batch
}
The signature of the Buffer operator:
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);
Unfortunately the Buffer operator has no overload with a TimeSpan parameter, so I can't solve the second part of the problem as easily. I'll have to implement a batching operator with a timer myself. My question is: how can I implement a variant of the Buffer operator that has the signature below?
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count);
The timeSpan parameter should affect the behavior of the Buffer operator like so:
- A batch must be emitted when the timeSpan has elapsed after emitting the previous batch (or initially after the invocation of the Buffer method).
- An empty batch must be emitted if the timeSpan has elapsed after emitting the previous batch, and no messages have been received during this time.
- Emitting batches more frequently than every timeSpan implies that the batches are full. Emitting a batch with fewer than count messages before the timeSpan has elapsed is not desirable.
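To make the three rules concrete, here is a small synchronous simulation of how I read them. It is only a sketch for illustrating the expected output, not the operator I am asking for: it replays timestamped arrivals instead of consuming an IAsyncEnumerable. The Simulate helper, its parameters, the tie-breaking when an arrival coincides with the timer, and the flushing of a final partial batch on completion are all my own assumptions.

```csharp
using System;
using System.Collections.Generic;

public static class BufferRulesSimulation
{
    // Hypothetical helper, not part of any library. Replays timestamped arrivals
    // (offsets from the moment Buffer is invoked, in ascending order) and returns
    // each batch together with the offset at which it would be emitted.
    public static List<(TimeSpan EmittedAt, List<T> Batch)> Simulate<T>(
        IEnumerable<(TimeSpan At, T Item)> arrivals,
        TimeSpan completedAt, TimeSpan timeSpan, int count)
    {
        if (timeSpan <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(timeSpan));
        if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));

        var result = new List<(TimeSpan EmittedAt, List<T> Batch)>();
        var buffer = new List<T>();
        TimeSpan deadline = timeSpan; // The timer starts at invocation (offset zero).

        void Emit(TimeSpan at)
        {
            result.Add((at, buffer));
            buffer = new List<T>();
            deadline = at + timeSpan; // Every emission restarts the timer.
        }

        foreach (var (at, item) in arrivals)
        {
            // Rules 1 and 2: the timer fires before this arrival, so whatever
            // has been buffered so far (possibly nothing) is emitted.
            // Assumption: on an exact tie the timer wins.
            while (deadline <= at) Emit(deadline);

            buffer.Add(item);
            // Rule 3: a batch is emitted ahead of the timer only when it is full.
            if (buffer.Count == count) Emit(at);
        }

        // Assumption: the timer keeps firing until the source completes, and a
        // non-empty remainder is flushed on completion.
        while (deadline <= completedAt) Emit(deadline);
        if (buffer.Count > 0) Emit(completedAt);
        return result;
    }
}
```

For example, with a timeSpan of 5 seconds, a count of 3, messages A, B, C, D, E arriving at seconds 1, 2, 3, 4 and 14, and the source completing at second 15, this simulation produces four batches: [A, B, C] at second 3 (full), [D] at second 8 (timer), an empty batch at second 13 (timer, nothing received), and [E] at second 15 (completion).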
I am OK with adding external dependencies to my project if needed, like the System.Interactive.Async or the System.Linq.Async packages.
P.S. this question was inspired by a recent question related to channels and memory leaks.