12

I have an application which at some points raises 1000 events almost at the same time. What I would like to do is batch the events into chunks of 50 items and start processing a chunk every 10 seconds. There's no need to wait for one batch to complete before starting to process the next one.

For example:

10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:20: StartProcessing (events.Skip(100).Take(50))

Any ideas how to achieve this? I suppose Reactive Extensions is the way to go but other solutions are acceptable too.

I tried to start from here:

        var bufferedItems = eventAsObservable
            .Buffer(15)
            .Delay(TimeSpan.FromSeconds(5));

But I noticed that the delay didn't work as I hoped: all the batches started processing simultaneously, just delayed by 5 seconds.
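
Here's a minimal repro of what I mean (the Subject<int> source and the small numbers are just for illustration):

    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    var source = new Subject<int>();

    source
        .Buffer(2)
        .Delay(TimeSpan.FromSeconds(5))
        .Subscribe(batch =>
            Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} batch of {batch.Count}"));

    // Six events arrive at once => three buffers are produced immediately.
    // Delay shifts each buffer by the same 5 seconds, so all three batches
    // still arrive together, just 5 seconds later.
    for (var i = 0; i < 6; i++)
        source.OnNext(i);

    Console.ReadLine();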

I also tested the Window method, but I didn't notice any difference in behavior. I suppose the TimeSpan in Window actually means "take every event which happens in the next 10 seconds":

        var bufferedItems = eventAsObservable
            .Window(TimeSpan.FromSeconds(10), 5)
            .SelectMany(x => x)
            .Subscribe(DoProcessing);

I'm using Rx-Main 2.0.20304-beta.

Mikael Koskinen

4 Answers

23

If you'd prefer not to sleep threads, you can do this:

var tick = Observable.Interval(TimeSpan.FromSeconds(5));

eventAsObservable
    .Buffer(50)
    .Zip(tick, (res, _) => res)
    .Subscribe(DoProcessing);
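
For illustration, a self-contained sketch of how this could be wired up (the Subject<int> source, the simulated burst and the console output are assumptions for the demo):

    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    var eventAsObservable = new Subject<int>();
    var tick = Observable.Interval(TimeSpan.FromSeconds(5));

    eventAsObservable
        .Buffer(50)
        .Zip(tick, (batch, _) => batch)
        .Subscribe(batch =>
            Console.WriteLine($"{DateTime.Now:HH:mm:ss} processing {batch.Count} events"));

    // Simulate a burst of 1000 events: Buffer produces 20 full batches immediately,
    // and Zip then releases one buffered batch per 5-second tick.
    for (var i = 0; i < 1000; i++)
        eventAsObservable.OnNext(i);

    Console.ReadLine();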
yamen
  • I wish I could upvote this more than once! Just spent three hours trying to figure it out. – Dan Abramov Nov 22 '12 at 16:20
  • I have a similar problem, but I want the next batch to start only after the previous batch has finished. – Zuendi May 14 '20 at 13:09
  • @Zuendi you may find this interesting: [What's a good way to run periodic tasks using Rx, with a single concurrent execution restriction?](https://stackoverflow.com/questions/31399054/whats-a-good-way-to-run-periodic-tasks-using-rx-with-a-single-concurrent-execu) – Theodor Zoulias Jan 11 '21 at 03:06
2

There's a specific Buffer method overload just for this: https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx

observable.Buffer(TimeSpan.FromSeconds(5), 50);
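
To illustrate the caveat raised in the comment below, here's a small sketch (the Subject<int> source and the burst are illustrative): this overload emits a buffer when either the time span elapses or the count is reached, whichever comes first.

    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    var source = new Subject<int>();

    // A buffer is emitted after 5 seconds OR after 50 elements, whichever comes first.
    source
        .Buffer(TimeSpan.FromSeconds(5), 50)
        .Subscribe(buffer =>
            Console.WriteLine($"{DateTime.Now:HH:mm:ss} buffer of {buffer.Count}"));

    // A burst of 1000 events fills 20 buffers of 50 immediately,
    // so they are all emitted right away rather than one every 5 seconds.
    for (var i = 0; i < 1000; i++)
        source.OnNext(i);

    Console.ReadLine();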
Sámal Rasmussen
  • This is not what the OP wants. He wants to process every 5 seconds. This extension will also trigger if the buffer has 50 elements. And this could happen before 5 seconds... – Marco May 16 '20 at 07:30
1

This is a surprisingly difficult problem to solve, more so because the enticing idea of using the Zip operator to align the observable with an Observable.Interval is buggy and dangerously inefficient. The main problem with the Zip operator, when used with asymmetric observables, is that it buffers the elements of the faster-producing observable, potentially resulting in massive memory allocations during a long-lived subscription. IMHO the use of this operator should be limited to pairs of observables that are expected to produce an equal (or close to equal) number of elements in the long run.

The buggy behavior of the Zip+Observable.Interval combo emerges when the Observable.Interval emits values faster than the source observable. In that case the superfluous values emitted by the Observable.Interval are buffered, so when the source observable emits its next element there is already a buffered Interval value to form a pair, resulting in a violation of the "minimum interval between elements" policy.
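
Here's a short sketch of that failure mode (a Subject-based source and a one-second interval, chosen just for the demo): after a quiet period, the queued ticks let subsequent elements through back-to-back:

    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading;

    var source = new Subject<int>();
    var tick = Observable.Interval(TimeSpan.FromSeconds(1));

    source
        .Zip(tick, (x, _) => x)
        .Subscribe(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} -> {x}"));

    source.OnNext(1);   // printed after ~1 second, paired with the first tick
    Thread.Sleep(5000); // meanwhile several ticks pile up inside Zip's internal queue
    source.OnNext(2);   // printed immediately: a queued tick is already waiting
    source.OnNext(3);   // printed immediately too; no 1-second spacing is enforced

    Console.ReadLine();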

Below is an implementation of a custom WithInterval operator that imposes a minimum interval between consecutive elements of an observable sequence. This operator is then used to solve the specific problem of this question, which involves buffers instead of individual elements:

/// <summary>Imposes a minimum interval between consecutive elements of an
/// observable sequence.</summary>
public static IObservable<T> WithInterval<T>(this IObservable<T> source,
    TimeSpan interval, IScheduler scheduler = null)
{
    return source
        .Scan((Observable.Return(0L), (IObservable<T>)null), (state, x) =>
        {
            var (previousTimer, _) = state;
            var timer = (scheduler != null ? Observable.Timer(interval, scheduler)
                : Observable.Timer(interval)).PublishLast();
            var delayed = previousTimer.Select(_ => x).Finally(() => timer.Connect());
            return (timer, delayed);
        })
        .Select(e => e.Item2)
        .Concat();
}

This implementation places an Observable.Timer between consecutive elements. The tricky part is activating each timer at exactly the right moment. This is achieved by publishing the timers (PublishLast), and having each element, once emitted, warm up (Connect) the timer that the next element waits on.
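
As a side note, here is a tiny standalone sketch (not part of the operator itself) of the PublishLast/Connect behavior this relies on: the timer does not start counting until Connect is called.

    using System;
    using System.Reactive.Linq;

    // A connectable timer: declaring and subscribing does not start the countdown.
    var timer = Observable.Timer(TimeSpan.FromSeconds(10)).PublishLast();
    timer.Subscribe(_ => Console.WriteLine("Timer fired"));

    // ...later, e.g. right after the previous element has been emitted:
    timer.Connect(); // only now does the 10-second countdown begin

    Console.ReadLine();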

With this operator in place, implementing a custom BatchWithInterval operator is trivial:

/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers which are produced based on element count information,
/// imposing a minimum interval between consecutive buffers.</summary>
public static IObservable<IList<T>> BatchWithInterval<T>(this IObservable<T> source,
    int count, TimeSpan interval, IScheduler scheduler = null)
{
    return source.Buffer(count).WithInterval(interval, scheduler);
}

Usage example:

var subscription = eventAsObservable
    .BatchWithInterval(50, TimeSpan.FromSeconds(10))
    .Subscribe(DoProcessing);
Theodor Zoulias
  • Caution, the `Concat` operator [behaves weirdly](https://github.com/dotnet/reactive/issues/1634) in the current version of the Rx library (5.0.0). My advice is to use the equivalent `Merge(1)` operator instead, until the issue with the `Concat` is fixed. The `1` is the value of the `maxConcurrent` parameter. Setting this parameter to `1` means no concurrency. – Theodor Zoulias Nov 05 '21 at 19:50
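
For reference, here is a sketch of that substitution: a hypothetical WithIntervalViaMerge variant, identical to the WithInterval operator above except for the final operator.

    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;

    public static class WithIntervalSketch // hypothetical container class for the sketch
    {
        /// <summary>Same as WithInterval above, but ending with Merge(1) instead of
        /// Concat, as suggested in the preceding comment.</summary>
        public static IObservable<T> WithIntervalViaMerge<T>(this IObservable<T> source,
            TimeSpan interval, IScheduler scheduler = null)
        {
            return source
                .Scan((Observable.Return(0L), (IObservable<T>)null), (state, x) =>
                {
                    var (previousTimer, _) = state;
                    var timer = (scheduler != null ? Observable.Timer(interval, scheduler)
                        : Observable.Timer(interval)).PublishLast();
                    var delayed = previousTimer.Select(_ => x).Finally(() => timer.Connect());
                    return (timer, delayed);
                })
                .Select(e => e.Item2)
                .Merge(1); // maxConcurrent = 1: subscribe to the delayed sequences one at a time
        }
    }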
-1

Try this:

    var bufferedItems = eventAsObservable
        .Buffer(50)
        .Do(_ => { Thread.Sleep(10000); });
Enigmativity