2

Searched hard for a piece of code which does what i want and i am happy with. Reading this and this helped a lot.

I have a scenario where i need a single consumer to be notified by a single producer when new data is available but would also like the consumer to be notified periodically regardless of if new data is available. It is fine if the consumer is notified more than the reoccurring period but it should not be notified less frequent.

It is possible that multiple notifications for 'new data' occur while the consumer is already notified and working. (So SemaphoreSlim was not a good fit). Hence, a consumer which is slower than the rate of producer notifications, would not queue up subsequent notifications, they would just "re-signal" that same "data available" flag without affect.

I would also like the consumer to asynchronously wait for the notifications (without blocking a thread).

I have stitched together the below class which wraps around TaskCompletionSource and also uses an internal Timer.

public class PeriodicalNotifier : IDisposable
{
    // Need some dummy type since TaskCompletionSource has only the generic version
    internal struct VoidTypeStruct { }
    // Always reuse this allocation
    private static VoidTypeStruct dummyStruct;

    private TaskCompletionSource<VoidTypeStruct> internalCompletionSource;
    private Timer reSendTimer;

    public PeriodicalNotifier(int autoNotifyIntervalMs)
    {
        internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
        reSendTimer = new Timer(_ => Notify(), null, 0, autoNotifyIntervalMs);
    }

    public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
    {
        using (cancellationToken.Register(() => internalCompletionSource.TrySetCanceled()))
        {
            await internalCompletionSource.Task;
            // Recreate - to be able to set again upon the next wait
            internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
        }
    }

    public void Notify()
    {
        internalCompletionSource.TrySetResult(dummyStruct);
    }

    public void Dispose()
    {
        reSendTimer.Dispose();
        internalCompletionSource.TrySetCanceled();
    }
}

Users of this class can do something like this:

private PeriodicalNotifier notifier = new PeriodicalNotifier(100);

// ... In some task - which should be non-blocking
while (some condition)
{
    await notifier.WaitForNotifictionAsync(_tokenSource.Token);
    // Do some work...
}

// ... In some thread, producer added new data
notifier.Notify();

Efficiency is important to me, the scenario is of a high frequency data stream, and so i had in mind:

  • The non-blocking nature of the wait.
  • I assume Timer is more efficient than recreating Task.Delay and cancelling it if it's not the one to notify.
  • A concern for the recreation of the TaskCompletionSource

My questions are:

  1. Does my code correctly solve the problem? Any hidden pitfalls?
  2. Am i missing some trivial solution / existing block for this use case?

Update:

I have reached a conclusion that aside from re implementing a more lean Task Completion structure (like in here and here) i have no more optimizations to make. Hope that helps anyone looking at a similar scenario.

Community
  • 1
  • 1
Yoad Snapir
  • 528
  • 2
  • 10
  • 3
    Seems to me that Rx should be able to handle this quite nicely. Any reason you're not using Rx? – Stephen Cleary Aug 11 '14 at 20:30
  • I vaguely agree, Have been fiddling with Rx in some other project but this one is a legacy one, introducing Rx for this little piece seems like an over kill. Btw, did you think about something like merging two streams, one Observable.Interval and one of the "available data" events ? – Yoad Snapir Aug 11 '14 at 20:44
  • 4
    Yes, that's what I was thinking. I find as a general rule that anytime there's *time* involved (e.g., your periodic notification), that's where Rx really shines. – Stephen Cleary Aug 11 '14 at 20:49
  • Thanks, guess i will reconsider my approach than. – Yoad Snapir Aug 11 '14 at 20:50
  • Have you thought about TPL Dataflow as an option? – Yuval Itzchakov Aug 11 '14 at 20:57
  • Yep, although i could ease it a little with a Buffering block (for the async notifications) i would still have to create a timer to produce periodical notifications into the block. On top of that, multiple notifications on the same consumer iteration would queue up which is undesirable. Did you have a different approach in mind? – Yoad Snapir Aug 12 '14 at 06:03

2 Answers2

0
  1. Yes, your implementation makes sense but the TaskCompletionSource recreation should be outside the using scope, otherwise the "old" cancellation token may cancel the "new" TaskCompletionSource.
  2. I think using some kind of AsyncManualResetEvent combined with a Timer would be simpler and less error-prone. There's a very nice namespace with async tools in the Visual Studio SDK by Microsoft. You need to install the SDK and then reference the Microsoft.VisualStudio.Threading assembly. Here's an implementation using their AsyncManualResetEvent with the same API:

public class PeriodicalNotifier : IDisposable
{
    private readonly Timer _timer;
    private readonly AsyncManualResetEvent _asyncManualResetEvent;

    public PeriodicalNotifier(TimeSpan autoNotifyInterval)
    {
        _asyncManualResetEvent = new AsyncManualResetEvent();
        _timer = new Timer(_ => Notify(), null, TimeSpan.Zero, autoNotifyInterval);
    }

    public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
    {
        await _asyncManualResetEvent.WaitAsync().WithCancellation(cancellationToken);
        _asyncManualResetEvent.Reset();
    }

    public void Notify()
    {
        _asyncManualResetEvent.Set();
    }

    public void Dispose()
    {
        _timer.Dispose();
    }
}

You notify by setting the reset event, asynchronously wait using WaitAsync, enable Cancellation using the WithCancellation extension method and then reset the event. Multiple notifications are "merged" by setting the same reset event.

i3arnon
  • 113,022
  • 33
  • 324
  • 344
  • Thanks, I am building it and comparing the performance and efficiency now. – Yoad Snapir Aug 14 '14 at 06:36
  • Btw, Regarding the location of the recreation - that is by design, otherwise a cancellation might be missed. Since this mechanism is expected to be used in a loop, the recreation is just a side of affect of how TaskCompltionSource behaves. The cancellation should apply to this block no matter which TaskCompletionSource is "In Charge". Right? – Yoad Snapir Aug 14 '14 at 06:43
  • I am sorry, i could not get your suggestion to pass my validation tests. It seems like when arriving at the `await _buffer.OutputAvailableAsync(cancellationToken)` too soon, it would not longer release the awaiter when notified. – Yoad Snapir Aug 14 '14 at 08:13
  • @YoadSnapir What do you mean by "too soon"? – i3arnon Aug 14 '14 at 09:18
  • @YoadSnapir At the moment the cancellation token parameter will cancel the current `WaitForNotifictionAsync` call, but it may under certain conditions cancel the next call instead. If you always use the same token than it doesn't matter, but in that case I would pass it to the constructor instead of every single call. – i3arnon Aug 14 '14 at 09:23
  • By too soon i mean that when the notification is raised, the consumer would go and do the work. If it arrives at the `await` again after a short period (on my machine it's less than 50 ms) it would just continue to wait regardless of any posting of new items to the block. I can post a small code snippet of the repro if you like. – Yoad Snapir Aug 14 '14 at 12:52
  • @YoadSnapir I'm not sure why that happens (I'm looking into it) but this: `object item; while (_buffer.TryReceive(out item));` fixes it. The `AsyncAutoResetEvent` option though works great and I recommend you use it. – i3arnon Aug 14 '14 at 13:21
  • The `TaskCompletionSource` is used in `AsyncManualResetEvent` like i used it. The added internal queue in `AsyncAutoResetEvent` is an over complication (I actually need a manual reset event here due to how it's built). This answers my question though since i figure i have reached the optimum. Thanks! – Yoad Snapir Aug 14 '14 at 14:11
  • @YoadSnapir `AsyncManualResetEvent` is indeed simpler (but requires resetting). I changed the answer accordingly. At the end all truly async implementations use `TaskCompletionSource` at the root level – i3arnon Aug 14 '14 at 14:56
  • @YoadSnapir btw... We possibly found a bug in `TPL Dataflow`: http://stackoverflow.com/q/25339029/885318 – i3arnon Aug 16 '14 at 13:32
  • I believe that in my scenario, implementing a leaner "awaitable" would beat `TaskCompletionSource` though over optimization is all over such an approach in my case. Might try it for fun though. – Yoad Snapir Aug 17 '14 at 08:22
0
Subject<Result> notifier = new Subject<Result)();

notifier 
    .Select(value => Observable.Interval(TimeSpan.FromMilliSeconds(100))
                                            .Select(_ => value)).Switch()
    .Subscribe(value => DoSomething(value));

//Some other thread...
notifier.OnNext(...);

This Rx query will keep sending value, every 100 milliseconds, until a new value turns up. Then we notify that value every 100 milliseconds.

If we receive values faster than once every 100 milliseconds, then we basically have the same output as input.

Aron
  • 15,464
  • 3
  • 31
  • 64
  • If i understand correctly, for each `OnNext` you will get a single `DoSomething`. That is not according to my spec which states that while the consumer is "working" notifications just make the next "wait" an instant `DoSomething`, so a slow consumer would not queue up notification. Am I right? edited my question to clarify that aspect of the spec. – Yoad Snapir Aug 14 '14 at 13:02
  • Only One DoSomeThing should be in flight at any one time. – Aron Aug 14 '14 at 13:06
  • I agree, but if one is in flight and two notifications arrive at once - only one other `DoSomething` should fire off after the current one is done. (not two). – Yoad Snapir Aug 14 '14 at 14:18
  • @YoadSnapir what concurrency model do you even want with DoSomething? The default behavior is that they will be processed one at a time (which I only learnt today, from PluralSight). Or do you want no queuing behavior? – Aron Aug 14 '14 at 16:14
  • Exactly, no queuing in this case. Think of a stream of prices, usually you want to know the last price of something since the n-1 price is no longer interesting if you just want to buy. – Yoad Snapir Aug 17 '14 at 08:45
  • I happen to work in finance. The last n-1 data points are interesting. – Aron Aug 17 '14 at 08:50