I slightly modified the implementation found in a similar question, and renamed the class from `ReplayOnceSubject` to `ReplayFirstSubscriberOnlySubject`:
/// <summary>
/// A subject that buffers notifications until the first observer subscribes,
/// replays that buffer to the first observer, and afterwards forwards
/// notifications through a plain <see cref="Subject{T}"/>.
/// </summary>
/// <remarks>
/// Every member takes the same private lock, so the hand-over from the
/// buffering subject to the live subject cannot interleave with a notification.
/// </remarks>
/// <typeparam name="T">The type of the elements flowing through the subject.</typeparam>
public class ReplayFirstSubscriberOnlySubject<T> : ISubject<T>
{
    private readonly object _locker = new object();

    // Starts out as a ReplaySubject<T>; replaced by a Subject<T> on the first Subscribe call.
    private ISubject<T> _subject = new ReplaySubject<T>();

    /// <summary>Forwards an element to the subject that is currently active.</summary>
    /// <param name="value">The element to publish.</param>
    public void OnNext(T value)
    {
        lock (_locker)
        {
            _subject.OnNext(value);
        }
    }

    /// <summary>Forwards an error notification to the subject that is currently active.</summary>
    /// <param name="error">The exception to publish.</param>
    public void OnError(Exception error)
    {
        lock (_locker)
        {
            _subject.OnError(error);
        }
    }

    /// <summary>Forwards a completion notification to the subject that is currently active.</summary>
    public void OnCompleted()
    {
        lock (_locker)
        {
            _subject.OnCompleted();
        }
    }

    /// <summary>
    /// Subscribes an observer. The first call also replays the buffered
    /// notifications to the observer; later calls subscribe directly to the
    /// live subject.
    /// </summary>
    /// <param name="observer">The observer to subscribe.</param>
    /// <returns>A handle that ends the observer's subscription when disposed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer is null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        lock (_locker)
        {
            var buffered = _subject as ReplaySubject<T>;
            if (buffered == null)
            {
                // The hand-over already happened: subscribe to the live subject.
                return _subject.Subscribe(observer);
            }

            // First subscriber: attach it to the live subject before replaying,
            // so the buffered notifications pass through the live subject to it.
            var live = new Subject<T>();
            var firstSubscription = live.Subscribe(observer);

            // Pump the buffer through the live subject, then detach immediately.
            IDisposable replay = buffered.Subscribe(live);
            replay.Dispose();

            // The buffering subject is no longer needed once it has been replayed.
            buffered.Dispose();
            _subject = live;
            return firstSubscription;
        }
    }
}
This is probably not the most efficient solution, since two different `lock`s are acquired on every operation (the `_locker` and the internal `_gate`), but it shouldn't be very bad either.