4

Very similar to this question: Rx IObservable buffering to smooth out bursts of events, I am interested in smoothing out observables that may occur in bursts.

Hopefully the diagram below illustrates that I am aiming for:

Raw:       A--B--CDE-F--------------G-----------------------
Interval:  o--o--o--o--o--o--o--o--o--o--o--o--o--o--o--o--o
Output:    A--B--C--D--E--F-----------G---------------------

Given the raw stream, I wish to stretch these events over regular intervals.

Throttling does not work as then I end up losing elements of the raw sequence.

Zip works well if the raw stream is more frequent than the timer, but fails if there are periods where there are no raw events.

EDIT

In response to Dan's answer, the problem with Buffer is that if bursts of many events arrive within a short time interval then I receive the events too often. Below shows what could happen with a buffer size of 3, and a timeout configured to the required interval:

Raw:       -ABC-DEF-----------G-H-------------------------------
Interval:  o--------o--------o--------o--------o--------o--------
Buffered:  ---A---D-------------------G--------------------------
              B   E                   H
              C   F
Desired:   ---------A--------B--------C--------D--------E ..etc.
Community
  • 1
  • 1
Lawrence
  • 3,287
  • 19
  • 32
  • 1
    possible duplicate of [Process Rx events at fixed or minimum intervals](http://stackoverflow.com/questions/21588625/process-rx-events-at-fixed-or-minimum-intervals) – James World Aug 09 '14 at 14:31
  • I believe a way of phrasing [this](http://stackoverflow.com/questions/21588625/process-rx-events-at-fixed-or-minimum-intervals/21589238#21589238) one differently? It's not quite going to match your diagram, since G will emit immediately; but I *suspect* it's not going to be an issue? – James World Aug 09 '14 at 14:31
  • Thank you. You are right, it wasn't an issue that G emits straight away. – Lawrence Aug 12 '14 at 14:30

2 Answers2

1

How about this? (inspired by James' answer mentioned in the comments)...

public static IObservable<T> Regulate<T>(this IObservable<T> source, TimeSpan period)
{
    var interval = Observable.Interval(period).Publish().RefCount();

    return source.Select(x => Observable.Return(x)
                                        .CombineLatest(interval, (v, _) => v)
                                        .Take(1))
                 .Concat();
}

It turns each value in the raw observable into its own observable. The CombineLatest means it won't produce a value until the interval does. Then we just take one value from each of these observables and concatenate.

The first value in the raw observable gets delayed by one period. I'm not sure if that is an issue for you or not.

Community
  • 1
  • 1
IanR
  • 4,703
  • 3
  • 29
  • 27
0

It looks like what you want to use is Buffer. One of the overloads allows you to specify an interval as well as the buffer length. You could conceivably set the length to 1.

Raw.Buffer(interval, 1);

For some more examples of its use, you can refer to the IntroToRX site.

Dan Lyons
  • 219
  • 4
  • 12
  • Thanks for your answer - unfortunately Buffer doesn't do quite what I want either - see my edited question. – Lawrence Aug 08 '14 at 20:46