Seems like a job for BatchBlock.
It is completely thread safe and perfectly suited for data flows.
There are a lot of classes in the TPL Dataflow .NET library, but the one that suits your situation is BatchBlock.
BatchBlock collects data until the size threshold is met; when it is, the whole batch is emitted as the result. You can get the result in different ways, such as Receive, TryReceiveAll, or their async counterparts. Another way is to link the batch output to another block, like ActionBlock, which asynchronously invokes the supplied action every time input arrives from the source block (the BatchBlock in this case), so essentially every time the batch fills up it is sent to the ActionBlock. ActionBlock can take options such as MaxDegreeOfParallelism, to avoid database locks and the like if you need that, but it will not block the BatchBlock in any way, so there is no waiting on the client side; the batches are simply placed in a thread-safe queue for the ActionBlock to execute.
And do not worry: when a batch gets full, the block does not stop receiving new items either, so no blocking there. A beautiful solution.
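As a minimal sketch of the pull-based side, here is a console program using Receive (the batch size of 3 and the int items are arbitrary choices for illustration; TPL Dataflow ships in the System.Threading.Tasks.Dataflow NuGet package):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

class ReceiveDemo {
    static void Main() {
        // a batch size of 3 is arbitrary for this sketch
        var batchBlock = new BatchBlock<int>(3);

        for (int i = 0; i < 6; i++)
            batchBlock.Post(i);

        // each Receive call yields one full batch of 3 items
        int[] first = batchBlock.Receive();   // { 0, 1, 2 }
        int[] second = batchBlock.Receive();  // { 3, 4, 5 }

        Console.WriteLine(string.Join(", ", first));
        Console.WriteLine(string.Join(", ", second));
    }
}
```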
One thing to watch out for: if the batch has not reached full size when you stop the application, those buffered items will be lost. You can call TriggerBatch manually to send however many items are currently in the batch on to the ActionBlock, so you can call TriggerBatch in Dispose or wherever suits you.
There are also two ways of feeding items into the BatchBlock: Post and SendAsync. Post attempts to deliver the item synchronously and returns a bool indicating whether it was accepted, while SendAsync returns a Task&lt;bool&gt; and postpones the message if the BatchBlock is busy.
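A quick illustration of the two input paths (the tiny batch size is just for the sketch; for an unbounded block both calls accept immediately, and the difference only matters under backpressure, e.g. with BoundedCapacity set):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class PostVsSendDemo {
    static async Task Main() {
        var block = new BatchBlock<int>(2);

        // Post attempts delivery synchronously and reports success as a bool
        bool accepted = block.Post(1);
        Console.WriteLine(accepted); // True for an unbounded block

        // SendAsync returns a Task<bool>; if the block is busy, the message
        // is postponed and delivered later instead of being rejected outright
        bool acceptedAsync = await block.SendAsync(2);
        Console.WriteLine(acceptedAsync); // True
    }
}
```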
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class ConcurrentCache<T> : IAsyncDisposable {
    private readonly BatchBlock<T> _batchBlock;
    private readonly ActionBlock<T[]> _actionBlock;
    private readonly IDisposable _linkedBlock;

    public ConcurrentCache(int cacheSize) {
        _batchBlock = new BatchBlock<T>(cacheSize);
        // action to run when the batch reaches max capacity;
        // the delegate can also be an async task
        _actionBlock = new ActionBlock<T[]>(ReadBatchBlock);
        _linkedBlock = _batchBlock.LinkTo(_actionBlock);
    }

    public async Task SendAsync(T item) {
        await _batchBlock.SendAsync(item);
    }

    private void ReadBatchBlock(T[] items) {
        foreach (var item in items) {
            Console.WriteLine(item);
        }
    }

    public async ValueTask DisposeAsync() {
        // flush the partially filled batch before completing,
        // otherwise triggering it after Completion is a no-op
        _batchBlock.TriggerBatch();
        _batchBlock.Complete();
        await _batchBlock.Completion;
        _actionBlock.Complete();
        await _actionBlock.Completion;
        _linkedBlock.Dispose();
    }
}
Usage example:
await using var cache = new ConcurrentCache<int>(5);
for (int i = 0; i < 12; i++) {
    await cache.SendAsync(i);
    await Task.Delay(200);
}
When the object is disposed, the remaining partial batch is triggered and printed.
UPDATE
As @TheodorZoulias pointed out, if the batch is not filled up and no new messages arrive for a long time, the buffered items will be stuck in the BatchBlock. The solution is to create a timer that calls TriggerBatch() periodically.
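A minimal sketch of that idea (the TimedBatchBlock wrapper name and the flush interval are my own illustrative choices, not part of the library; a more careful version would reset the timer whenever a batch fires, to avoid emitting tiny batches right after a full one):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class TimedBatchBlock<T> : IDisposable {
    private readonly BatchBlock<T> _batchBlock;
    private readonly Timer _timer;

    public TimedBatchBlock(int batchSize, TimeSpan flushInterval) {
        _batchBlock = new BatchBlock<T>(batchSize);
        // periodically flush whatever is buffered,
        // even if the batch is not full yet
        _timer = new Timer(_ => _batchBlock.TriggerBatch(),
            null, flushInterval, flushInterval);
    }

    // expose the inner block so it can be posted to or linked from
    public BatchBlock<T> Block => _batchBlock;

    public void Dispose() {
        _timer.Dispose();
        _batchBlock.TriggerBatch(); // flush the final partial batch
        _batchBlock.Complete();
    }
}
```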