
I'm quite new to Reactive Extensions and want to buffer a stream based on time, or by a running sum not exceeding a threshold (the size of each item is specified by a lambda), whichever occurs first, much like the existing Buffer by count or time.

Currently I have written my own implementation of a Buffer method that works as expected. It uses the IScheduler to trigger on timeout, manages its own buffers in memory, and emits them whenever the accumulated sum exceeds the threshold. However, this feels a bit low level, and I thought there must be a more elegant way to express it with the existing reactive operators, perhaps using the TBufferClosing overload of Buffer instead.

The best solution I have come up with so far is the following, but it has the drawback of including the last item, the one that pushes the running sum past the threshold, so the buffer's total ends up larger than the requested maximum:

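    public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
    {
        // Share a single subscription to the source between the buffer and its closing selector.
        var shared = source.Publish().RefCount();

        // The closing selector is invoked each time a new buffer opens;
        // whichever inner sequence produces a value first closes that buffer.
        return shared.Buffer(() => Observable.Amb(
            // Close when the time span has elapsed...
            Observable.Timer(bufferTimeSpan)
                .Select(_ => Unit.Default),
            // ...or when the running sum of item sizes reaches maxSize.
            shared.Select(sizeSelector)
                .Scan((a, b) => a + b)
                .SkipWhile(accumulated => accumulated < maxSize)
                .Select(_ => Unit.Default))
            );
    }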
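For comparison, the size policy being asked for can be written as a plain `IEnumerable<T>` function with time left out: an item that would push the running sum over `maxSize` closes the current buffer and starts the next one instead of joining it. This is a minimal synchronous sketch, not Rx code; `SizeChunking` and `ChunkBySize` are hypothetical names, and giving an item that is larger than `maxSize` by itself a buffer of its own is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SizeChunking
{
    // Hypothetical helper: splits a finite sequence into chunks whose summed
    // size does not exceed maxSize. An item that would overflow a non-empty
    // chunk closes it and becomes the first item of the next chunk.
    // An item larger than maxSize by itself ends up alone in its chunk.
    public static IEnumerable<List<T>> ChunkBySize<T>(
        IEnumerable<T> source, Func<T, int> sizeSelector, int maxSize)
    {
        var chunk = new List<T>();
        var sum = 0;
        foreach (var item in source)
        {
            var size = sizeSelector(item);
            if (chunk.Count > 0 && sum + size > maxSize)
            {
                yield return chunk;   // emitted without the offending item
                chunk = new List<T>();
                sum = 0;
            }
            chunk.Add(item);
            sum += size;
        }
        if (chunk.Count > 0)
            yield return chunk;       // flush the final, partially filled chunk
    }
}
```

With sizes 3, 4, 5, 2 and `maxSize` 8 this yields `[3, 4]` and `[5, 2]`, whereas the closing-selector version above lets the 5 into the first buffer, which is the drawback described in the question.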

Is it possible to make this work with existing operators (by tweaking my version above or in a completely different way), or am I forced to stay with my custom Buffer implementation that manages the timers and buffers itself?

Theodor Zoulias
Per
  • There be dragons when using `.Publish().RefCount()`. In this case you should simply use `.Publish(shared => ` and drop your `shared` variable. – Enigmativity Nov 10 '18 at 08:09
  • What do you mean by using "existing operators"? Your implementation already does that. I'm confused. – Enigmativity Nov 10 '18 at 08:10
  • @Enigmativity, my implementation has the drawback of actually including the last item causing the sum to go above the threshold, which means the buffer will be too big. – Per Nov 29 '18 at 09:24

2 Answers


OK, this should work. Better late than never. I don't think there's a way to do better than you did using the Buffer operators.

At its core, this is a state machine problem, which means you want a Scan solution. The catch is that two different sources can alter your state: a new item and a timeout. Scan doesn't really work with two sources, so we have to somehow combine those two event types into one stream.
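To make the idea concrete, here is a synchronous sketch of such a state machine: the two event kinds are already merged into one sequence and folded by a single step function, which is the role Scan plays in Rx. Everything here is hypothetical and simplified: items are represented only by their sizes, a timeout is an explicit `TimedOut` event rather than a scheduler callback, and the transition rules (emit on overflow, emit on timeout) are assumed to mirror the item and time-out handlers of the solution below.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event types: the two sources (new item, timeout) merged into one stream.
public abstract record BufferEvent;
public sealed record ItemArrived(int Size) : BufferEvent;
public sealed record TimedOut : BufferEvent;

// State carried through the fold; Emit is non-null only on a step that closes a buffer.
public sealed record BufferState(List<int> Current, int Sum, List<int>? Emit);

public static class BufferStateMachine
{
    // One transition function handles both event kinds, like a single Scan accumulator.
    public static BufferState Step(BufferState state, BufferEvent e, int maxSize) => e switch
    {
        // Overflow: emit the current buffer and start a new one with this item.
        ItemArrived item when state.Sum + item.Size > maxSize =>
            new BufferState(new List<int> { item.Size }, item.Size, state.Current),
        // Room left: append the item, nothing to emit.
        ItemArrived item =>
            new BufferState(state.Current.Append(item.Size).ToList(), state.Sum + item.Size, null),
        // Timeout: emit whatever has accumulated (possibly empty) and reset.
        TimedOut =>
            new BufferState(new List<int>(), 0, state.Current),
        _ => throw new ArgumentOutOfRangeException(nameof(e)),
    };

    // Enumerable stand-in for Scan + Where + Select: run the fold and keep only emitted buffers.
    public static IEnumerable<List<int>> Run(IEnumerable<BufferEvent> events, int maxSize)
    {
        var state = new BufferState(new List<int>(), 0, null);
        foreach (var e in events)
        {
            state = Step(state, e, maxSize);
            if (state.Emit != null)
                yield return state.Emit;
        }
    }
}
```

For the events 3, 4, 5, 2 followed by a timeout, with `maxSize` 8, `Run` yields `[3, 4]` (closed by the overflowing 5) and then `[5, 2]` (closed by the timeout).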

I've done something similar before with discriminated unions, and that concept should work here. First the solution (it uses the NuGet package System.Collections.Immutable):

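    using System;
    using System.Collections.Generic;
    using System.Collections.Immutable;
    using System.Linq;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public static class X
    {
        public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(Unit.Default); //our time-out mechanism

            return source
                .Publish(_source => _source
                    // Merge items and delayed time-out signals into one stream of DUnion values.
                    .Union(queue.Delay(bufferTimeSpan))
                    .ScanUnion(
                        (list: ImmutableList<TSource>.Empty, size: 0, emitValue: (ImmutableList<TSource>)null),
                        (state, item) =>
                        { // item handler
                            var itemSize = sizeSelector(item);
                            var newSize = state.size + itemSize;
                            if (newSize > maxSize)
                            {
                                // Overflow: emit the current buffer, start a new one with this item,
                                // and schedule a time-out for the new buffer window.
                                queue.OnNext(Unit.Default);
                                return (ImmutableList<TSource>.Empty.Add(item), itemSize, state.list);
                            }
                            else
                                return (state.list.Add(item), newSize, null);
                        },
                        (state, _) =>
                        { // time out handler
                            // Emit whatever has accumulated (possibly an empty list) and schedule the next time-out.
                            queue.OnNext(Unit.Default);
                            return (ImmutableList<TSource>.Empty, 0, state.list);
                        }
                    )
                    // Only states that close a buffer carry an emitValue.
                    .Where(t => t.emitValue != null)
                    .Select(t => t.emitValue.ToList())
                    // Complete once the source has completed and bufferTimeSpan has passed.
                    .TakeUntil(_source.IgnoreElements().Delay(bufferTimeSpan).Materialize())
            );
        }
    }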

Explanation: Union combines two streams of different types into one stream, where the item can be either of type A or type B. ScanUnion works just like Scan, but offers two functions for handling the two different types of items.

The BehaviorSubject is signaled whenever a new buffer window opens, and the Delay operator makes sure the Scan receives that signal after the defined timespan. The state inside the Scan holds the list of currently buffered items and their total size. The emitValue is set when a buffer window closes, and it carries the closed buffer's items downstream.

Here's the Discriminated Union helper code:

    using System;
    using System.Reactive.Linq;

    public static class DUnionExtensions
    {
        public class DUnion<T1, T2>
        {
            public DUnion(T1 t1)
            {
                Type1Item = t1;
                Type2Item = default(T2);
                IsType1 = true;
            }

            public DUnion(T2 t2, bool ignored) //extra parameter to disambiguate in case T1 == T2
            {
                Type2Item = t2;
                Type1Item = default(T1);
                IsType1 = false;
            }

            public bool IsType1 { get; }
            public bool IsType2 => !IsType1;

            public T1 Type1Item { get; }
            public T2 Type2Item { get; }
        }

        public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
        {
            return a.Select(x => new DUnion<T1, T2>(x))
                .Merge(b.Select(x => new DUnion<T1, T2>(x, false)));
        }

        public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
                TState initialState,
                Func<TState, T1, TState> type1Handler,
                Func<TState, T2, TState> type2Handler)
        {
            return source.Scan(initialState, (state, u) => u.IsType1
                ? type1Handler(state, u.Type1Item)
                : type2Handler(state, u.Type2Item)
            );
        }
    }
Shlomo
  • I tested this solution and it seems that the `Buffer` method returns a never-ending sequence. The `queue` is merged with the `source`, and since the `queue` never receives an `OnCompleted` notification, the merged result never completes either. I was able to fix it by using the `MergeUntilAnyCompletes` operator from [this](https://stackoverflow.com/questions/59261017/how-to-merge-two-observables-with-early-completion "How to merge two observables with early completion") question. – Theodor Zoulias Nov 27 '20 at 16:16
  • Better to add `source.IgnoreElements().Delay(bufferTimeSpan).Select(_ => Unit.Default).Subscribe(queue);`. Elements in the queue can get dropped by immediate exit. – Shlomo Nov 27 '20 at 18:55
  • Hmm, wouldn't this require `Publish`ing the `source` first? – Theodor Zoulias Nov 27 '20 at 19:47
  • 1
    You're right. Edited & substituted for a `TakeUntil` call. – Shlomo Nov 29 '20 at 19:27

I think that I've found a compact solution to this problem. The problem with the Buffer implementation in the question is that the last item, the one that overflows the maxSize, is included in the buffer, violating the maxSize policy. My idea to solve this problem is to duplicate all elements in the source sequence, so that the offending element can be filtered out of the current buffer but still survive to be included in the next buffer.
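The duplication idea can be tried out without Rx. In this synchronous sketch, the first copy of every item is only a probe that decides whether the open chunk must close, and only the second copy is stored, so an offending item ends up at the head of the next chunk. `DuplicationSketch` and `ChunkWithProbes` are hypothetical names; `Duplicate` uses the same `SelectMany` expression as the implementation in this answer, but the loop covers only the size policy and merely stands in for what `GroupByUntil` and `Amb` do in the Rx version.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DuplicationSketch
{
    // Each source item is produced twice: a probe copy (First == true)
    // followed by the real copy (First == false).
    public static IEnumerable<(T Item, bool First)> Duplicate<T>(IEnumerable<T> source) =>
        source.SelectMany(x => Enumerable.Range(0, 2).Select(i => (Item: x, First: i == 0)));

    // Hypothetical size-only illustration of the technique.
    public static IEnumerable<List<T>> ChunkWithProbes<T>(
        IEnumerable<T> source, Func<T, long> sizeSelector, long maxSize)
    {
        var chunk = new List<T>();
        long sum = 0;
        foreach (var (item, first) in Duplicate(source))
        {
            if (first)
            {
                // Probe copy: if it would overflow a non-empty chunk, close the chunk
                // now, before the real copy arrives, so the offending item is left out.
                if (chunk.Count > 0 && sum + sizeSelector(item) > maxSize)
                {
                    yield return chunk;
                    chunk = new List<T>();
                    sum = 0;
                }
            }
            else
            {
                // Real copy: stored in whichever chunk is open at this point.
                chunk.Add(item);
                sum += sizeSelector(item);
            }
        }
        if (chunk.Count > 0)
            yield return chunk;
    }
}
```

With sizes 3, 4, 5, 2 and `maxSize` 8, the probe copy of 5 closes `[3, 4]`, and the real copy of 5 then opens `[5, 2]`.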

Making this idea work is a bit tricky, but doable. The implementation below passes all of my tests and correctly enforces all three policies (maxSize, timeSpan and maxCount).

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;

    /// <summary>
    /// Splits the elements of a sequence into chunks that are sent out when either
    /// they are full (by quantity or size), or a given amount of time has elapsed
    /// after receiving the first element in the chunk.
    /// </summary>
    public static IObservable<IList<TSource>> Buffer<TSource>(
        this IObservable<TSource> source,
        Func<TSource, long> sizeSelector, long maxSize,
        TimeSpan timeSpan, int maxCount, IScheduler scheduler = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(sizeSelector);
        if (maxSize < 0L) throw new ArgumentOutOfRangeException(nameof(maxSize));
        if (timeSpan < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(timeSpan));
        if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));
        scheduler ??= Scheduler.Default;

        return source
            // Emit every item twice: first with First == true, then with First == false.
            .SelectMany(x => Enumerable.Range(0, 2).Select(i => (Item: x, First: i == 0)))
            // A single group (the current chunk) is open at a time; the duration closes it.
            .GroupByUntil(_ => 0, g => g
                // Count the group's leading element, then only first copies
                // (a leading first copy is skipped so that its second copy is counted instead).
                .SkipWhile((e, i) => i == 0 && e.First)
                .Where((e, i) => i == 0 || e.First)
                // Close when the running size sum exceeds maxSize...
                .Scan(0L, (acc, e) => checked(acc + Math.Max(0L, sizeSelector(e.Item))))
                .SkipWhile(acc => acc <= maxSize)
                .Select(_ => 0L)
                // ...or after maxCount second copies...
                .Amb(g.Where(e => !e.First).Skip(maxCount - 1).Select(_ => 0L))
                // ...or when timeSpan has elapsed since the group opened.
                .Amb(Observable.Timer(timeSpan, scheduler)))
            // The chunk consists of the second copies only.
            .SelectMany(g => g.Where(e => !e.First).Select(e => e.Item).ToArray());
    }

Usage example:

    IObservable<IList<Item>> buffered = source
        .Buffer(x => x.Size, 10000, TimeSpan.FromSeconds(5), 50);
Theodor Zoulias