I'm using Reactive Extensions (Rx) in C# and want to filter events in the following way. Imagine I have the following source sequence:

A B C D E F X G H I X J X X K L M N O X P

And I want to produce the following output:

E F X H I X J X X N O X

Basically, I want to buffer (throttle?) events up to a maximum bound (2 in this example), and when I get a certain event (here, X), flush that buffer to the output and start buffering again until I see the special event again.

I've tried a few approaches without any luck, and I imagine there is an easy way to accomplish this that I'm missing.

EDIT: One constraint is that I expect to get TONS of events that are discarded and only a few instances of X, so keeping a buffer in memory with thousands of events just to read the last 2 (or 20) is not really an option.

Julian Dominguez

3 Answers

For convenience, we need the following two extension methods:

public static class Extensions
{
    public static IObservable<IList<TSource>> BufferUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
    {
        var published = source.Publish().RefCount();
        return published.Buffer(() => published.Where(predicate));
    }

    public static IEnumerable<TSource> TakeLast<TSource>(this IEnumerable<TSource> source, int count)
    {
        return source.Reverse().Take(count).Reverse();
    }
}

We then solve the problem like so:

source.BufferUntil(c => c == 'X')
    .SelectMany(list => list.TakeLast(3))

Output:

E F X H I X J X X N O X
Alex
  • Thanks, it does work, but using a buffer here means that you would be buffering potentially thousands of events until you get an X, just to take the latest ones. I'd like something that does not even hold a reference to past events if they are not needed. – Julian Dominguez Mar 13 '13 at 21:31

I'll piggyback on another answer I posted here: Trouble Implementing a Sliding Window in Rx

The important bit is this extension method:

public static class Ext
{
    public static IObservable<IList<T>> SlidingWindow<T>(
        this IObservable<T> src, 
        int windowSize)
    {
        var feed = src.Publish().RefCount();    
        // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list  
        return Observable.Zip(
        Enumerable.Range(0, windowSize)
            .Select(skip => feed.Skip(skip))
            .ToArray());
    }
}

Which you can use thusly:

void Main()
{
    // A faked up source
    var source = new Subject<char>();

    var bufferSize = 2;
    Func<char, bool> eventTrigger = c => c == 'X';

    var query = source
        .Publish()
        .RefCount()
        // Want one extra slot to detect the "event"
        .SlidingWindow(bufferSize + 1)
        .Where(window => eventTrigger(window.Last()))
        .Select(buffer => buffer.ToObservable())
        .Switch();

    using(query.Subscribe(Console.WriteLine))
    {
        source.OnNext('A');
        source.OnNext('B');
        source.OnNext('C');
        source.OnNext('D');
        source.OnNext('E');
        source.OnNext('F');
        source.OnNext('X');
        source.OnNext('G');
        source.OnNext('H');
        source.OnNext('I');
        source.OnNext('X');
        Console.ReadLine();
    }    
}

Output:

E
F
X
H
I
X
JerKimball

Here is a stab at answering my own question. Please let me know if you see any issues with it.

public static class ObservableHelper
{
    /// <summary>
    /// Buffers entries that do not satisfy the <paramref name="shouldFlush"/> condition, using a circular buffer with a max
    /// capacity. When an entry that satisfies the condition occurs, it flushes the circular buffer and the new entry,
    /// and starts buffering again.
    /// </summary>
    /// <typeparam name="T">The type of entry.</typeparam>
    /// <param name="stream">The original stream of events.</param>
    /// <param name="shouldFlush">The condition that defines whether the item and the buffered entries are flushed.</param>
    /// <param name="bufferSize">The buffer size for accumulated entries.</param>
    /// <returns>An observable that has this filtering capability.</returns>
    public static IObservable<T> FlushOnTrigger<T>(this IObservable<T> stream, Func<T, bool> shouldFlush, int bufferSize)
    {
        if (stream == null) throw new ArgumentNullException("stream");
        if (shouldFlush == null) throw new ArgumentNullException("shouldFlush");
        if (bufferSize < 1) throw new ArgumentOutOfRangeException("bufferSize");

        return System.Reactive.Linq.Observable.Create<T>(observer =>
        {
            var buffer = new CircularBuffer<T>(bufferSize);
            var subscription = stream.Subscribe(
                newItem =>
                    {
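                        bool result;
                        try
                        {
                            result = shouldFlush(newItem);
                        }
                        catch (Exception ex)
                        {
                            // Surface a failing predicate to the subscriber instead of silently dropping the item.
                            observer.OnError(ex);
                            return;
                        }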

                        if (result)
                        {
                            foreach (var buffered in buffer.TakeAll())
                            {
                                observer.OnNext(buffered);
                            }

                            observer.OnNext(newItem);
                        }
                        else
                        {
                            buffer.Add(newItem);
                        }
                    },
                observer.OnError,
                observer.OnCompleted);

            return subscription;
        });
    }
}

By the way, CircularBuffer does not exist out of the box, but the implementation is straightforward.
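
Since the answer does not show it, here is a minimal sketch of what such a `CircularBuffer<T>` might look like. It is an assumption on my part, not the author's implementation: only the constructor taking a capacity and the `Add` and `TakeAll` members are implied by the code above, and everything else (the backing array, the field names, returning an array from `TakeAll`) is hypothetical.

```csharp
using System;

// Hypothetical sketch of the CircularBuffer<T> that FlushOnTrigger relies on.
// Only the constructor, Add and TakeAll are implied by the answer; the rest is assumed.
public sealed class CircularBuffer<T>
{
    private readonly T[] items;
    private int start; // index of the oldest stored item
    private int count; // number of items currently stored

    public CircularBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException("capacity");
        items = new T[capacity];
    }

    // Adds an item, overwriting the oldest one once the buffer is full.
    public void Add(T item)
    {
        if (count < items.Length)
        {
            items[(start + count) % items.Length] = item;
            count++;
        }
        else
        {
            items[start] = item;
            start = (start + 1) % items.Length;
        }
    }

    // Returns the stored items, oldest first, and empties the buffer in the same call.
    public T[] TakeAll()
    {
        var result = new T[count];
        for (var i = 0; i < count; i++)
        {
            result[i] = items[(start + i) % items.Length];
        }

        Array.Clear(items, 0, items.Length); // drop references to flushed items
        start = 0;
        count = 0;
        return result;
    }
}
```

With a capacity of 2, adding the characters of `"ABCDEF"` one by one and then calling `TakeAll()` should yield `E` and `F`, and a second `TakeAll()` should yield nothing. Memory use stays bounded by the capacity no matter how many items are discarded, which is the constraint from the question. The sketch is not thread-safe; it assumes notifications arrive one at a time, as the Rx contract requires.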

Then I just call:

data
    .FlushOnTrigger(item => item == 'X', bufferSize: 2)
    .Subscribe(Console.WriteLine);
Julian Dominguez
  • Aside from argument checking, I only see one major oversight. You do not pass the OnError or OnCompleted notifications. (Passing `observer.OnError` and `observer.OnCompleted` to the `Subscribe` call you make should work here.) I doubt that the `Synchronize` call will always be needed and would generally leave that up to the caller to insert when needed. Not related to the RX portion of the question, but I would expect `CircularBuffer` to implement `IEnumerable` directly instead of needing the `ReadAll` method. – Gideon Engelberth Mar 14 '13 at 04:38
  • What is the `Synchronize()` operator here? – Alex Mar 14 '13 at 09:17
  • Thanks for the feedback, I removed the Synchronize call. The reason for having a TakeAll method is that I want to make it clear (and enforced) that the moment I read the entire buffer, then this is automatically emptied (instead of enumerating the items on one hand and then clearing the buffer) – Julian Dominguez Aug 23 '13 at 20:49
  • This approach is now being used in a QuickStart sample application that uses the Semantic Logging block to buffer verbose events in memory and only flush those to the log when an error occurs. You can find the code at http://go.microsoft.com/fwlink/p/?LinkID=290898 (EnterpriseLibrary6-QuickStarts-source.exe is the relevant file) – Julian Dominguez Aug 23 '13 at 20:54