7

I have a sequence of events that happen every 10-1000 ms. I subscribe to this source of events, but want to handle them at a fixed (or minimum) interval of 500ms. I also want to process ONE event at a time, not in batches (like Buffer(x > 1)).

Something like this in pseudo code:

observable.MinimumInterval(TimeSpan.FromMilliseconds(500)).Subscribe(v => ...);

Tried e.g.:

observable.Buffer(1).Delay(TimeSpan.FromMilliseconds(500)).Subscribe(v => ...);

and a lot of other potential solutions. No luck so far.

Any ideas?

Freddy
  • What would you like to happen at the end of each interval? Get the last event or a collection of all events that happened since the last interval? – Anthony Chu Feb 05 '14 at 21:19
  • Most/all of the RX operators people are mentioning drop events if they come in too fast. Is this a problem? – Foole Feb 05 '14 at 21:19
  • I want to keep all events, just process them at a fixed / minimum interval. The source does not produce infinite events, just a finite number of events in batches that I want to spread out in time. – Freddy Feb 05 '14 at 21:23
  • @Freddy - just use `Buffer` and loop over the sequence when you get it. – Sean Feb 05 '14 at 21:31
  • Using Buffer and looping today, but my aim is to spread processing out in time and fixed / minimum intervals. – Freddy Feb 05 '14 at 21:37

5 Answers

20

I answered this very question on my blog here.

Reproducing (in case of link rot!) with the addition of presenting as an extension method:

Constraining a stream of events in Rx to a maximum rate

Sometimes, you want to limit the rate at which events arrive from an Rx stream.

The Throttle operator will suppress an event if another arrives within a specified interval. This is very useful in many instances, but it does have two important side-effects – even an unsuppressed event will be delayed by the interval, and events will get dropped altogether if they arrive too quickly.
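The dropping behaviour is easy to see with a short sketch. This is only an illustration, assuming the System.Reactive package; `ThrottleExample` and `Survivors` are hypothetical names, and `Observable.Interval` stands in for a real event source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ThrottleExample
{
    // Hypothetical helper: sends `count` events spaced `gap` apart through
    // Throttle(quiet) and returns whatever survives.
    public static IList<long> Survivors(int count, TimeSpan gap, TimeSpan quiet)
    {
        return Observable.Interval(gap)   // stand-in event source
                         .Take(count)
                         .Throttle(quiet) // an event is suppressed if another follows within `quiet`
                         .ToList()
                         .Wait();         // blocks until the sequence completes
    }

    public static void Main()
    {
        var survivors = Survivors(10, TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(300));
        Console.WriteLine($"{survivors.Count} of 10 events survived");
    }
}
```

Because the events arrive faster than the quiet period, most of them never reach the subscriber, which is exactly what the question cannot tolerate.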

I came across a situation where both of these were unacceptable. In this particular case, the desired behaviour was as follows: The events should be output at a maximum rate specified by a TimeSpan, but otherwise as soon as possible.

One solution works like this. Imagine our input stream is a bunch of people arriving at a railway station. For our output, we want people to leave the station at a maximum rate. We set the maximum rate by having each person stand at the front of a flatbed railroad truck and sending that truck out of the station at a fixed speed. Because there is only one track, and all trucks travel at the same speed and have the same length, people will leave the station at the maximum rate when trucks are departing back-to-back. However, if the track is clear, the next person will be able to depart immediately.

So how do we translate this metaphor into Rx?

We will use the Concat operator’s ability to accept a stream of streams and merge them together back-to-back – just like sending railroad trucks down the track.

To get the equivalent of each person onto a railroad truck, we will use a Select to project each event (person) to an observable sequence (railroad truck) that starts with a single OnNext event (the person) and ends with an OnCompleted exactly the defined interval later.

Let's assume the input events are an IObservable<T> in the variable input, and the minimum spacing is a TimeSpan in the variable interval. Here's the code:

var paced = input.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

As an extension method this becomes:

public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
{
    return source.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

}
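A quick way to check the pacing is to push a burst through the operator and measure the gaps between arrivals. The following is a minimal sketch, assuming the System.Reactive package; `PaceExample` and `MeasureGaps` are hypothetical names, and `Pace` is repeated only so the sketch compiles on its own.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class PaceExample
{
    // Same operator as above, repeated so this sketch is self-contained.
    public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
    {
        return source.Select(i => Observable.Empty<T>()
                                            .Delay(interval)
                                            .StartWith(i)).Concat();
    }

    // Hypothetical helper: sends a burst of `count` values through Pace and
    // returns the time gaps between consecutive arrivals.
    public static TimeSpan[] MeasureGaps(int count, TimeSpan interval)
    {
        var stamped = Observable.Range(1, count) // the whole burst arrives at once
                                .Pace(interval)
                                .Timestamp()     // record the arrival time of each value
                                .ToList()
                                .Wait();         // blocks until the paced sequence completes
        return stamped.Zip(stamped.Skip(1), (a, b) => b.Timestamp - a.Timestamp).ToArray();
    }

    public static void Main()
    {
        foreach (var gap in MeasureGaps(5, TimeSpan.FromMilliseconds(500)))
            Console.WriteLine($"gap: {gap.TotalMilliseconds:F0} ms");
    }
}
```

Each gap should come out at roughly the requested interval, subject to timer resolution. Note that the sequence completes one interval after the last value, because the final inner sequence still waits out its delay.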
James World
  • This is great, but it always delays events even if the interval has passed. – Foole Feb 05 '14 at 22:51
  • No, it doesn't, since the `StartWith` ensures the event is emitted immediately. Look closer. All that is delayed is the final OnCompleted if the source stream terminates. Although I did miss the `<T>` type parameter off Pace... added it in now. – James World Feb 05 '14 at 22:52
  • My mistake. This is perfect. – Foole Feb 05 '14 at 23:00
  • Thanks for keeping me on my toes! :) – James World Feb 05 '14 at 23:03
  • It works, but still trying to get my head around this. Why doesn't the StartWith cause every single element to be emitted straight away? :-S – Michal Ciechan Oct 28 '16 at 16:56
  • +1 took me a little while to figure this out but works great!. For each element you create a new observable with a single element(StartWith), which then doesn't finish until delay has passed. And the next item creates the same observable (element @ start -> delay -> finish) but only starts after first observable is finished (after delay). Awesome + 1 – Michal Ciechan Oct 28 '16 at 17:07
2

Since you want to keep all events, I think you're on the right track with Buffer. But you should call it with a TimeSpan...

observable.Buffer(TimeSpan.FromMilliseconds(500)).Subscribe(v => ...);

... where v is an IList<TSource> that you can loop over.

Your original call of Buffer(1) will fire whenever it gets 1 event, which is the same as if it wasn't there at all. Buffering with a time window will collect all the events that fire within the interval and give it to you at the end of each interval.
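As a rough sketch of what that looks like end to end (assuming the System.Reactive package; `BufferExample` and `CollectBatches` are hypothetical names, and `Observable.Interval` stands in for the real event source):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class BufferExample
{
    // Hypothetical helper: collects the batches a time-based Buffer produces
    // from a short burst of events.
    public static IList<IList<long>> CollectBatches()
    {
        return Observable.Interval(TimeSpan.FromMilliseconds(50)) // stand-in event source
                         .Take(20)
                         .Buffer(TimeSpan.FromMilliseconds(500))  // one list per 500 ms window
                         .ToList()
                         .Wait();                                 // blocks until the source completes
    }

    public static void Main()
    {
        foreach (var batch in CollectBatches())
        {
            Console.WriteLine($"batch of {batch.Count}");
            foreach (var item in batch)
                Console.WriteLine($"  processing {item}"); // handled one at a time, but back-to-back
        }
    }
}
```

No events are lost, but the items within a batch are still processed back-to-back rather than spread out in time.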

Anthony Chu
  • The time span given to Buffer is max time. I should probably also have mentioned that I want to process one event at a time. – Freddy Feb 05 '14 at 21:28
  • @Freddy - can't you just loop through the items in the buffer when you are notified? – Sean Feb 05 '14 at 21:30
  • The subscription will give you an `IList<T>` at the end of each interval; you can loop through it and process each item individually. – Anthony Chu Feb 05 '14 at 21:31
  • I could (and I do today), but this is UI (WPF) and in my case I want to process each event at a fitting interval so as not to freeze up the UI. The processing of the event can take a bit of time (xxx milliseconds), therefore I want to spread it out. – Freddy Feb 05 '14 at 21:33
1

This is my attempt:

    public static IObservable<T> MinimumInterval<T>(this IObservable<T> source, TimeSpan rate, IScheduler scheduler = null)
    {
        if (scheduler == null)
            scheduler = TaskPoolScheduler.Default;

        Func<IObserver<T>, IDisposable> subscribe = obs => {
            var nextTick = scheduler.Now;

            var subscriptions = new CompositeDisposable();

            Action<T> onNext = value => 
            {
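                // Take the later of the two times (Math.Max has no DateTimeOffset overload).
                var now = scheduler.Now;
                var sendTime = nextTick > now ? nextTick : now;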

                var disp = new SingleAssignmentDisposable();
                disp.Disposable = scheduler.Schedule(sendTime, () => 
                { 
                    subscriptions.Remove(disp); 
                    obs.OnNext(value);
                });
                subscriptions.Add(disp);

                nextTick = sendTime + rate;
            };
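            // Note: disposing `subscriptions` here also cancels any events that are scheduled but not yet sent.
            Action<Exception> onError = err => { subscriptions.Dispose(); obs.OnError(err); };
            Action onCompleted = () => { subscriptions.Dispose(); obs.OnCompleted(); };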
            var listener = Observer.Create(onNext, onError, onCompleted);

            subscriptions.Add(source.Subscribe(listener));

            return subscriptions;
        };

        return Observable.Create<T>(subscribe);
    }

It keeps track of the earliest the next message can be sent and uses the scheduler to delay events if they occur too soon. The CompositeDisposable ensures scheduled events are cancelled when the listener unsubscribes.

Constructive feedback is welcome.

Foole
  • Tried it and it works very well :-) Still marking James's reply as the answer to my question. It was an informative and concise solution. – Freddy Feb 05 '14 at 22:23
0

There is a Throttle extension method that should do what you want to achieve.

Georg
  • As I understand it, Throttle only takes the last event. I need to keep all events. See comment on original post. – Freddy Feb 05 '14 at 21:24
0

Try this:

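var interval = Observable.Timer(TimeSpan.FromMilliseconds(500)).IgnoreElements();
var observable2 = observable
    // interval is an IObservable<long>; the inner Select never runs (there are no elements)
    // and only adapts the element type so that Concat compiles for any event type
    .Select(e => Observable.Return(e).Concat(interval.Select(_ => e)))
    .Concat();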

observable2.Subscribe(e =>
    {
        // will have a minimum interval of 500ms between calls
    });
Brandon