0

Suppose I am provided with an event producer API consisting of Start(), Pause(), and Resume() methods, and an ItemAvailable event. The producer itself is external code, and I have no control over its threading. A few items may still come through after Pause() is called (the producer is actually remote, so items may already be in flight over the network).

Suppose also that I am writing consumer code, where consumption may be slower than production.

Critical requirements are

  1. The consumer event handler must not block the producer thread, and
  2. All events must be processed (no data can be dropped).

I introduce a buffer into the consumer to smooth out some burstiness. But in the case of extended burstiness, I want to call Producer.Pause(), and then Resume() at an appropriate time, to avoid running out of memory at the consumer side.

I have a solution making use of Interlocked to increment and decrement a counter, which is compared to a threshold to decide whether it is time to Pause or Resume.

Question: Is there a better solution than the Interlocked counter (int current in the code below), in terms of efficiency (and elegance)?

Updated MVP (no longer bounces off the limiter):

namespace Experiments
{
    internal class Program
    {
        // simple external producer API for demo purposes
        private class Producer
        {
            public void Pause(int i) { _blocker.Reset(); Console.WriteLine($"paused at {i}"); }
            public void Resume(int i) { _blocker.Set(); Console.WriteLine($"resumed  at {i}"); }
            public async Task Start()
            {
                await Task.Run
                (
                    () =>
                    {
                        for (int i = 0; i < 10000; i++)
                        {
                            _blocker.Wait();
                            ItemAvailable?.Invoke(this, i);
                        }
                    }
                );
            }

            public event EventHandler<int> ItemAvailable;
            private ManualResetEventSlim _blocker = new(true);
        }

        private static async Task Main(string[] args)
        {
            var p = new Producer();
            var buffer = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleWriter = true });
            int threshold = 1000;
            int resumeAt = 10;
            int current = 0;
            int paused = 0;

            p.ItemAvailable += (_, i) =>
            {
                if (Interlocked.Increment(ref current) >= threshold
                    && Interlocked.CompareExchange(ref paused, 0, 1) == 0
                ) p.Pause(i);

                buffer.Writer.TryWrite(i);
            };

            var processor = Task.Run
            (
                async () =>
                {
                    await foreach (int i in buffer.Reader.ReadAllAsync())
                    {
                        Console.WriteLine($"processing {i}");
                        await Task.Delay(10);
                        if
                        (
                            Interlocked.Decrement(ref current) < resumeAt
                            && Interlocked.CompareExchange(ref paused, 1, 0) == 1
                        ) p.Resume(i);
                    }
                }
            );

            p.Start();
            await processor;
        }
    }
}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
allmhuran
  • 4,154
  • 1
  • 8
  • 27
  • so it sounds very much like the remote 'producer' is actually a queue since you have an interface to pause/start meaning it will retain items for some time. given that, i'd write code to start() and then `pause()` immediately upon receive in buffer; then in the buffer processing i would only `Resume()` when the buffer is empty. Then, i would run X number of programs (not sure re: your deployment environment etc.) which would guarantee the most isolation (e.g. under your design where 'all data must be processed' running it in one app you have a risk of the app crashing and `...continued` – zaitsman Sep 22 '21 at 06:15
  • taking your buffer down with it saying bye-bye to all messages downloaded but unprocessed, plus you process messages in a `foreach` loop (meaning they are not processed in parallel). Going with multiple consumers would allow to alleviate all of that (and also horizontally scale) – zaitsman Sep 22 '21 at 06:16
  • 1
    @zaitsman Yes, that's about right. The producer is an event broker that will store messages until a configured TTL elapses. If the TTL elapses messages are dropped, but that would only happen in the case of an absurdly slow consumer, so that's acceptable. I did consider completely filling and draining the buffer, but that would mean that there is some time during which the buffer is empty and we are waiting for data to arrive, which introduces a periodic latency. Not horrible, but not optimal. In the real scenario the consumer must ack, if they crash then the broker retransmits on reconnect. – allmhuran Sep 22 '21 at 06:23
  • 1
    That latency would be alleviated by all the other instances of your consumer running (e.g. say you run 50 docker containers or windows services or console apps whatever they are, if 10 are draining buffer while waiting to reconnect another 40 are receiving). Naturally if you want to keep to a single instance my suggestion won't be an improvement. – zaitsman Sep 22 '21 at 06:27
  • @zaitsman Right, I should have mentioned that this is a single-consumer-per-producer-queue secenario (it is a pub sub broker, but a consumer has a dedicated queue). I could spin up multiple local consumer instances regardless, but then that requires more work reassembling message order, etc. – allmhuran Sep 22 '21 at 06:37
  • To be honest, within that limitation i don't know that you will find something that much better than what you've come up with. – zaitsman Sep 22 '21 at 06:53
  • @allmhuran you can do what you want with channels if you realize you have three parts in the problem: feeding events to a channel, calling `pause()` if a bounded channel is full and calling `resume()` if a channel has fewer than N pending items. It's easier to write and maintain simple steps with minimal coupling. There's no need for complex code or forcing workers to coordinate with each other through synchroziation primitives like Interlocked, semaphores or `Progress`-used-as-AutoResetEvent – Panagiotis Kanavos Sep 22 '21 at 15:13
  • @PanagiotisKanavos it's not quite that simple. We can't call `Pause` if the channel is full, because if more messages dribble in after `Pause` is called, there's nowhere for them to go. I toyed around with having an overflow channel and switching the `ChannelWriter` back and forth, but that doesn't make things any easier. – allmhuran Sep 22 '21 at 15:48
  • @PanagiotisKanavos Ah, I see you've provided an answer which does account for this, I have not looked at it in detail yet. – allmhuran Sep 22 '21 at 15:55
  • 1
    @allmhuran I can't stress enough how important it is to keep steps/blocks simple. You aren't dealing with single calls any more, there's no call stack, so maintenance and debugging can be a *real* pain. I got bitten by this several times. In .NET Dataflow and Channels aren't well known so there aren't many good books, but in Go they're used everywhere. It would be a good idea to study the [Go concurrency patterns](https://go.dev/blog/pipelines) especially around composition, cancellation and error handling. – Panagiotis Kanavos Sep 22 '21 at 15:59

2 Answers2

1

If you are aiming at elegance, you could consider baking the pressure-awareness functionality inside a custom Channel<T>. Below is a PressureAwareUnboundedChannel<T> class that derives from the Channel<T>. It offers all the functionality of the base class, plus it emits notifications when the channel becomes under pressure, and when the pressure is relieved. The notifications are pushed through an IProgress<bool> instance, that emits a true value when the pressure surpasses a specific high-threshold, and a false value when the pressure drops under a specific low-threshold.

public sealed class PressureAwareUnboundedChannel<T> : Channel<T>
{
    private readonly Channel<T> _channel;
    private readonly int _highPressureThreshold;
    private readonly int _lowPressureThreshold;
    private readonly IProgress<bool> _pressureProgress;
    private int _pressureState = 0; // 0: no pressure, 1: under pressure

    public PressureAwareUnboundedChannel(int lowPressureThreshold,
        int highPressureThreshold, IProgress<bool> pressureProgress)
    {
        if (lowPressureThreshold < 0)
            throw new ArgumentOutOfRangeException(nameof(lowPressureThreshold));
        if (highPressureThreshold < lowPressureThreshold)
            throw new ArgumentOutOfRangeException(nameof(highPressureThreshold));
        if (pressureProgress == null)
            throw new ArgumentNullException(nameof(pressureProgress));
        _highPressureThreshold = highPressureThreshold;
        _lowPressureThreshold = lowPressureThreshold;
        _pressureProgress = pressureProgress;
        _channel = Channel.CreateBounded<T>(Int32.MaxValue);
        this.Writer = new ChannelWriter(this);
        this.Reader = new ChannelReader(this);
    }

    private class ChannelWriter : ChannelWriter<T>
    {
        private readonly PressureAwareUnboundedChannel<T> _parent;

        public ChannelWriter(PressureAwareUnboundedChannel<T> parent)
            => _parent = parent;
        public override bool TryComplete(Exception error = null)
            => _parent._channel.Writer.TryComplete(error);
        public override bool TryWrite(T item)
        {
            bool success = _parent._channel.Writer.TryWrite(item);
            if (success) _parent.SignalWriteOrRead();
            return success;
        }
        public override ValueTask<bool> WaitToWriteAsync(
            CancellationToken cancellationToken = default)
                => _parent._channel.Writer.WaitToWriteAsync(cancellationToken);
    }

    private class ChannelReader : ChannelReader<T>
    {
        private readonly PressureAwareUnboundedChannel<T> _parent;

        public ChannelReader(PressureAwareUnboundedChannel<T> parent)
            => _parent = parent;
        public override Task Completion => _parent._channel.Reader.Completion;
        public override bool CanCount => _parent._channel.Reader.CanCount;
        public override int Count => _parent._channel.Reader.Count;
        public override bool TryRead(out T item)
        {
            bool success = _parent._channel.Reader.TryRead(out item);
            if (success) _parent.SignalWriteOrRead();
            return success;
        }
        public override ValueTask<bool> WaitToReadAsync(
            CancellationToken cancellationToken = default)
                => _parent._channel.Reader.WaitToReadAsync(cancellationToken);
    }

    private void SignalWriteOrRead()
    {
        var currentCount = _channel.Reader.Count;
        bool underPressure;
        if (currentCount > _highPressureThreshold)
            underPressure = true;
        else if (currentCount <= _lowPressureThreshold)
            underPressure = false;
        else
            return;
        int newState = underPressure ? 1 : 0;
        int oldState = underPressure ? 0 : 1;
        if (Interlocked.CompareExchange(
            ref _pressureState, newState, oldState) != oldState) return;
        _pressureProgress.Report(underPressure);
    }
}

The encapsulated Channel<T> is actually a bounded channel, having capacity equal to the maximum Int32 value, because only bounded channels implement the Reader.Count property.¹

Usage example:

var progress = new Progress<bool>(underPressure =>
{
    if (underPressure) Producer.Pause(); else Producer.Resume();
});
var channel = new PressureAwareUnboundedChannel<Item>(500, 1000, progress);

In this example the Producer will be paused when the items stored inside the channel become more than 1000, and it will be resumed when the number of items drops to 500 or less.

The Progress<bool> action is invoked on the context that was captured at the time of the Progress<bool>'s creation. So if you create it on the UI thread of a GUI application, the action will be invoked on the UI thread, otherwise in will be invoked on the ThreadPool. In the later case there will be no protection against overlapping invocations of the Action<bool>. If the Producer class is not thread-safe, you'll have to add synchronization inside the handler. Example:

var progress = new Progress<bool>(underPressure =>
{
    lock (Producer) if (underPressure) Producer.Pause(); else Producer.Resume();
});

¹ Actually unbounded channels also support the Count property, unless they are configured with the SingleReader option.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • 1
    Ingenious. You're a bit scary Theodor :) I actually had not noticed that a `ChannelReader` had a `Count` on the bounded version. I went off to see if that was an O(1) count - I recall some discussion about the performance of `Count` on another .net collection - which took me down a merry path to a `Dequeue` in the channel source... which might be based on Stephen Cleary's double ended queue? In which case it is. So that alone makes my life easier. But putting the pressure awareness directly into a derived channel is inspired! – allmhuran Sep 22 '21 at 13:42
  • 1
    @allmhuran I am now having second thoughts regarding the thread-safety of the `PressureAwareUnboundedChannel` class. If the high and low thresholds are too close, it is possible that two threads that write and read to the channel may invoke the `_pressureProgress.Report` in the wrong order. So a sequence Pause-Resume may be reversed to Resume-Pause, resulting to the channel becoming eventually empty, without resuming the `Producer`. The only solution I can think of is to invoke the notifications synchronously under a `lock` (scraping the `IProgress` approach). – Theodor Zoulias Sep 22 '21 at 16:04
  • 1
    I was struggling with that same problem when I attempted to design a solution that added an overflow channel and switched a `ChannelWriter` back and forth. I have been playing with the code and while I did end up going with a simple `lock` in the handler itself, the simple heads up about a countable bounded reader was key info, and I think I could still move the control out of the handler and into a derived channel, inspired by your approach here, if I really wanted to do that. So I'm leaving my vote and accept in place! – allmhuran Sep 22 '21 at 16:11
  • @allmhuran I rolled back to the initial answer, because although the second revision was a step in the right direction, it was overly complicated without solving all the underlying issues. It solved the problem of the ownership of the `Producer` (the initial implementation offers no way to await until all event-handling code has completed), but it didn't solve the problem of the notifications coming faster than the `Producer` can respond to them. Also the `eventTaskScheduler` is a failed idea. As I figured out today, it is just impossible to throw an exception directly on a `TaskScheduler`. – Theodor Zoulias Sep 24 '21 at 00:57
  • My current opinion about the optimal API for the `PressureAwareUnboundedChannel` class is shown in [this](https://gist.github.com/theodorzoulias/715a19e0dc69bd23143826e23d826a83) gist. I replaced the `UnderPressure` event with a `SubscribeForPressureNotifications` method, that returns an `ISubscription` object. This object has only one method, the `UnsubscribeAsync`, and awaiting it guarantees that no more notifications will be emitted. Any error thrown by the subscription code is surfaced on this await. Also multiple subscribers are notified concurrently. – Theodor Zoulias Sep 24 '21 at 00:57
  • Well well, this one certainly latched itself onto your brain pretty tightly! It's a gherkin served on a bed of fois gras... quite the sophisticated pickle. I think the logic of the implementation is getting beyond my ability to evaluate without a a couple of of hours and a coffee drip. :D – allmhuran Sep 24 '21 at 03:43
  • 1
    I learn a lot either way so I am always happy to read your experiments :) – allmhuran Sep 24 '21 at 08:10
1

This is relatively straightforward if you realize there are three "steps" in this problem.

  1. The first step ToChannel(Producer) receives messages from the producer.
  2. The next step, PauseAt signals pause() if there are too many pending items in the out panel.
  3. The third step, ResumeAt signals resume() if its input channel has a count less than a threshold.

It's easy to combine all three steps using typical Channel patterns.


producer.ToChannel(token)
    .PauseAt(1000,()=>producer.PauseAsync(),token)
    .ResumeAt(10,()=>producer.ResumeAsync(),token)
    ....

Or a single, generic TrafficJam method:

static ChannelReader<T> TrafficJam(this ChannelReader<T> source,
    int pauseAt,int resumeAt,
    Func<Task> pause,Func<Task> resume,
    CancellationToken token=default)
{
    return source
             .PauseAt(pauseAt,pause,token)
             .ResumeAt(resumeAt,resume,token);
}

ToChannel

The first step is relatively straightforward, an unbounded Channel source based from the producer's events.

static ChannelReader<int> ToChannel(this Producer producer,
                                    CancellationToken token=default)
{
    Channel<int> channel=Channel.CreateUnbounded();
    var writer=channel.Writer;
    producer.ItemAvailable += OnItem;
    return channel;

    void OnItem(object sender, int item)
    {
        writer.TryWriteAsync(item);
        if(token.IsCancellationRequested)
        {
            producer.ItemAvailable-=OnItem;
            writer.Complete();
            
        }
    }
}

The only unusual part is using a local function to allow disabling the event handler and completing the output channel when cancellation is requested

That's enough to queue all the incoming items. ToChannel doesn't bother with starting, pausing etc, that's not its job.

PauseAt

The next function, PauseAt, uses a BoundedChannel to implement the threshold. It forwards incoming messages if it can. If the channel can't accept any more messages it calls the pause callback and awaits until it can resume forwarding :

static ChannelReader<T> PauseAt(this ChannelReader<T> source, 
        int threshold, Func<Task> pause,
        CancellationToken token=default)
{
    Channel<T> channel=Channel.CreateBounded(threshold);
    var writer=channel.Writer;

    _ = Task.Run(async ()=>
        await foreach(var msg in source.ReadAllAsync(token))
        {
            if(writer.CanWrite())
            {
               await writer.WriteAsync(msg);
            }
            else
            {
               await pause();
               //Wait until we can post again
               await writer.WriteAsync(msg);
            }
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

ResumeAt

The final step, ResumeAt, calls resume() if its input was previously above the threshold and now has fewer items.

If the input isn't bounded, it just forwards all messages.

static ChannelReader<T> ResumeAt(this ChannelReader<T> source, 
        int resumeAt, Func<Task> resume,
        CancellationToken token=default)
{
    Channel<T> channel=Channel.CreateUnbounded();
    var writer=channel.Writer;

    _ = Task.Run(async ()=>{
        bool above=false;
        await foreach(var msg in source.ReadAllAsync(token))
        {
            await writer.WriteAsync(msg);
            //Do nothing if the source isn't bounded
            if(source.CanCount)
            {
                if(above && source.Count<=resumeAt)
                {
                    await resume();
                    above=false;
                }       
                above=source.Count>resumeAt;  
            }
       }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

Since only a single thread is used, we can keep count of the previous count. and whether it was above or below the threshold.

Combining Pause and Resume

Since Pause and Resume work with just channels, they can be combined into a single method :

static ChannelReader<T> TrafficJam(this ChannelReader<T> source,
    int pauseAt,int resumeAt,
    Func<Task> pause,Func<Task> resume,
    CancellationToken token=default)
{
    return source.PauseAt(pauseAt,pause,token)
             .ResumeAt(resumeAt,resume,token);
}
Panagiotis Kanavos
  • 120,703
  • 13
  • 188
  • 236
  • I always enjoy the contrast between Theo's approach (typically more direct and expanding directly on the .net classes) and yours (extension methods and fluent APIs, either reactive or pseudo-reactive). I do have respect for the fluent approach, but I tend to find it harder to quickly wrap my head around the mechanics. But this answer does provide food for thought and definitely still deserves a +1 ! – allmhuran Sep 22 '21 at 16:05
  • @allmhuran I've been using Dataflow for 9 years and Channels ever since they came out - 4-5 years? And SQL Server's SSIS dataflows for 20. I use all these to download air tickets sales reports from airline web services, parse them to extract data and ticket numbers, then retrieve individual ticket records and store all in a database. I've done complex and that *guarantees* you'll never know why your pipeline failed. CSP/Dataflow/Pipleines have a specific, very different logic. The libraries aren't broken, there aren't missing functionality. They're meant to be used with **their** patterns – Panagiotis Kanavos Sep 22 '21 at 16:09
  • 1
    @allmhuran Dataflow/channels are essentially *functional* architectures. That's why they look so different and why things start getting so weird when you try to use then in an object-oriented way – Panagiotis Kanavos Sep 22 '21 at 16:11