Suppose I am provided with an event producer API consisting of Start()
, Pause()
, and Resume()
methods, and an ItemAvailable
event. The producer itself is external code, and I have no control over its threading. A few items may still come through after Pause()
is called (the producer is actually remote, so items may already be in flight over the network).
Suppose also that I am writing consumer code, where consumption may be slower than production.
Critical requirements are
- The consumer event handler must not block the producer thread, and
- All events must be processed (no data can be dropped).
I introduce a buffer into the consumer to smooth out some burstiness. But in the case of extended burstiness, I want to call Producer.Pause()
, and then Resume()
at an appropriate time, to avoid running out of memory at the consumer side.
I have a solution making use of Interlocked
to increment and decrement a counter, which is compared to a threshold to decide whether it is time to Pause
or Resume
.
Question: Is there a better solution than the Interlocked
counter (int current
in the code below), in terms of efficiency (and elegance)?
Updated MVP (no longer bounces off the limiter):
namespace Experiments
{
internal class Program
{
// simple external producer API for demo purposes
private class Producer
{
public void Pause(int i) { _blocker.Reset(); Console.WriteLine($"paused at {i}"); }
public void Resume(int i) { _blocker.Set(); Console.WriteLine($"resumed at {i}"); }
public async Task Start()
{
await Task.Run
(
() =>
{
for (int i = 0; i < 10000; i++)
{
_blocker.Wait();
ItemAvailable?.Invoke(this, i);
}
}
);
}
public event EventHandler<int> ItemAvailable;
private ManualResetEventSlim _blocker = new(true);
}
private static async Task Main(string[] args)
{
var p = new Producer();
var buffer = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleWriter = true });
int threshold = 1000;
int resumeAt = 10;
int current = 0;
int paused = 0;
p.ItemAvailable += (_, i) =>
{
if (Interlocked.Increment(ref current) >= threshold
&& Interlocked.CompareExchange(ref paused, 0, 1) == 0
) p.Pause(i);
buffer.Writer.TryWrite(i);
};
var processor = Task.Run
(
async () =>
{
await foreach (int i in buffer.Reader.ReadAllAsync())
{
Console.WriteLine($"processing {i}");
await Task.Delay(10);
if
(
Interlocked.Decrement(ref current) < resumeAt
&& Interlocked.CompareExchange(ref paused, 1, 0) == 1
) p.Resume(i);
}
}
);
p.Start();
await processor;
}
}
}