On various occasions I've wished for an Rx Replay operator that buffers the incoming notifications, replays its buffer synchronously when it is subscribed for the first time, and stops buffering after that. This lightweight Replay operator should be able to serve only one subscriber. One use case for such an operator can be found here, where continuing to buffer after the first subscription is just a waste of resources. For demonstration purposes, here is a contrived example of the problematic behavior I wish I could avoid:
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(500))
    .SelectMany(x => Enumerable.Range((int)x * 100_000 + 1, 100_000))
    .Take(800_000)
    .Do(x =>
    {
        if (x % 100_000 == 0) Console.WriteLine(
            $"{DateTime.Now:HH:mm:ss.fff} > " +
            $"Produced: {x:#,0}, TotalMemory: {GC.GetTotalMemory(true):#,0} bytes");
    })
    .Replay()
    .AutoConnect(0);

await Task.Delay(2200);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Subscribing...");

// First subscription
await observable.Do(x =>
{
    if (x % 100_000 == 0)
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Emitted: {x:#,0}");
});

// Second subscription
Console.WriteLine($"Count: {await observable.Count():#,0}");
The observable generates 800,000 values in total. The Replay mechanism is connected to the source immediately (AutoConnect(0)), and it is subscribed about halfway through the sequence, before its completion.
Output:
16:54:19.893 > Produced: 100,000, TotalMemory: 635,784 bytes
16:54:20.341 > Produced: 200,000, TotalMemory: 1,164,376 bytes
16:54:20.840 > Produced: 300,000, TotalMemory: 2,212,992 bytes
16:54:21.354 > Produced: 400,000, TotalMemory: 2,212,992 bytes
16:54:21.543 > Subscribing...
16:54:21.616 > Emitted: 100,000
16:54:21.624 > Emitted: 200,000
16:54:21.633 > Emitted: 300,000
16:54:21.641 > Emitted: 400,000
16:54:21.895 > Produced: 500,000, TotalMemory: 4,313,344 bytes
16:54:21.897 > Emitted: 500,000
16:54:22.380 > Produced: 600,000, TotalMemory: 6,411,208 bytes
16:54:22.381 > Emitted: 600,000
16:54:22.868 > Produced: 700,000, TotalMemory: 6,411,600 bytes
16:54:22.869 > Emitted: 700,000
16:54:23.375 > Produced: 800,000, TotalMemory: 6,413,400 bytes
16:54:23.376 > Emitted: 800,000
Count: 800,000
The memory usage keeps growing after the subscription. This is expected, because all values are buffered and remain buffered for the whole lifetime of the replayed observable. The desired behavior would be for the memory usage to plummet after the subscription: the buffer should be discarded after propagating the buffered values, because it serves no purpose once the single subscriber is attached. Also, the second subscription (the await observable.Count()) should fail with an InvalidOperationException. I don't want to be able to subscribe again to the observable after it has lost its Replay functionality.
Here is the stub of the custom ReplayOnce operator I am trying to implement. Does anyone have any idea how to implement it?
public static IConnectableObservable<T> ReplayOnce<T>(this IObservable<T> source)
{
    return source.Replay(); // TODO: enforce the subscribe-once policy
}
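One possible direction, offered as an unverified sketch rather than a known answer: a custom IConnectableObservable<T> that buffers Notification<T> objects until the first Subscribe, replays them synchronously, then sets the buffer to null so it can be collected, and from then on forwards live notifications directly. It assumes the System.Reactive package. The type ReplayOnceObservable, the decision to report a second subscription through OnError (instead of throwing from Subscribe), and the decision to silently drop values after the single subscriber disposes are all hypothetical design choices, not Rx behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

public static class ReplayOnceExtensions
{
    // Sketch: buffers until the first subscription, then never buffers again.
    public static IConnectableObservable<T> ReplayOnce<T>(this IObservable<T> source)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        return new ReplayOnceObservable<T>(source);
    }

    // Hypothetical helper type, not part of Rx.
    private sealed class ReplayOnceObservable<T> : IConnectableObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly object _gate = new object();
        // Buffered notifications; null once the single subscriber has been served.
        private List<Notification<T>> _buffer = new List<Notification<T>>();
        private IObserver<T> _observer; // the single live subscriber, if any
        private IDisposable _connection;

        public ReplayOnceObservable(IObservable<T> source) => _source = source;

        public IDisposable Connect()
        {
            var connection = new SingleAssignmentDisposable();
            lock (_gate)
            {
                // Connecting more than once returns the existing connection.
                if (_connection != null) return _connection;
                _connection = connection;
            }
            connection.Disposable = _source.Subscribe(Observer.Create<T>(
                value => OnNotification(Notification.CreateOnNext(value)),
                error => OnNotification(Notification.CreateOnError<T>(error)),
                () => OnNotification(Notification.CreateOnCompleted<T>())));
            return connection;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            bool rejected;
            lock (_gate)
            {
                rejected = _buffer == null;
                if (!rejected)
                {
                    // Replay while holding the lock, so live notifications from the
                    // source wait and keep their order. An index loop also replays
                    // items appended re-entrantly on this thread during the replay.
                    for (int i = 0; i < _buffer.Count; i++) _buffer[i].Accept(observer);
                    _buffer = null; // stop buffering; the old list becomes garbage
                    _observer = observer;
                }
            }
            if (rejected)
            {
                observer.OnError(new InvalidOperationException(
                    "ReplayOnce supports only a single subscription."));
                return Disposable.Empty;
            }
            return Disposable.Create(() =>
            {
                lock (_gate) { if (_observer == observer) _observer = null; }
            });
        }

        private void OnNotification(Notification<T> notification)
        {
            IObserver<T> observer;
            lock (_gate)
            {
                if (_buffer != null) { _buffer.Add(notification); return; }
                observer = _observer;
            }
            // Not buffering anymore: forward, or drop if the subscriber has left.
            if (observer != null) notification.Accept(observer);
        }
    }
}
```

The main trade-off is that the subscriber's callbacks run under the lock during the replay: this keeps the buffered and live notifications in order, but a subscriber that blocks during the replay also blocks the thread that emits source values. In the demo above, replacing .Replay() with .ReplayOnce() (keeping .AutoConnect(0)) should make the second await observable.Count() fail with an InvalidOperationException; the TotalMemory figures would depend on the GC and are not predicted here.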
Btw, there is a related question here about how to make a Replay operator with a buffer that can be emptied occasionally, on demand. My question is different in that I want the buffer to be completely disabled after the subscription, and never start growing again.