I have code similar to this.

IPruduceDemUpdates.Subscribe(update => DoUpdate(update));

But what I want to do is something like this:

IPruduceDemUpdates.Subscribe(update => { if (noUpdateIsRunning) DoUpdate(update); });

So it should ignore incoming updates while the update method is already running. In addition, it should always execute the last update, whether that is the last update of the stream or the last one within a period of time. Here is an example timeline:

  • Update 1 starts
  • Update 2 is ignored
  • Update 3 is ignored
  • Update 4 is ignored
  • Update 1 finished
  • Update 4 starts
  • Update 4 finished

Edit

I have a solution for skipping:

IPruduceDemUpdates.Subscribe(update =>
{
    // IsCompleted is also true for canceled and faulted tasks.
    if (_task == null || _task.IsCompleted)
        _task = DoUpdate(update);
});

But I don't know how to make sure that the last update gets processed.

Nabil A.
  • The Reactive Framework (Rx) already has inbuilt protection against parallel calls to the `OnNext` delegate. Each `DoUpdate` will happen in series and never overlap. However, it does queue up subsequent values so it won't ignore any updates. Does that satisfy your needs? – Enigmativity Oct 29 '14 at 06:38
  • `IPruduceDemUpdates`, LOL. Nice :) – Yuval Itzchakov Oct 29 '14 at 06:52
  • @Enigmativity Thanks for the info, but it does not solve my problem. The "updater" can't stand the pressure of updates, so I want to drop some, because only the last state is of interest. But I don't want to cancel an update that is already running. – Nabil A. Oct 29 '14 at 11:30
  • See http://stackoverflow.com/questions/11010602/with-rx-how-do-i-ignore-all-except-the-latest-value-when-my-subscribe-method-is?rq=1 and https://social.msdn.microsoft.com/Forums/en-US/bbcc1af9-64b4-456b-9038-a540cb5f5de5/how-do-i-ignore-allexceptthelatest-value-when-my-subscribe-method-is-running?forum=rx too. – James World Nov 23 '14 at 12:02

2 Answers

If DoUpdate is synchronous (which it appears to be in this case), you can use BufferIntrospective from Rxx. It does exactly what you want:

IProduceDemUpdates
    .BufferIntrospective()
    .Where(items => items.Count > 0) // ignore empty buffers
    .Select(items => items[items.Count - 1]) // ignore all but last item in buffer
    .Subscribe(DoUpdate);
Brandon

I'm a bit late to the party, but here's a built-in one-liner way to do this. Sample with a TimeSpan.Zero will continuously sample the source and always push the last update. To make this work, you need to give Sample its own thread so that the sampled source can run uninterrupted:

IProduceDemUpdates.Sample(TimeSpan.Zero, NewThreadScheduler.Default)
                  .Subscribe(DoUpdate);

Notes

The thread is reused per subscriber to Sample, not created per event, so there is no need to worry about a mountain of threads being created. You could also use .Sample(TimeSpan.Zero, new EventLoopScheduler()). The point is that the subscriber receives events on the same thread that Sample samples on, so Sample won't sample the next event until the subscriber is done.
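To make the idea concrete outside of Rx, here is a minimal sketch of the same "latest value wins" behaviour using only the base class library. It is not how Sample is implemented. `LatestValueWorker<T>`, `Post` and `LatestValueDemo` are hypothetical names made up for this illustration. One worker thread drains a one-slot mailbox: the producer overwrites the slot while the handler is busy, and Dispose waits until a pending value has been handled.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not part of Rx): a one-slot mailbox drained by a single worker thread.
public sealed class LatestValueWorker<T> : IDisposable
{
    private readonly object _gate = new object();
    private readonly Action<T> _handler;
    private readonly Thread _worker;
    private T _latest;
    private bool _hasValue;
    private bool _completed;

    public LatestValueWorker(Action<T> handler)
    {
        _handler = handler;
        _worker = new Thread(Run) { IsBackground = true };
        _worker.Start();
    }

    // Called by the producer. It never waits for the handler; it only overwrites the slot.
    public void Post(T value)
    {
        lock (_gate)
        {
            _latest = value;
            _hasValue = true;
            Monitor.Pulse(_gate);
        }
    }

    // Signals the end of the stream and waits until any pending value has been handled.
    public void Dispose()
    {
        lock (_gate)
        {
            _completed = true;
            Monitor.Pulse(_gate);
        }
        _worker.Join();
    }

    private void Run()
    {
        while (true)
        {
            T value;
            lock (_gate)
            {
                while (!_hasValue && !_completed)
                    Monitor.Wait(_gate);
                if (!_hasValue) return; // completed and nothing left to handle
                value = _latest;
                _hasValue = false;
            }
            _handler(value); // runs outside the lock, so Post is not blocked by a slow update
        }
    }
}

public static class LatestValueDemo
{
    // Posts 1..4 in quick succession to a slow handler and returns the values it handled.
    public static List<int> Run()
    {
        var handled = new List<int>();
        using (var worker = new LatestValueWorker<int>(x =>
        {
            Thread.Sleep(200); // simulate a slow DoUpdate
            handled.Add(x);
        }))
        {
            for (int i = 1; i <= 4; i++) worker.Post(i);
        } // Dispose joins the worker, so handled is complete here
        return handled;
    }
}
```

Which intermediate values are dropped depends on thread timing, but the final entry of the list returned by `LatestValueDemo.Run()` is 4, because the slot always holds the most recent value when the worker comes back for more.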

Examples

The following code:

// emits 0,1,2,3,4,5,6,7,8,9
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10); 

source.Sample(TimeSpan.Zero, NewThreadScheduler.Default).Subscribe(x => {
    Console.WriteLine(x);
    Task.Delay(TimeSpan.FromSeconds(3)).Wait();
});

Will typically output:

0
2
5
9

To convince yourself that the last update is always received, try:

// emit 0,1,2
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);

source.Sample(TimeSpan.Zero, NewThreadScheduler.Default).Subscribe(x => {
    Console.WriteLine(x);
    Task.Delay(TimeSpan.FromSeconds(10)).Wait();
});

Which will output:

0
2

Caveat

Whatever your approach, note how important it is that the slowest operation runs in the synchronous chain of subscribers attached to Sample. If you have asynchronous subscriptions further downstream (such as an ObserveOn) that include slower operators, then you will just be transferring the back-pressure elsewhere.

James World
  • Clever. I'll have to rethink whether I should continue offering the `SampleIntrospective` variant, or perhaps change it to some kind of throttling strategy instead. It never felt right anyway to simply rely on `BufferIntrospective` to do single-item sampling - it's obviously a waste of memory. – Dave Sexton Dec 02 '14 at 17:03