38

I want to effectively throttle an event stream, so that my delegate is called when the first event is received but then not for 1 second if subsequent events are received. After expiry of that timeout (1 second), if a subsequent event was received I want my delegate to be called.

Is there a simple way to do this using Reactive Extensions?

Sample code:

static void Main(string[] args)
{
    Console.WriteLine("Running...");

    var generator = Observable
        .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
        .Timestamp();

    var builder = new StringBuilder();

    generator
        .Sample(TimeSpan.FromSeconds(1))
        .Finally(() => Console.WriteLine(builder.ToString()))
        .Subscribe(feed =>
                   builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}",
                                                    feed.Value,
                                                    feed.Timestamp.ToString("mm:ss.fff"),
                                                    DateTime.Now.ToString("mm:ss.fff"))));

    Console.ReadKey();
}

Current output:

Running...
Observed 064, generated at 41:43.602, observed at 41:43.602
Observed 100, generated at 41:44.165, observed at 41:44.602

But I want to observe (timestamps obviously will change)

Running...
Observed 001, generated at 41:43.602, observed at 41:43.602
....
Observed 100, generated at 41:44.165, observed at 41:44.602
Daniel Fortunov
  • 43,309
  • 26
  • 81
  • 106
Alex
  • 551
  • 1
  • 5
  • 10

8 Answers8

18

Okay,

you have 3 scenarios here:

1) I would like to get one value of the event stream every second. means: that if it produces more events per second, you will get a always bigger buffer.

observableStream.Throttle(timeSpan)

2) I would like to get the latest event, that was produced before the second happens means: other events get dropped.

observableStream.Sample(TimeSpan.FromSeconds(1))

3) you would like to get all events, that happened in the last second. and that every second

observableStream.BufferWithTime(timeSpan)

4) you want to select what happens in between the second with all the values, till the second has passed, and your result is returned

observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent)
JJS
  • 6,431
  • 1
  • 54
  • 70
cRichter
  • 1,411
  • 7
  • 7
15

Here's is what I got with some help from the RX Forum:

The idea is to issue a series of "tickets" for the original sequence to fire. These "tickets" are delayed for the timeout, excluding the very first one, which is immediately pre-pended to the ticket sequence. When an event comes in and there is a ticket waiting, the event fires immediately, otherwise it waits till the ticket and then fires. When it fires, the next ticket is issued, and so on...

To combine the tickets and original events, we need a combinator. Unfortunately, the "standard" .CombineLatest cannot be used here because it would fire on tickets and events that were used previousely. So I had to create my own combinator, which is basically a filtered .CombineLatest, that fires only when both elements in the combination are "fresh" - were never returned before. I call it .CombineVeryLatest aka .BrokenZip ;)

Using .CombineVeryLatest, the above idea can be implemented as such:

    public static IObservable<T> SampleResponsive<T>(
        this IObservable<T> source, TimeSpan delay)
    {
        return source.Publish(src =>
        {
            var fire = new Subject<T>();

            var whenCanFire = fire
                .Select(u => new Unit())
                .Delay(delay)
                .StartWith(new Unit());

            var subscription = src
                .CombineVeryLatest(whenCanFire, (x, flag) => x)
                .Subscribe(fire);

            return fire.Finally(subscription.Dispose);
        });
    }

    public static IObservable<TResult> CombineVeryLatest
        <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
        IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
    {
        var ls = leftSource.Select(x => new Used<TLeft>(x));
        var rs = rightSource.Select(x => new Used<TRight>(x));
        var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });
        var fltCmb = cmb
            .Where(a => !(a.x.IsUsed || a.y.IsUsed))
            .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });
        return fltCmb.Select(a => selector(a.x.Value, a.y.Value));
    }

    private class Used<T>
    {
        internal T Value { get; private set; }
        internal bool IsUsed { get; set; }

        internal Used(T value)
        {
            Value = value;
        }
    }

Edit: here's another more compact variation of CombineVeryLatest proposed by Andreas Köpf on the forum:

public static IObservable<TResult> CombineVeryLatest
  <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
  IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
    return Observable.Defer(() =>
    {
        int l = -1, r = -1;
        return Observable.CombineLatest(
            leftSource.Select(Tuple.Create<TLeft, int>),
            rightSource.Select(Tuple.Create<TRight, int>),
                (x, y) => new { x, y })
            .Where(t => t.x.Item2 != l && t.y.Item2 != r)
            .Do(t => { l = t.x.Item2; r = t.y.Item2; })
            .Select(t => selector(t.x.Item1, t.y.Item1));
    });
}
Sergey Aldoukhov
  • 22,316
  • 18
  • 72
  • 99
9

I was struggling with this same problem last night, and believe I've found a more elegant (or at least shorter) solution:

var delay = Observable.Empty<T>().Delay(TimeSpan.FromSeconds(1));
var throttledSource = source.Take(1).Concat(delay).Repeat();
LiquidAsh
  • 271
  • 3
  • 4
6

This is the what I posted as an answer to this question in the Rx forum:

UPDATE: Here is a new version that does no longer delay event forwarding when events occur with a time difference of more than one second:

public static IObservable<T> ThrottleResponsive3<T>(this IObservable<T> source, TimeSpan minInterval)
{
    return Observable.CreateWithDisposable<T>(o =>
    {
        object gate = new Object();
        Notification<T> last = null, lastNonTerminal = null;
        DateTime referenceTime = DateTime.UtcNow - minInterval;
        var delayedReplay = new MutableDisposable();
        return new CompositeDisposable(source.Materialize().Subscribe(x =>
        {
            lock (gate)
            {
                var elapsed = DateTime.UtcNow - referenceTime;
                if (elapsed >= minInterval && delayedReplay.Disposable == null)
                {
                    referenceTime = DateTime.UtcNow;
                    x.Accept(o);
                }
                else
                {
                    if (x.Kind == NotificationKind.OnNext)
                        lastNonTerminal = x;
                    last = x;
                    if (delayedReplay.Disposable == null)
                    {
                        delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() =>
                        {
                            lock (gate)
                            {
                                referenceTime = DateTime.UtcNow;
                                if (lastNonTerminal != null && lastNonTerminal != last)
                                    lastNonTerminal.Accept(o);
                                last.Accept(o);
                                last = lastNonTerminal = null;
                                delayedReplay.Disposable = null;
                            }
                        }, minInterval - elapsed);
                    }
                }
            }
        }), delayedReplay);
    });
}

This was my earlier try:

var source = Observable.GenerateWithTime(1, 
    x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
    .Timestamp();

source.Publish(o =>
    o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1)))
).Run(x => Console.WriteLine(x));
blueling
  • 2,003
  • 1
  • 17
  • 16
  • 6
    Just found a link to this answer in the Microsoft Todo UWP app credits :) – CalvT Sep 19 '17 at 12:57
  • @CalvT Yep! I was expecting to see a list of open source projects but here is this stackflow answer! Might have something to do with their transition from AWS to Azure: https://www.theverge.com/2018/3/21/17146308/microsoft-wunderlist-to-do-app-acquisition-complicated. – Cosmos Gu Jul 03 '18 at 00:46
2

Ok, here's one solution. I don't like it, particularly, but... oh well.

Hat tips to Jon for pointing me at SkipWhile, and to cRichter for the BufferWithTime. Thanks guys.

static void Main(string[] args)
{
    Console.WriteLine("Running...");

    var generator = Observable
        .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
        .Timestamp();

    var bufferedAtOneSec = generator.BufferWithTime(TimeSpan.FromSeconds(1));

    var action = new Action<Timestamped<int>>(
        feed => Console.WriteLine("Observed {0:000}, generated at {1}, observed at {2}",
                                  feed.Value,
                                  feed.Timestamp.ToString("mm:ss.fff"),
                                  DateTime.Now.ToString("mm:ss.fff")));

    var reactImmediately = true;
    bufferedAtOneSec.Subscribe(list =>
                                   {
                                       if (list.Count == 0)
                                       {
                                           reactImmediately = true;
                                       }
                                       else
                                       {
                                           action(list.Last());
                                       }
                                   });
    generator
        .SkipWhile(item => reactImmediately == false)
        .Subscribe(feed =>
                       {
                           if(reactImmediately)
                           {
                               reactImmediately = false;
                               action(feed);
                           }
                       });

    Console.ReadKey();
}
Alex
  • 551
  • 1
  • 5
  • 10
0

Have you tried the Throttle extension method?

From the docs:

Ignores values from an observable sequence which are followed by another value before dueTime

It's not quite clear to me whether that's going to do what you want or not - in that you want to ignore the following values rather than the first value... but I would expect it to be what you want. Give it a try :)

EDIT: Hmmm... no, I don't think Throttle is the right thing, after all. I believe I see what you want to do, but I can't see anything in the framework to do it. I may well have missed something though. Have you asked on the Rx forum? It may well be that if it's not there now, they'd be happy to add it :)

I suspect you could do it cunningly with SkipUntil and SelectMany somehow... but I think it should be in its own method.

Jon Skeet
  • 1,421,763
  • 867
  • 9,128
  • 9,194
  • Thanks Jon. I gave it a shot, but its not quite what I want. In the example, using Throttle causes all but the last event to be ignored. I need to react upon the first event (to provide a responsive system), but to then delay at a 1 second sampling rate for subsequent events. – Alex Jul 09 '10 at 09:34
  • Jon, you suggested to ask in the Rx forum. Actually there is an [open issue #395 - Implement a real throttle...](https://github.com/dotnet/reactive/issues/395) requesting this exact behaviour. – Magnus Sep 10 '19 at 11:33
  • 1
    @Magnus: Well there is now - filed 7 years after this answer was posted! – Jon Skeet Sep 10 '19 at 12:21
  • - oops, guess I didn't notice the '10 after the date. Well the link for the issue is here for others to find now anyway. – Magnus Sep 10 '19 at 17:02
0

What you are searching for is the CombineLatest.

public static IObservable<TResult> CombineLatest<TLeft, TRight, TResult>(
    IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource,
    Func<TLeft, TRight, TResult> selector
)

that merges 2 obeservables, and returning all values, when the selector (time) has a value.

edit: john is right, that is maybe not the preferred solution

cRichter
  • 1,411
  • 7
  • 7
  • I don't see how that's what he's after - what do you see as the two observables here? – Jon Skeet Jul 09 '10 at 09:06
  • One is the Events he generate, the selector is a Observable.Interval(TimeSpan.FromSeconds(1)) – cRichter Jul 09 '10 at 09:31
  • Won't that generate an event every time *either* of them produces a value? – Jon Skeet Jul 09 '10 at 09:34
  • Doing this... var interval = Observable.Interval(TimeSpan.FromSeconds(1)).Start(); generator.CombineLatest(interval, (value, gate) => value ) ... resulted in no observations. I don't know how I would use CombineLatest to achieve this. – Alex Jul 09 '10 at 09:58
  • True, but you can select what should happen then. e.g. when its from stream1, then cache it and return null, if its from stream 2, then return the cache. the only thing what you have to do afterwards is limit the results, that no null values should be returned. (basically, thats how the BufferWithTime is implemented ;-) – cRichter Jul 09 '10 at 09:59
0

Inspired by Bluelings answer I provide here a version that compiles with Reactive Extensions 2.2.5.

This particular version counts the number of samples and also provide the last sampled value. To do this the following class is used:

class Sample<T> {

  public Sample(T lastValue, Int32 count) {
    LastValue = lastValue;
    Count = count;
  }

  public T LastValue { get; private set; }

  public Int32 Count { get; private set; }

}

Here is the operator:

public static IObservable<Sample<T>> SampleResponsive<T>(this IObservable<T> source, TimeSpan interval, IScheduler scheduler = null) {
  if (source == null)
    throw new ArgumentNullException(nameof(source));
  return Observable.Create<Sample<T>>(
    observer => {
      var gate = new Object();
      var lastSampleValue = default(T);
      var lastSampleTime = default(DateTime);
      var sampleCount = 0;
      var scheduledTask = new SerialDisposable();
      return new CompositeDisposable(
        source.Subscribe(
          value => {
            lock (gate) {
              var now = DateTime.UtcNow;
              var elapsed = now - lastSampleTime;
              if (elapsed >= interval) {
                observer.OnNext(new Sample<T>(value, 1));
                lastSampleValue = value;
                lastSampleTime = now;
                sampleCount = 0;
              }
              else {
                if (scheduledTask.Disposable == null) {
                  scheduledTask.Disposable = (scheduler ?? Scheduler.Default).Schedule(
                    interval - elapsed,
                    () => {
                      lock (gate) {
                        if (sampleCount > 0) {
                          lastSampleTime = DateTime.UtcNow;
                          observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
                          sampleCount = 0;
                        }
                        scheduledTask.Disposable = null;
                      }
                    }
                  );
                }
                lastSampleValue = value;
                sampleCount += 1;
              }
            }
          },
          error => {
            if (sampleCount > 0)
              observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
            observer.OnError(error);
          },
          () => {
            if (sampleCount > 0)
              observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
            observer.OnCompleted();
          }
        ),
        scheduledTask
      );
    }
  );
}
Martin Liversage
  • 104,481
  • 22
  • 209
  • 256