5

I need to implement event processing that is deferred until no new events have arrived for a certain period. (I have to queue up a parsing task when the text buffer changes, but I don't want to start parsing while the user is still typing.)

I'm new to Rx, but as far as I can see, I would need a combination of the BufferWithTime and Timeout methods. I imagine it working like this: it keeps buffering events as long as each one arrives within a specified time span after the previous one. If there is a gap in the event flow (longer than the time span), it should propagate the events buffered so far.

Having looked at how Buffer and Timeout are implemented, I could probably implement my own BufferWithTimeout method (if anyone has one, please share it with me), but I wonder if this can be achieved just by combining the existing methods. Any ideas?

Theodor Zoulias
Gaspar Nagy
  • Does this answer your question? [Reactive Throttle returning all items added within the TimeSpan](https://stackoverflow.com/questions/8849810/reactive-throttle-returning-all-items-added-within-the-timespan) – Theodor Zoulias Oct 28 '22 at 12:00
  • @TheodorZoulias This was 10+ years ago and I don't work on that project anymore, so I cannot verify. – Gaspar Nagy Nov 03 '22 at 15:21

4 Answers

15

This is quite an old question, but I do believe the following answer is worth mentioning, since all other solutions have forced the user to subscribe manually, track changes, etc.

I offer the following as an "Rx-y" solution.

var buffers = source
    .GroupByUntil(
        // yes. yes. all items belong to the same group.
        x => true,
        g => Observable.Amb<int>(
               // close the group after 5 seconds of inactivity
               g.Throttle(TimeSpan.FromSeconds(5)),
               // close the group after 10 items
               g.Skip(9)
             ))
    // Turn those groups into buffers
    .SelectMany(x => x.ToArray());

Basically, the source is windowed until an observable defined in terms of the newest window fires. Each time a new window (grouped observable) is created, we use that window to determine when it should close. In this case, I'm closing the window after 5 seconds of inactivity or once it reaches a maximum length of 10 (9+1).

cwharris
3

I think BufferWithTime is what you are after.

There is nothing built in, but something like this should work:

Note: If an error occurs from the source, the buffer is not flushed. This matches the current (or current last time I checked) functionality of BufferWith*

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout)
{
    return source.BufferWithTimeout(timeout, Scheduler.TaskPool);
}

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler)
{
    return Observable.CreateWithDisposable<TSource[]>(observer =>
    {
        object lockObject = new object();
        List<TSource> buffer = new List<TSource>();

        MutableDisposable timeoutDisposable = new MutableDisposable();

        Action flushBuffer = () =>
        {
            TSource[] values;

            lock(lockObject)
            {
                values = buffer.ToArray();
                buffer.Clear();
            }

            observer.OnNext(values);
        };

        var sourceSubscription = source.Subscribe(
            value =>
            {
                lock(lockObject)
                {
                    buffer.Add(value);
                }

                timeoutDisposable.Disposable = 
                    scheduler.Schedule(flushBuffer, timeout);
            },
            observer.OnError,
            () =>
            {
                flushBuffer();
                observer.OnCompleted();
            });

        return new CompositeDisposable(sourceSubscription, timeoutDisposable);
    });
}
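
For newer Rx.NET releases, where `CreateWithDisposable`, `MutableDisposable`, and `Scheduler.TaskPool` are no longer available, the same idea can probably be ported as follows. This is only a sketch, assuming the `System.Reactive` package: `Observable.Create`, `SerialDisposable`, and `Scheduler.Default` are my substitutions for the original calls, and the class name `BufferWithTimeoutExtensions` is made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical container class; the name is an assumption.
public static class BufferWithTimeoutExtensions
{
    public static IObservable<TSource[]> BufferWithTimeout<TSource>(
        this IObservable<TSource> source, TimeSpan timeout)
    {
        // Assumption: Scheduler.Default stands in for the old Scheduler.TaskPool.
        return source.BufferWithTimeout(timeout, Scheduler.Default);
    }

    public static IObservable<TSource[]> BufferWithTimeout<TSource>(
        this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler)
    {
        return Observable.Create<TSource[]>(observer =>
        {
            var gate = new object();
            var buffer = new List<TSource>();
            // SerialDisposable cancels the previous timer when a new one is assigned.
            var timer = new SerialDisposable();

            Action flushBuffer = () =>
            {
                TSource[] values;
                lock (gate)
                {
                    values = buffer.ToArray();
                    buffer.Clear();
                }
                observer.OnNext(values);
            };

            var sourceSubscription = source.Subscribe(
                value =>
                {
                    lock (gate) { buffer.Add(value); }
                    // Restart the idle timer on every incoming value.
                    timer.Disposable = scheduler.Schedule(timeout, flushBuffer);
                },
                observer.OnError,
                () =>
                {
                    // Flush whatever is pending before completing.
                    flushBuffer();
                    observer.OnCompleted();
                });

            return new CompositeDisposable(sourceSubscription, timer);
        });
    }
}
```

As with the original, an error from the source is forwarded without flushing the buffer.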
Richard Szalay
  • BufferWithTime triggers regularly with a time period, but I want to have triggering after a certain idle time. – Gaspar Nagy Jan 11 '11 at 09:44
  • Does that not mean that if there is never a gap, you will never receive output? – Richard Szalay Jan 11 '11 at 09:47
  • yes, if there is never a gap, I don't want to receive any event. But since the events are generated by the key presses of the users and the timeout is 100ms, there will be a gap sooner or later :) – Gaspar Nagy Jan 11 '11 at 09:49
  • Updated the post after compilation fixes and testing. – Richard Szalay Jan 11 '11 at 10:22
  • Cool! Thx! I'll have a look in detail. – Gaspar Nagy Jan 11 '11 at 10:31
  • I had a look. I had to make two corrections to get it to run properly: 1 - the type parameter has to be written out explicitly: "return Observable.CreateWithDisposable<TSource[]>(observer =>" 2 - the timeout parameter was missing from the schedule call: "timeoutDisposable.Disposable = scheduler.Schedule(flushBuffer, timeout);" With these changes, it runs perfectly. Thx! – Gaspar Nagy Jan 12 '11 at 09:32
  • It seems rx.net has introduced some breaking changes, so this code won't compile anymore. – eatfrog May 06 '14 at 11:29
2

In addition to Richard Szalay's answer, I've just been looking into the new Window operator from the latest Rx release. It 'kind of' solves your problem in that you can 'buffer with a timeout', i.e. get the output within a window of time that lasts until the timeout is reached, but instead of receiving the results as an IEnumerable you actually get them as an IObservable.

Here's a quick example of what I mean:

private void SetupStream()
{
    var inputStream = Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>(
        h => new MouseButtonEventHandler(h), 
        h => MouseDown += h,
        h => MouseDown -= h);

    var timeout = inputStream.Select(evt => Observable.Timer(TimeSpan.FromSeconds(10), Scheduler.Dispatcher))
        .Switch();

    inputStream.Window(() => timeout)
        .Subscribe(OnWindowOpen);
}


private void OnWindowOpen(IObservable<IEvent<MouseButtonEventArgs>> window)
{
    Trace.WriteLine(string.Format("Window open"));

    var buffer = new List<IEvent<MouseButtonEventArgs>>();

    window.Subscribe(click =>
    {

        Trace.WriteLine(string.Format("Click"));

        buffer.Add(click);

    }, () => ProcessEvents(buffer));
}

private void ProcessEvents(IEnumerable<IEvent<MouseButtonEventArgs>> clicks)
{
    Trace.WriteLine(string.Format("Window closed"));

    //...
}

Every time the window opens, you receive all the events as and when they come in, store them in a buffer and process when the window completes (which actually happens when the next window opens).

Not sure if Richard would change his example to use Window now it's available but thought it might be worth raising as an alternative.
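
If you would rather get lists than nested observables, the same closing-signal idea seems to carry over to `Buffer`. The following is a minimal sketch, assuming a current `System.Reactive` package; the method name `BufferUntilIdle` and the class name are hypothetical, and using `Throttle` as the closing signal is my substitution for the `Select`/`Timer`/`Switch` combination above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper; names are assumptions for illustration.
public static class IdleBufferExtensions
{
    public static IObservable<IList<T>> BufferUntilIdle<T>(
        this IObservable<T> source, TimeSpan idle)
    {
        // Publish shares a single subscription to the source between
        // the buffer itself and the Throttle that closes it.
        return source.Publish(shared =>
            // Each buffer closes once the shared stream has been quiet for 'idle'.
            shared.Buffer(() => shared.Throttle(idle)));
    }
}
```

Sharing the source via `Publish` is meant to avoid subscribing to it twice, which matters for cold observables. Whether empty buffers can show up around completion is something I have not checked.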

James Hay
1

If you just need to run an operation when the user stops typing for a certain amount of time, and don't necessarily need the intermediate events, then Throttle is the operation you're after. Check here for an example of its usage in that scenario.
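
Since the linked example may not be reachable, here is a rough sketch of that scenario, assuming the `System.Reactive` package. The helper `ParseWhenIdle`, its parameters, and the class name are hypothetical and only serve as illustration.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper; names are assumptions for illustration.
public static class ThrottleSketch
{
    // Calls 'parse' with the most recent text once no further change
    // has arrived for the 'idle' period; earlier values are dropped.
    public static IDisposable ParseWhenIdle(
        IObservable<string> textChanged, TimeSpan idle, Action<string> parse)
    {
        return textChanged
            .Throttle(idle)
            .Subscribe(parse);
    }
}
```

With a `Subject<string>` as the source and an idle period of, say, 100 ms, pushing "h", "he", "hel" in quick succession should result in a single call to `parse` with the latest text once typing pauses. Dispose the returned subscription to stop listening.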

Bryan Anderson
  • In my case I needed the changes as well not only the final result (this is a big text block and I only re-parse the changed parts), but you are right, if the individual changes are not interesting, the Throttle is a good option. – Gaspar Nagy Jan 12 '11 at 09:56
  • In my application I want to do some processing when the user has stopped moving the Camera in a 3D view. Throttle was a good option for this. – TortoiseTNT Jun 10 '13 at 14:53
  • link is broken, can the example be added to the post? – CCondron Mar 17 '18 at 02:07