6

I have a bunch of events coming in and I have to execute ALL of them without a loss, but I want to make sure that they are buffered and consumed at the appropriate time slots. Anyone have a solution?

I can't find any operators in Rx that can do that without the loss of the events (Throttle - looses events). I've also considered Buffered, Delay, etc... Can't find a good solution.

I've tried to put a timer in the middle, but somehow it doesn't work at all:

GetInitSequence()
            .IntervalThrottle(TimeSpan.FromSeconds(5))
            .Subscribe(
                item =>
                    {
                        Console.WriteLine(DateTime.Now);
                        // Process item
                    }
            );

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
    {
        return Observable.Create<T>(o =>
            {
                return source.Subscribe(x =>
                    {
                        new Timer(state => 
                            o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1));
                    }, o.OnError, o.OnCompleted);
        });
    }
IgorM
  • 1,068
  • 2
  • 18
  • 29

4 Answers4

13

The question is not 100% clear so I'm making some presumptions.

Observable.Delay is not what you want because that will create a delay from when each event arrives, rather than creating even time intervals for processing.

Observable.Buffer is not what you want because that will cause all events in each given interval to be passed to you, rather than one at a time.

So I believe you're looking for a solution that creates some sort of metronome that ticks away, and gives you an event per tick. This can be naively constructed using Observable.Interval for the metronome and Zip for connecting it to your source:

var source = GetInitSequence();
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));    
var triggeredSource = source.Zip(trigger, (s,_) => s); 
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now));

This will trigger every 5 seconds (in the example above), and give you the original items in sequence.

The only problem with this solution is that if you don't have any more source elements for (say) 10 seconds, when the source elements arrive they will be immediately sent out since some of the 'trigger' events are sitting there waiting for them. Marble diagram for that scenario:

source:  -a-b-c----------------------d-e-f-g
trigger: ----o----o----o----o----o----o----o
result:  ----a----b----c-------------d-e-f-g

This is a very reasonable issue. There are two questions here already that tackle it:

Rx IObservable buffering to smooth out bursts of events

A way to push buffered events in even intervals

The solution provided is a main Drain extension method and secondary Buffered extension. I've modified these to be far simpler (no need for Drain, just use Concat). Usage is:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5));

The extension method StepInterval:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}
Community
  • 1
  • 1
yamen
  • 15,390
  • 3
  • 42
  • 52
  • Thanks. Still not really what I was looking for but it showed me some ideas. I'm just frustrated with Rx - why it should be such complex and not have a proper documentation. The learning curve is steep and requires a wide knowledge of the subject in order to achieve something valuable. #fail – IgorM Jul 10 '12 at 15:42
  • 1
    Agreed. That is why I spent a lot of time writing IntroToRx.com to help people in your position. It is hard, and there is a lot to learn. – Lee Campbell Jun 24 '13 at 08:08
  • I really find these Rx operators hard to read and reason about. I think it's my limitation - it's probably because I have a visual mind and I can't visualize the result. Is there any chance of getting a marble diagram for what the code in this answer does? – Tim Long Sep 08 '15 at 01:49
  • For those with the same problem in 2018, RXJS debounceTime is exactly what I was searching for: https://medium.com/aviabird/rxjs-reducing-number-of-api-calls-to-your-server-using-debouncetime-d71c209a4613 – Antony May 23 '18 at 16:39
  • Thank you for this wonderful answer. I got some ideas from it but I'm not sure how to apply this to RxJava. Can you please point me in the right direction? There is no `Drain` or `Select` in RxJava. – hamid Oct 06 '21 at 11:46
1

I know this could just be too simple, but would this work?

var intervaled = source.Do(x => { Thread.Sleep(100); });

Basically this just puts a minimum delay between values. Too simplistic?

Enigmativity
  • 113,464
  • 11
  • 89
  • 172
1

Along the lines of Enigmativity's answer, if all you want to do is just Delay all of the values by a TimeSpan, I cant see why Delay is not the operator you want

  GetInitSequence()
        .Delay(TimeSpan.FromSeconds(5)) //ideally pass an IScheduler here
        .Subscribe(
            item =>
                {
                    Console.WriteLine(DateTime.Now);
                    // Process item
                }
        );
Lee Campbell
  • 10,631
  • 1
  • 34
  • 29
0

How about Observable.Buffer? This should return all the events in the 1s window as a single event.

var xs = Observable.Interval(TimeSpan.FromMilliseconds(100));
var bufferdStream = xs.Buffer(TimeSpan.FromSeconds(5));
bufferdStream.Subscribe(item => { Console.WriteLine("Number of events in window: {0}", item.Count); });

It might be what you're asking isnt that clear. What is your code supposed to do? It looks like you're just delaying by creating a timer for each event. It also breaks the semantics of the observable as the next and complete could occur before the next.

Note this is also only as accurate at the timer used. Typically the timers are accurate to at most 16ms.

Edit:

your example becomes, and item contains all the events in the window:

GetInitSequence()
            .Buffer(TimeSpan.FromSeconds(5))
            .Subscribe(
                item =>
                    {
                        Console.WriteLine(DateTime.Now);
                        // Process item
                    }
            );
Community
  • 1
  • 1
Dave Hillier
  • 18,105
  • 9
  • 43
  • 87