I made an extension method:

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
                                          TimeSpan minDelay)
{
    return
        source.TimeInterval()
            .Select(
                (x, i) =>
                    Observable.Return(x.Value)
                        .Delay(i == 0
                            ? TimeSpan.Zero
                            : TimeSpan.FromTicks(
                                  Math.Max(minDelay.Ticks - x.Interval.Ticks, 0))))
            .Concat();
}

This creates a new observable that only lets items through with a minimum separation in time.

To remove initial latency, it is necessary to treat the first item differently.

As can be seen, there is a test to see if we are dealing with the first item by testing i == 0. The problem here is that if we process more than int.MaxValue items, this will fail.
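To make the failure mode concrete, here is a minimal sketch of what happens to an `int` index once it reaches `int.MaxValue`. It assumes the operator increments its index in a `checked` context, which is an assumption about Rx internals rather than something shown above. In an unchecked context the index would instead wrap around to `int.MinValue` and eventually reach 0 again, so a later item would be treated as the first. `IndexOverflowDemo` is a hypothetical name used only for illustration.

```csharp
using System;

public static class IndexOverflowDemo
{
    // Returns true when incrementing the index would overflow an int.
    // Assumption: the index is incremented in a checked context.
    public static bool NextIndexOverflows(int index)
    {
        try
        {
            checked { index++; }
            return false;
        }
        catch (OverflowException)
        {
            return true;
        }
    }
}
```

`IndexOverflowDemo.NextIndexOverflows(0)` returns `false`, while `IndexOverflowDemo.NextIndexOverflows(int.MaxValue)` returns `true`.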

Instead, I thought about the following sequence

var trueThenFalse = Observable.Return(true)
                    .Concat(Observable.Repeat(false));

and zipping it up alongside my source:

source.TimeInterval().Zip(trueThenFalse, ...

but when passing this infinite sequence to Zip, we appear to enter a tight loop where trueThenFalse emits all items in one go (to infinity). Fail.

I could easily code around this with side-effects (a bool in the outer scope, for instance), but this would represent a loss of purity that I would not be happy with.

Any suggestions?

EDIT

Although it is not quite the same behaviour, the following code exhibits some nasty traits:

var trueThenFalse = Observable.Return(true)
    .Concat(Observable.Repeat(false));
var src = Observable.Interval(TimeSpan.FromSeconds(1)); //never completes
src.Zip(trueThenFalse,(i,tf)=>tf).ForEach(x=>Trace.TraceInformation("{0}",x));

It eventually dies with an OutOfMemoryException. This is because trueThenFalse appears to be unspooling all of its values, but they aren't consumed by the Zip in a timely fashion.
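The memory growth can be modelled without Rx at all. The following is a toy sketch, assuming that a push-based zip has to queue unmatched values from each side until the other side catches up. `ToyZip` and `ToyZipDemo` are hypothetical names, not Rx types, and the bounded `fastCount` stands in for the infinite sequence.

```csharp
using System;
using System.Collections.Generic;

// Toy push-based zip: each side pushes values in, and values that have
// no partner yet wait in a queue.
public sealed class ToyZip<TLeft, TRight>
{
    private readonly Queue<TLeft> _left = new Queue<TLeft>();
    private readonly Queue<TRight> _right = new Queue<TRight>();
    private readonly Action<TLeft, TRight> _onPair;

    public ToyZip(Action<TLeft, TRight> onPair)
    {
        _onPair = onPair;
    }

    // Number of right-hand values still waiting for a partner.
    public int BufferedRight
    {
        get { return _right.Count; }
    }

    public void PushLeft(TLeft value)
    {
        if (_right.Count > 0) _onPair(value, _right.Dequeue());
        else _left.Enqueue(value);
    }

    public void PushRight(TRight value)
    {
        if (_left.Count > 0) _onPair(_left.Dequeue(), value);
        else _right.Enqueue(value);
    }
}

public static class ToyZipDemo
{
    // Pushes fastCount values from the fast side, then a single value
    // from the slow side, and reports how many values remain buffered.
    public static int Run(int fastCount)
    {
        var zip = new ToyZip<long, bool>((l, r) => { });

        // The fast side pushes everything synchronously.
        zip.PushRight(true);
        for (int i = 1; i < fastCount; i++) zip.PushRight(false);

        // The slow side has produced only one value so far.
        zip.PushLeft(0L);

        return zip.BufferedRight;
    }
}
```

With a thousand fast values and one slow value, `ToyZipDemo.Run(1000)` leaves 999 values in the buffer. With an unbounded fast side the queue never stops growing.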

spender
  • Seems to be working for me with trueThenFalse and Zip. – Evk Nov 22 '16 at 11:33
  • So perhaps this has something to do with the way the continuations are scheduled? Given that `IObservable` only really provides the `Subscribe` method, it's difficult for me to understand how an infinite (cold) sequence wouldn't attempt to unspool all of its items immediately when subscribed to in this "push" model. – spender Nov 22 '16 at 11:38
  • But zip grabs next element from two sequences. First it grabs "true" from first sequence and first value from second (no delay). Then it grabs "false" and second value from second sequence, but here there is a delay now. Not sure why it should spool all items immediately. Maybe you can post code with trueThenFalse and Zip which you said fails? – Evk Nov 22 '16 at 11:43
  • but how does `trueThenFalse` know when an item has been consumed when the IObserver interface only offers `OnNext(item)` for it to call (push) and no concept of "give me the next item"? (working on a reproducible test case) – spender Nov 22 '16 at 11:46
  • I can confirm the two comments made by @Evk. `.Zip(trueThenFalse, ...)` works just fine for me too and according to my understanding that grabbing of arrived items effectively is the way cold sequences are being processed by `.Zip()`. `trueThenFalse` doesn't need to know anything - its `OnNext()` is called by `.Zip()` whenever an item is provided from the other sequence. I'm curious to see your reproducible test case. – haindl Nov 22 '16 at 11:55
  • Well, I don't know the internal implementation of Zip, but it could just block inside OnNext of the first sequence, waiting until the second sequence pushes its next item (and vice versa). – Evk Nov 22 '16 at 11:55
  • Ok. I understand a bit better now. If the source is a bounded sequence (one that completes), then Zip will complete. If the source is unbounded (such as `Observable.Interval`), then things get out of hand pretty quickly and `trueThenFalse` spews out items that (somewhere) start consuming lots of memory, ultimately ending with an OOME. I can't reproduce my original issue, but this is definitely related. – spender Nov 22 '16 at 12:04
  • @haindl No. That's the point of RX. The events are pushed from the observable to the observer. Zip doesn't call the observable. Zip is the observer. The observable calls Zip when it has new items, not on-demand from its observers. – spender Nov 22 '16 at 12:06
  • @Evk I added a test case that exhibits a problem that is related to what I'm asking and exhibits the nasty "unspooling all-at-once" behaviour. I suspect that my actual problem code behaves differently because of schedulers etc, but it's the same problem really. – spender Nov 22 '16 at 12:12
  • How long do you wait until it dies with OOM? – Evk Nov 22 '16 at 12:18
  • Like... 10s of seconds. – spender Nov 22 '16 at 12:19
  • @spender Yes, you are absolutely correct about that, but `.Zip()` grabs the value from the two sequences only when both sequences have produced a new value. `trueThenFalse` is a cold observable, so its values aren't produced until they are consumed by `.Zip()`; they are only produced lazily. And regarding your example in the edit: I've been running your sample code for many minutes now and I'm not getting any OOM-Ex. – haindl Nov 22 '16 at 12:19
  • I think my test-runner prefers 32bit, so this makes the problem arrive somewhat sooner. – spender Nov 22 '16 at 12:22
  • Yes, you are right, Zip will not block inside OnNext but simply buffers elements pushed by both sequences internally (using Queue). So that is indeed not an option. What if you create an analog of Select that uses long instead of int for the index (or just pushes "true" for the first element in the sequence)? – Evk Nov 22 '16 at 12:31
  • Though it would be about the same as using bool in outer scope (but on the other hand, this is just what your current Select does - stores index in outer scope and increments it). – Evk Nov 22 '16 at 12:41
  • @Evk Thanks for your consideration of my problem. I've reached a satisfactory solution and self-answered. – spender Nov 22 '16 at 12:42
  • Nice (though it seems to me that a bool in the outer scope or a custom Select is clearer and easier to understand :)) – Evk Nov 22 '16 at 12:50
  • @spender Ok, finally I got the OOM-Ex. As said by Evk, `.Zip()` does indeed buffer the elements internally, so `trueThenFalse` is really producing all of its infinite values without waiting. Well... I learned something new today! +1 for your answer using lazy Enumerables with `yield`! I tried that too but I used `Enumerable.Repeat(true, 1).Concat(Inf(false)).ToObservable()` which doesn't change anything. – haindl Nov 22 '16 at 12:51
  • Thanks for looking into it @haindl . Yes, it was most unexpected, so hopefully a valuable learning experience. – spender Nov 22 '16 at 12:54

2 Answers

So it turns out that Zip has another overload that can zip together an IObservable sequence with an IEnumerable sequence.

By combining the push semantics of IObservable with the pull semantics of IEnumerable, it is possible to get my test case working.

So, with the following method:

// static so that it can be called from a static extension method
private static IEnumerable<T> Inf<T>(T item)
{
    for (;;)
    {
        yield return item;
    }
}

we can make an IEnumerable:

var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));

and then Zip it with a source observable:

var src = Observable.Interval(TimeSpan.FromSeconds(1));
src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x));

...and everything works as expected.
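The reason this helps can be seen with plain LINQ, with no Rx involved. The sketch below assumes only that the enumerable is pulled one item at a time by its consumer. `PullDemo`, `Produced` and `FirstFlags` are hypothetical names added for illustration, and the counter is a deliberate side effect used only to observe the laziness.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PullDemo
{
    // Counts how many items the infinite generator has actually yielded.
    public static int Produced;

    private static IEnumerable<T> Inf<T>(T item)
    {
        for (;;)
        {
            Produced++;
            yield return item;
        }
    }

    // Pulls the first `count` flags from the lazy true-then-false sequence.
    public static List<bool> FirstFlags(int count)
    {
        Produced = 0;
        var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
        return trueThenFalse.Take(count).ToList();
    }
}
```

Calling `PullDemo.FirstFlags(3)` yields `true, false, false`, and `PullDemo.Produced` stays no larger than the number of items requested, because the generator only advances when the consumer asks for the next item.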

I now have the following implementation for my RateLimiter method:

public static IObservable<T> RateLimit<T>(this IObservable<T> source,
                                          TimeSpan minDelay)
{
    var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
    return
        source.TimeInterval()
            .Zip(trueThenFalse, (item, firstTime) => Observable.Return(item.Value)
                .Delay(firstTime
                    ? TimeSpan.Zero
                    : TimeSpan.FromTicks(
                        Math.Max(minDelay.Ticks - item.Interval.Ticks, 0))))
            .Concat();
}
spender


This is similar to the question "Rx IObservable buffering to smooth out bursts of events", though you are clearly trying to understand why your solution does or doesn't work.

I find the solution there more elegant, though to each their own.

Shlomo