I'm looking for an observable extension method that does an inverse throttle. By this I mean: let the first item pass, then ignore every item that arrives within a due time of the previous item.

input - due time 2
|*.*.*..*..|
output
|*......*..|

Note that this is a different question from the questions below (which are all the same as each other). Those questions require a fixed suppression duration, while I need a suppression duration that is extended every time a new item arrives too early. Visually, the output of the solutions listed below is as follows:

input - due time 2
|*.*.*..*..|
output
|*...*..*..|

UPDATE

I came up with the following solution, however I do not know enough about schedulers and concurrency to be sure the locking is good enough. I also don't know how to implement this method when a Scheduler argument is added to the method.

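    public static IObservable<T> InverseThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
    {
        // Non-null while a cool-down is running. Note: shared by every subscriber to the result.
        IDisposable coolDownSubscription = null;
        object subscriptionLock = new object();

        return source
            .Where(i =>
            {
                lock (subscriptionLock)
                {
                    bool result;
                    if (coolDownSubscription == null)
                    {
                        // No cool-down is running: let the item pass.
                        result = true;
                    }
                    else
                    {
                        // The item arrived too early: cancel the running cool-down and suppress the item.
                        coolDownSubscription.Dispose();
                        result = false;
                    }

                    // (Re)start the cool-down; it clears itself once dueTime has elapsed.
                    coolDownSubscription = Observable
                            .Timer(dueTime)
                            .Subscribe(_ =>
                            {
                                lock (subscriptionLock)
                                {
                                    coolDownSubscription = null;
                                }
                            });
                    return result;
                }
            });
    }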
Wouter

2 Answers

You can use something like this...

    source
        .GroupByUntil(
            x => Unit.Default,                          // single key: one group per burst
            x => x.Throttle(TimeSpan.FromSeconds(100))  // the group closes once the burst goes quiet
        )
        .SelectMany(
            x => x.Take(1) // yields the first item of each group as soon as it arrives
        );
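
The question also asks how to accept a scheduler argument. A minimal sketch of how the grouping idea above might be wrapped as an extension method, assuming the `Throttle(TimeSpan, IScheduler)` overload is used for the group duration. The name `InverseThrottle`, its signature, and the `Demo` class are illustrative assumptions, not part of Rx. It forwards `group.Take(1)` so the item that opens a burst is emitted right away, and it drives virtual time with `HistoricalScheduler` so the behaviour can be checked without waiting.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class InverseThrottleExtensions
{
    // Hypothetical helper: the name and signature are assumptions, not an Rx API.
    public static IObservable<T> InverseThrottle<T>(
        this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler)
    {
        return source
            .GroupByUntil(
                _ => Unit.Default,                            // one key, so at most one open group
                group => group.Throttle(dueTime, scheduler))  // close the group after dueTime of silence
            .SelectMany(group => group.Take(1));              // forward only the item that opened the group
    }
}

public static class Demo
{
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler();   // virtual clock, advanced by hand
        var source = new Subject<int>();
        var seen = new List<int>();

        using (source.InverseThrottle(TimeSpan.FromSeconds(2), scheduler).Subscribe(seen.Add))
        {
            source.OnNext(1);                              // t = 0s: opens a group and passes
            scheduler.AdvanceBy(TimeSpan.FromSeconds(1));
            source.OnNext(2);                              // t = 1s: suppressed, quiet period restarts
            scheduler.AdvanceBy(TimeSpan.FromSeconds(1));
            source.OnNext(3);                              // t = 2s: suppressed, quiet period restarts
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3));  // silent since t = 2s, so the group closes at t = 4s
            source.OnNext(4);                              // t = 5s: opens a new group and passes
        }

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));       // expected: 1, 4
    }
}
```

Because each call to the operator builds its own groups, no state is shared between subscribers, and all timing goes through the scheduler that is passed in.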
cwharris
  • Throttle won't "fire" though, maybe you want Observable.Timer? – Ana Betts Jun 13 '14 at 02:02
  • @PaulBetts Throttle should "fire" the first item as soon as the source stream pauses for `100`. – Brandon Jun 13 '14 at 12:36
  • @PaulBetts. Brandon is right. The Throttle is going to fire as soon as the source stream finishes debouncing, which means one additional item will be yielded, until it finishes debouncing again, etc. – cwharris Jun 13 '14 at 18:10
  • This does not work for me, the second argument (elementSelector) is of type Func. If source were an IObservable, g would not be an IObservable but a Unit. – Wouter Aug 01 '14 at 08:11
  • There may be some inconsistencies with your comment. I'm not sure what you're trying to say, but I'm quite certain this solution would work, regardless of the `T` in your source `IObservable`. – cwharris Aug 06 '14 at 07:52
  • Did you try it? Whatever I try Visual Studio won't swallow it. Have a gander at my gist: https://gist.github.com/frogger3d/dc74ad4731a6e651a99c – Wouter Aug 07 '14 at 09:13
  • Oh wow. You're absolutely correct. This overload does not exist in the .NET version of Rx. My apologies. I will attempt to create it. – cwharris Aug 08 '14 at 00:53
  • I've been hiding in a cave for 3 years spending all of my time figuring out how to get this to work in .NET, and I think I've finally done it. No, but seriously, I updated the answer for those still looking. – cwharris Nov 06 '17 at 22:02

I suggest this one.

    public static class IObservable_FirstThenThrottle
    {
        public static IObservable<TSource> FirstThenThrottle<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
        {
            var first = source.Take(1);
            var second = source.Skip(1).Throttle(dueTime);
            return first.Merge(second);
        }
    }

It emits the first item as soon as it arrives, then throttles the rest of the sequence by dueTime.

Here is a marble diagram showing what happens with dueTime = 2.

source 0-1-2--3--|
result 0------2--3--|
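
Note that `first` and `second` each subscribe to `source`, so anything upstream runs twice per subscriber. A minimal sketch of one way to share a single subscription, assuming the `Publish` overload that takes a selector. The names `FirstThenThrottleShared`, `SharedExtensions`, and `Demo`, as well as the added scheduler parameter, are illustrative assumptions. It keeps `Throttle` semantics, so after the first item it still forwards the last item of each burst once the burst goes quiet, rather than suppressing the burst.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedExtensions
{
    // Hypothetical variant; the scheduler parameter is an addition to make timing controllable.
    public static IObservable<T> FirstThenThrottleShared<T>(
        this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler)
    {
        // Publish subscribes to source once and feeds both branches from that one subscription.
        return source.Publish(shared =>
            shared.Take(1).Merge(shared.Skip(1).Throttle(dueTime, scheduler)));
    }
}

public static class Demo
{
    public static int Subscriptions;   // how many times the pipeline subscribed to its source

    public static List<int> Run()
    {
        Subscriptions = 0;
        var scheduler = new HistoricalScheduler();   // virtual clock, advanced by hand
        var subject = new Subject<int>();
        var source = Observable.Defer(() => { Subscriptions++; return subject.AsObservable(); });
        var seen = new List<int>();

        using (source.FirstThenThrottleShared(TimeSpan.FromSeconds(2), scheduler).Subscribe(seen.Add))
        {
            subject.OnNext(0);                             // t = 0s: first item, passes immediately
            scheduler.AdvanceBy(TimeSpan.FromSeconds(1));
            subject.OnNext(1);                             // t = 1s: held back by Throttle
            scheduler.AdvanceBy(TimeSpan.FromSeconds(1));
            subject.OnNext(2);                             // t = 2s: replaces 1 as the pending item
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3));  // 2 is emitted at t = 4s
            subject.OnNext(3);                             // t = 5s: pending
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3));  // 3 is emitted at t = 7s
        }

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));       // expected: 0, 2, 3
        Console.WriteLine(Subscriptions);                  // expected: 1
    }
}
```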
nverinaud
  • It seems like it should work, but it doesn't. The output is very irregular. It also does some funky stuff to the source: when I add a Do at the start to print every item, the first item occurs twice. – Wouter Aug 01 '14 at 08:30