As Rohit Sharma mentioned in his comment on Colonel Panic's solution, there is a problem: items can stay buffered and are not pushed to the subscriber until another item is generated.
As described in this comment, the problem is p.Window(() => closes), because it opens up a gap in which events can be missed.
That lambda is going to be invoked after each window is processed. And the Window operator is going to call Subscribe on what the lambda returns each time, because as far as it knows, you might return a completely different IObservable from that lambda every time.
Since closes is now passed to Window directly and is therefore subscribed to only once, the maxCount handling has to change as well. Without that change the counter would never be reset, and once maxCount had been hit, every new event would count as being over maxCount.
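
    public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
    {
        var publish = stream.Publish(p =>
        {
            // Close the current window after a quiet period of 'delay'.
            var closes = p.Throttle(delay);
            if (maxCount != null)
            {
                // Count items manually and reset the counter on overflow,
                // because 'closes' is subscribed to only once.
                Int32 i = 0;
                var overflows = p.Where(x =>
                {
                    ++i;
                    if (i >= maxCount)
                    {
                        i = 0;
                        return true;
                    }
                    return false;
                });
                closes = closes.Merge(overflows);
            }
            // Pass the observable itself instead of a lambda, so Window does not resubscribe per window.
            return p.Window(closes).SelectMany(window => window.ToList());
        });
        return publish;
    }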
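
For illustration, here is a minimal usage sketch of the version above. It assumes the System.Reactive NuGet package; the class names ObservableExtensions and Program, the 200 ms delay and the maxCount of 3 are only examples and not part of the original answer. The operator is repeated in compact form so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ObservableExtensions
{
    // Same operator as above, repeated so this sketch is self-contained.
    public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
    {
        return stream.Publish(p =>
        {
            var closes = p.Throttle(delay);
            if (maxCount != null)
            {
                Int32 i = 0;
                var overflows = p.Where(x =>
                {
                    ++i;
                    if (i >= maxCount)
                    {
                        i = 0;
                        return true;
                    }
                    return false;
                });
                closes = closes.Merge(overflows);
            }
            return p.Window(closes).SelectMany(window => window.ToList());
        });
    }
}

public static class Program
{
    public static void Main()
    {
        // Hypothetical hot source used only for this demonstration.
        var source = new Subject<int>();

        var subscription = source
            .BufferUntilInactive(TimeSpan.FromMilliseconds(200), maxCount: 3)
            .Subscribe(batch => Console.WriteLine("[" + string.Join(", ", batch) + "]"));

        // Push four items in a burst, then stay quiet for longer than the delay.
        for (var n = 1; n <= 4; n++)
        {
            source.OnNext(n);
        }
        Thread.Sleep(500);

        source.OnCompleted();
        subscription.Dispose();
    }
}
```

With these example values, a batch should be closed when the third item arrives, and the rest should be flushed once 200 ms pass without new items. The exact batch contents depend on Rx's internal subscription order, so treat this as a way to observe the behaviour rather than as a specification.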
Update:
After further tests I found that, in some cases, items are still not pushed to the subscriber correctly.
Here is the workaround, which has been working for us for four months now without any problems.
The workaround is adding .Delay(...) with any TimeSpan.

    public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
    {
        var publish = stream.Publish(p =>
        {
            var closes = p.Throttle(delay);
            if (maxCount != null)
            {
                var overflows = stream.Where((x, index) => index + 1 >= maxCount);
                closes = closes.Merge(overflows);
            }
            return p.Window(() => closes).SelectMany(window => window.ToList()).Delay(TimeSpan.Zero);
        });
        return publish;
    }