Using a bufferClosingSelector factory method
decPL suggested using the overload of Buffer that accepts a bufferClosingSelector, a factory function that is called whenever a new buffer opens. It returns a stream whose first OnNext() or OnCompleted() signals that the current buffer should be flushed. decPL's code looked like this:
observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))
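To make the closing behaviour concrete, here is a minimal sketch that drives both the source and the closing stream by hand. The class name ClosingSelectorSketch and the two subjects are hypothetical, introduced only for illustration; it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical illustration, not part of the original answer.
public static class ClosingSelectorSketch
{
    // Returns the buffers emitted when a hand-driven closing stream fires.
    public static List<IList<int>> Run()
    {
        var source = new Subject<int>();   // hypothetical source, pushed by hand
        var closer = new Subject<Unit>();  // hypothetical closing signal
        var buffers = new List<IList<int>>();

        // The factory is invoked each time a new buffer opens.
        using (source.Buffer(() => closer.AsObservable())
                     .Subscribe(b => buffers.Add(b)))
        {
            source.OnNext(1);
            source.OnNext(2);
            closer.OnNext(Unit.Default);   // first OnNext flushes the open buffer
            source.OnNext(3);
            closer.OnNext(Unit.Default);   // flushes the next buffer
        }

        return buffers;
    }
}
```

Calling ClosingSelectorSketch.Run() should yield two buffers, one per closing signal. In decPL's snippet above, the closing stream is the throttled source rather than a hand-driven subject.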
This makes considerable progress towards a solution, but it has a couple of problems:
- Nothing is sent while messages keep arriving within the throttle duration of one another, because Throttle only emits after a quiet period. A sustained burst could therefore result in large, infrequently published lists.
- There are multiple subscriptions to the source; if it is cold, this may have unintended side effects. The bufferClosingSelector factory is called after each buffer closes, so with a cold source each new Throttle would replay from the initial events rather than continue from the most recent.
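The second problem can be seen in isolation with a small sketch. It assumes a cold sequence built with Observable.Defer and counts how often the underlying source is subscribed to; the class and method names are hypothetical.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical illustration, not part of the original answer.
public static class ColdSourceSketch
{
    // Counts how many times the underlying source is subscribed to.
    public static int CountSubscriptions(int subscribers)
    {
        var subscriptions = 0;

        // Defer runs its factory once per subscriber, which makes the
        // sequence cold: every subscriber replays 1, 2, 3 from the start.
        var cold = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        for (var i = 0; i < subscribers; i++)
        {
            cold.Subscribe();
        }

        return subscriptions;
    }
}
```

Here ColdSourceSketch.CountSubscriptions(2) reports two underlying subscriptions, one per subscriber, which is the situation each call to the bufferClosingSelector factory creates against a cold source.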
Preventing indefinite throttling
We need an additional mechanism to limit the buffer length and prevent indefinite throttling. Buffer has an overload that lets you specify a maximum length, but unfortunately you can't combine it with a closing selector.
Let's call the desired buffer length limit n. Recall that the first OnNext of the closing selector is enough to close the buffer, so all we need to do is Merge the throttle with a counting stream that sends OnNext after n events from the source. We can use .Take(n).LastAsync() to do this: take the first n events but ignore all but the last of them. This is a very useful pattern in Rx.
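As a quick check of the counting stream on its own, the sketch below pushes five values through a hand-driven subject and records what .Take(n).LastAsync() emits. The class name and the log format are hypothetical, and n is assumed to be positive.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical illustration, not part of the original answer.
public static class CountingSignalSketch
{
    // Records the notifications produced by Take(n).LastAsync() for n > 0.
    public static List<string> Run(int n)
    {
        var source = new Subject<int>();   // hypothetical source, pushed by hand
        var log = new List<string>();

        using (source.Take(n).LastAsync().Subscribe(
            x => log.Add("OnNext(" + x + ")"),
            () => log.Add("OnCompleted")))
        {
            // Nothing is emitted until the n-th value arrives.
            for (var i = 1; i <= 5; i++)
            {
                source.OnNext(i);
            }
        }

        return log;
    }
}
```

With n = 3 the log holds a single OnNext carrying the third value, followed by OnCompleted; the first two values are swallowed. That single OnNext is what closes the buffer once it is merged into the closing stream.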
Making the source "hot"
To stop the bufferClosingSelector factory from resubscribing to the source, we can apply the common pattern of .Publish().RefCount() to the source. This shares a single underlying subscription, so each new subscriber sees only the events published after it subscribes rather than a replay from the start. This is also a very useful pattern to remember.
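The effect of .Publish().RefCount() can be sketched by counting underlying subscriptions again, this time through the shared stream. The names are hypothetical, and the source is a hand-driven subject wrapped in Observable.Defer purely so that subscriptions can be counted.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical illustration, not part of the original answer.
public static class SharedSourceSketch
{
    // Counts underlying subscriptions when all subscribers share one
    // Publish().RefCount() stream.
    public static int CountSubscriptions(int subscribers)
    {
        var subject = new Subject<int>();  // never completes in this sketch
        var subscriptions = 0;

        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return subject.AsObservable();
        });

        // RefCount connects on the first subscriber and disconnects when
        // the last one is disposed.
        var shared = source.Publish().RefCount();

        var handles = new List<IDisposable>();
        for (var i = 0; i < subscribers; i++)
        {
            handles.Add(shared.Subscribe());
        }

        foreach (var handle in handles)
        {
            handle.Dispose();
        }

        return subscriptions;
    }
}
```

SharedSourceSketch.CountSubscriptions(3) reports a single underlying subscription, however many subscribers attach while the stream is connected.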
Solution
Here is the reworked code, where the throttle duration is merged with a counter:
var throttleDuration = TimeSpan.FromSeconds(5);
var bufferSize = 3;

// single subscription to source
var sourcePub = source.Publish().RefCount();

var output = sourcePub.Buffer(
    () => sourcePub.Throttle(throttleDuration)
                   .Merge(sourcePub.Take(bufferSize).LastAsync()));
Production Ready Code & Tests
Here is a production-ready implementation with tests (use the NuGet packages rx-testing and nunit). Note that the scheduler is a parameter, which supports testing.
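using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static partial class ObservableExtensions
{
    public static IObservable<IList<TSource>> BufferNearEvents<TSource>(
        this IObservable<TSource> source,
        TimeSpan maxInterval,
        int maxBufferSize,
        IScheduler scheduler)
    {
        // Fall back to the thread pool when no scheduler is supplied.
        if (scheduler == null) scheduler = ThreadPoolScheduler.Instance;
        if (maxBufferSize <= 0)
            throw new ArgumentOutOfRangeException(
                "maxBufferSize", "maxBufferSize must be positive");

        // Share one subscription to the source between Buffer and the closing streams.
        var publishedSource = source.Publish().RefCount();

        // Close each buffer after a quiet period or once maxBufferSize events arrive.
        return publishedSource.Buffer(
            () => publishedSource
                .Throttle(maxInterval, scheduler)
                .Merge(publishedSource.Take(maxBufferSize).LastAsync()));
    }
}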
public class BufferNearEventsTests : ReactiveTest
{
    [Test]
    public void CloseEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int> expectedBuffer = new [] {1, 2, 3};
        var expectedTime = maxInterval.Ticks + 300;

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTime, buffer => CheckBuffer(expectedBuffer, buffer)));
    }

    [Test]
    public void FarEventsAreUnbuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(1000, 1),
            OnNext(2000, 2),
            OnNext(3000, 3));

        IList<int>[] expectedBuffers =
        {
            new[] {1},
            new[] {2},
            new[] {3}
        };
        var expectedTimes = new[]
        {
            maxInterval.Ticks + 1000,
            maxInterval.Ticks + 2000,
            maxInterval.Ticks + 3000
        };

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)),
            OnNext<IList<int>>(expectedTimes[2], buffer => CheckBuffer(expectedBuffers[2], buffer)));
    }

    [Test]
    public void UpToMaxEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 2;

        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int>[] expectedBuffers =
        {
            new[] {1,2},
            new[] {3}
        };
        var expectedTimes = new[]
        {
            200, /* Buffer cap reached */
            maxInterval.Ticks + 300
        };

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)));
    }

    private static bool CheckBuffer<T>(IEnumerable<T> expected, IEnumerable<T> actual)
    {
        CollectionAssert.AreEquivalent(expected, actual);
        return true;
    }
}