2

The example below was my attempt at doing this:

var source
    = Observable.Sample(
          Observable.Range(1, int.MaxValue), TimeSpan.FromSeconds(2));

But when I .Subscribe() to that Observable and output it to the console, it shows a sequence like this, one line output every 2 seconds:

OnNext: 312969
OnNext: 584486
OnNext: 862009

Evidently the .Range() observable keeps running while the .Sample() observable waits 2 seconds between outputs. I would like to know how to create an observable that does not allow values to be skipped, so that the output would look like this:

OnNext: 1
OnNext: 2
OnNext: 3

With one value from .Range() output every 2 seconds. How can I accomplish this in the Reactive Extensions for .NET?

ZeroBugBounce

3 Answers

5

Using Observable.GenerateWithTime:

var source = Observable.GenerateWithTime(1, _ => true, x => ++x, x => x, x => TimeSpan.FromSeconds(2));

Observable.Range uses Observable.Generate, so this is one approach. There could be many other ways.
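
As far as I know, more recent Rx releases (the System.Reactive package) no longer expose GenerateWithTime under that name; the same behaviour is available through the Observable.Generate overload that takes a time selector. A minimal sketch under that assumption follows. PacedRange is a hypothetical helper name, and Take(3) is there only so that the demo terminates:

```csharp
using System;
using System.Reactive.Linq;

// Assumes the System.Reactive NuGet package is referenced.
// PacedRange is a hypothetical helper name, not part of Rx.
IObservable<int> PacedRange(TimeSpan interval) =>
    Observable.Generate(
        1,               // initial state
        _ => true,       // keep generating
        x => x + 1,      // next state
        x => x,          // value to emit for the current state
        _ => interval);  // how long to wait before emitting each value

// Block until three values have been written, one per interval.
PacedRange(TimeSpan.FromSeconds(2))
    .Take(3)
    .Do(x => Console.WriteLine($"OnNext: {x}"))
    .Wait();
```

Because each value is only produced when its timer fires, nothing is generated ahead of time and nothing is skipped.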

For something more advanced, like dealing with events in the same manner (because this will obviously only help if you are generating the data yourself), see "How to throttle event stream using RX?", which deals with this problem and has been solved.

Richard Anthony Hein
4

I approached this recently by creating an Observable that emits timed events every timeInterval. You can then use the Zip method to synchronize the events from your Observable with those of the timer Observable.

For instance:

    var timer = 
        Observable
            .Timer(
                TimeSpan.FromSeconds(0), 
                TimeSpan.FromSeconds(2)
            );
    var source = Observable.Range(1, int.MaxValue);
    var timedSource = source.Zip(timer,(s,t)=>s);
    timedSource.Subscribe(Console.WriteLine);
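
One caveat: Zip queues whatever the faster side produces until the slower side catches up, so an observable Range that runs ahead of the timer may buffer a very large number of values. A variation that avoids this, sketched here on the assumption that a recent System.Reactive is in use, zips the timer with a plain IEnumerable, which is pulled one item per tick. TimedRange is a hypothetical helper name, and Take(3) is there only so that the demo terminates:

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

// Assumes the System.Reactive NuGet package is referenced.
// TimedRange is a hypothetical helper name, not part of Rx.
IObservable<int> TimedRange(TimeSpan interval) =>
    Observable
        .Timer(TimeSpan.Zero, interval)           // first tick immediately, then one per interval
        .Zip(Enumerable.Range(1, int.MaxValue),   // lazy source: one item is taken per tick
             (tick, value) => value);

// Block until three values have been written.
TimedRange(TimeSpan.FromSeconds(2))
    .Take(3)
    .Do(x => Console.WriteLine(x))
    .Wait();
```

This only applies when the values can be produced on demand; for a source that pushes values on its own schedule, the buffering concern remains.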
spender
0

A well known example of a pacer:

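public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval) =>
source
  // Turn each value into a short sequence: emit it, then hold completion for `interval`.
  .Select(p =>
    Observable
     .Empty<T>()
     .Delay(interval)
     .StartWith(p)
  )
  // Run the short sequences one after another, so values are spaced by `interval`.
  .Concat();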
Oleg Dok
  • 1
    Instead of the `Empty+StartWith` you could use the `Return` method. The problem with the `Pace` operator is that in case the `source` sequence emits values more frequently than the `interval`, you'll end up with a terrible memory leak. – Theodor Zoulias Dec 29 '21 at 13:13
  • Yes, it should be obvious to any developer - if you have an independent emitter - any burst flattener can lead to memory issues – Oleg Dok Dec 30 '21 at 12:25