I'm quite new to Reactive Extensions and want to buffer a stream based on time or on a running sum not exceeding a threshold (the size of each item is given by a lambda), whichever occurs first, much like the existing Buffer by count or time.
Currently I have my own implementation of a Buffer method that works as expected. It uses the IScheduler to trigger on timeout, manages its own buffers in memory, and emits them whenever the accumulated sum exceeds the threshold. This feels a bit low-level, and I thought there must be a more elegant way to express it with the existing reactive operators, perhaps using the TBufferClosing overload of Buffer instead.
The best solution I have come up with so far is the following, but it has the drawback of including the item that crosses the threshold, so the buffer's sum can end up larger than the requested maximum:
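    // Requires: using System; using System.Collections.Generic;
    //           using System.Reactive; using System.Reactive.Linq;
    public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
    {
        // Share one subscription between the buffer and its closing signal.
        var shared = source.Publish().RefCount();
        // Close each buffer on whichever fires first: the timer or the size threshold.
        return shared.Buffer(() => Observable.Amb(
            Observable.Timer(bufferTimeSpan)
                .Select(_ => Unit.Default),
            shared.Select(sizeSelector)
                .Scan((a, b) => a + b)
                // Fires once the sum reaches maxSize, i.e. after the crossing item is already buffered.
                .SkipWhile(accumulated => accumulated < maxSize)
                .Select(_ => Unit.Default))
        );
    }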
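To make the difference concrete, here is a minimal sketch of just the size rule, with time ignored and a plain IEnumerable standing in for the observable. SizeBufferSketch, SplitBefore and SplitAfter are hypothetical names used only for illustration; they are not Rx operators and not my actual implementation. I am also assuming that a single item larger than maxSize should simply get a buffer of its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration only: models the size rule, ignores the timeout.
public static class SizeBufferSketch
{
    // Desired rule: close the current buffer BEFORE adding an item
    // that would push the running sum past maxSize.
    public static List<List<T>> SplitBefore<T>(
        IEnumerable<T> source, Func<T, int> sizeSelector, int maxSize)
    {
        var result = new List<List<T>>();
        var current = new List<T>();
        var sum = 0;
        foreach (var item in source)
        {
            var size = sizeSelector(item);
            // Assumption: an oversized item still goes into an (otherwise empty) buffer.
            if (current.Count > 0 && sum + size > maxSize)
            {
                result.Add(current);
                current = new List<T>();
                sum = 0;
            }
            current.Add(item);
            sum += size;
        }
        if (current.Count > 0) result.Add(current);
        return result;
    }

    // Rule of the attempt above: close AFTER the running sum has reached
    // maxSize, so the item that crossed the threshold stays in the buffer.
    public static List<List<T>> SplitAfter<T>(
        IEnumerable<T> source, Func<T, int> sizeSelector, int maxSize)
    {
        var result = new List<List<T>>();
        var current = new List<T>();
        var sum = 0;
        foreach (var item in source)
        {
            current.Add(item);
            sum += sizeSelector(item);
            if (sum >= maxSize)
            {
                result.Add(current);
                current = new List<T>();
                sum = 0;
            }
        }
        if (current.Count > 0) result.Add(current);
        return result;
    }
}
```

With item sizes 4, 4, 4, 4 and a maxSize of 10, SplitAfter produces buffers whose sums are 12 and 4, whereas SplitBefore produces two buffers of 8 each, which is the behaviour I am after.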
Is it possible to make this work with existing operators (by tweaking my version above or in some completely different way), or am I forced to stay with my custom Buffer implementation that handles the timers and buffers itself?