The Reactive Extensions library (Rx) seems like a good candidate for solving this problem. It is based on two interfaces, IObservable<T> (the publisher) and IObserver<T> (the subscriber), and offers many implementations and combinators of these interfaces. For example, the class ReplaySubject<T> implements both of these interfaces, and it has a constructor that accepts a TimeSpan window parameter, which defines the maximum time length of the replay buffer.
Below you can see how to instantiate a ReplaySubject<T> with a 30-second retention period, how to enqueue an item in it, how to subscribe a consumer that will receive notifications for all items currently in the subject as well as all items enqueued in the future, and how to end a subscription to stop receiving notifications:
using System.Reactive.Subjects;
//...
var subject = new ReplaySubject<Item>(TimeSpan.FromSeconds(30));
//...
subject.OnNext(new Item());
//...
var subscription = subject.Subscribe((Item x) => Console.WriteLine($"Received {x}"));
//...
subscription.Dispose();
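To make the effect of the window concrete, here is a minimal sketch that uses a much shorter window than the 30 seconds above. The class name ReplayWindowDemo, the 200 ms window and the int payloads are assumptions made for illustration only. It relies on the time-based ReplaySubject<T> trimming its buffer when a new item arrives and when an observer subscribes, so a late subscriber should see only the item that is still inside the window, followed by live items.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Threading;

public static class ReplayWindowDemo
{
    // Returns the values observed by a subscriber that arrives late.
    public static List<int> Run()
    {
        // Illustrative short window, so that the demo finishes quickly.
        var subject = new ReplaySubject<int>(TimeSpan.FromMilliseconds(200));

        subject.OnNext(1);
        Thread.Sleep(500);  // Let the first item age past the window.
        subject.OnNext(2);  // Still fresh when the subscriber arrives.

        var received = new List<int>();
        using (subject.Subscribe(x => received.Add(x)))
        {
            subject.OnNext(3);  // Live item, delivered while subscribed.
        }
        subject.OnNext(4);      // Not delivered: the subscription was disposed.

        return received;
    }
}
```

Calling ReplayWindowDemo.Run() is expected to return the items 2 and 3: item 1 has aged out of the buffer, and item 4 arrives after the subscription has ended.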
As for the "expired TTL" functionality, it seems tricky because AFAIK Rx has no built-in component that offers it out of the box. I thought about implementing this as a custom ISubject<T>, based on a structure of nested subjects: ReplaySubject<ReplaySubject<T>>. The inner subjects would contain a single value, and would be instantiated with a window equal to the TTL expiration period of the specific item. The consumer would subscribe to this structure after flattening it with the Merge operator. This idea originated from this question: How can I clear the buffer on a ReplaySubject? Here is an implementation:
using System;
using System.Reactive.Linq;     // Merge
using System.Reactive.Subjects; // ISubject<T>, ReplaySubject<T>

/// <summary>
/// Represents an object that is both an observable sequence as well as an observer.
/// Each notification is broadcasted to all subscribed and future observers, subject
/// to buffer trimming and notification expiration policies.
/// </summary>
public class ExpirableReplaySubject<T> : ISubject<T>
{
    private readonly TimeSpan _window;
    private readonly Func<T, TimeSpan> _expireAfterSelector;
    private readonly ReplaySubject<ISubject<T>> _replaySubject;
    private readonly IObservable<T> _replaySubjectMerged;

    public ExpirableReplaySubject(TimeSpan window,
        Func<T, TimeSpan> expireAfterSelector)
    {
        _window = window;
        _expireAfterSelector = expireAfterSelector;
        _replaySubject = new ReplaySubject<ISubject<T>>(window);
        _replaySubjectMerged = _replaySubject.Merge();
    }

    public void OnNext(T value)
    {
        var expireAfter = _expireAfterSelector(value);
        // An item cannot outlive the retention window of the outer subject
        if (expireAfter > _window) expireAfter = _window;
        var inner = new ReplaySubject<T>(1, expireAfter); inner.OnNext(value);
        _replaySubject.OnNext(inner);
    }

    public void OnCompleted()
    {
        // All subjects, inner and outer, must be completed
        _replaySubject.OnCompleted();
        _replaySubject.Subscribe(subject => subject.OnCompleted());
    }

    public void OnError(Exception error)
    {
        // Faulting the outer subject is enough
        _replaySubject.OnError(error);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _replaySubjectMerged.Subscribe(observer);
    }
}
The window parameter configures the retention timespan of the subject, and the expireAfterSelector is a selector that runs for each new item received and returns the expiration period of that specific item.
The problem with this approach is that when the inner subjects are evicted from the outer _replaySubject according to the window policy, they are not completed (their OnCompleted method is not invoked). Without being completed, the inner subjects are never unsubscribed from the _replaySubjectMerged, resulting in a permanent memory leak. So a bit more work is required. Instead of using ReplaySubject<T>s with a bufferSize equal to 1 as inner subjects, we can implement a lightweight ISubject<T> containing a single expirable value:
// Requires: using System.Threading; (Timer, Timeout)
//           using System.Reactive.Disposables; (Disposable.Empty)
private class ExpirableValueSubject<T> : ISubject<T>
{
    private T _value;
    private Timer _timer; // Becomes null when the subject is completed

    public ExpirableValueSubject(T value, TimeSpan expireAfter)
    {
        if (expireAfter <= TimeSpan.Zero) return; // Expired upon creation
        _value = value;
        _timer = new Timer(arg => ((ExpirableValueSubject<T>)arg).OnCompleted(),
            this, expireAfter, Timeout.InfiniteTimeSpan);
    }

    public void OnNext(T value) => throw new InvalidOperationException();
    public void OnError(Exception error) => throw new InvalidOperationException();

    public void OnCompleted()
    {
        lock (this) { _timer?.Dispose(); _timer = null; _value = default; }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (this)
        {
            // Emit the value only if it has not expired yet
            if (_timer != null) observer.OnNext(_value);
            observer.OnCompleted();
        }
        return Disposable.Empty;
    }
}
Upon subscription, this custom subject emits its value (if it has not expired yet), followed immediately by an OnCompleted notification. Its OnCompleted method has been hijacked for releasing the _value and disposing the _timer.
This class uses this as a locker object, which is not advisable in general. In this case the class is intended as an internal component, and references to its instances will not leak to the external world, so locking on this should be OK.
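For comparison, the generally recommended alternative is to lock on a dedicated private object that no outside code can reach. The sketch below shows that pattern on a hypothetical ExpirableBox<T> class, which is not part of the solution and omits the timer; the names ExpirableBox, Expire and TryGet are assumptions made for illustration.

```csharp
using System;

// Hypothetical stand-alone holder, used only to illustrate the locking pattern.
public sealed class ExpirableBox<T>
{
    // Private lock object: external code cannot lock on it by accident.
    private readonly object _gate = new object();
    private T _value;
    private bool _hasValue;

    public ExpirableBox(T value)
    {
        _value = value;
        _hasValue = true;
    }

    // Releases the stored value, so that it can be garbage collected.
    public void Expire()
    {
        lock (_gate) { _value = default; _hasValue = false; }
    }

    // Returns false if the value has already been released.
    public bool TryGet(out T value)
    {
        lock (_gate) { value = _value; return _hasValue; }
    }
}
```

The cost is one extra object per instance, which is arguably why a component that creates one instance per item might prefer locking on this.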
To complete our solution, all you have to do is replace this line:
var inner = new ReplaySubject<T>(1, expireAfter); inner.OnNext(value);
...with this:
var inner = new ExpirableValueSubject<T>(value, expireAfter);
With this change, the ExpirableReplaySubject<T> class should work as expected, without leaking memory. It is thread-safe of course, as all ISubject<T> implementations intended for general usage should be.
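The leak that this replacement avoids can be observed with a small check. The sketch below is an illustration under assumptions, not part of the solution: the class name EvictionLeakDemo, the 100 ms window and the int payloads are made up, and it relies on the HasObservers property of ReplaySubject<T> to tell whether the merged sequence is still subscribed to an inner subject that has already been evicted from the outer buffer.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class EvictionLeakDemo
{
    // Returns true if the merged subscription still holds on to an inner
    // subject after the outer ReplaySubject has evicted it.
    public static bool Run()
    {
        var outer = new ReplaySubject<ISubject<int>>(TimeSpan.FromMilliseconds(100));

        using (outer.Merge().Subscribe(_ => { }))
        {
            var inner = new ReplaySubject<int>(1);
            inner.OnNext(42);
            outer.OnNext(inner); // Merge subscribes to the inner subject here.

            Thread.Sleep(300);   // The inner subject ages past the outer window.

            // A new item triggers trimming of the outer buffer.
            outer.OnNext(new ReplaySubject<int>(1));

            // Nobody called inner.OnCompleted(), so Merge never unsubscribed.
            return inner.HasObservers;
        }
    }
}
```

EvictionLeakDemo.Run() is expected to return true, which is the retained subscription described earlier. The ExpirableValueSubject<T> avoids it by completing every subscriber at subscription time.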
Usage example:
var subject = new ExpirableReplaySubject<Item>(TimeSpan.FromSeconds(30),
item => item.TTL);
Note: The original implementation (Revision 1) used an encapsulated BehaviorSubject<T> as the inner subject. This approach was problematic because a BehaviorSubject<T> releases its internal value only when it is disposed, not when it is completed. And after it is disposed it becomes unusable (it throws ObjectDisposedExceptions).