28

How can I clear the buffer on a ReplaySubject?

Periodically I need to clear the buffer (as an end of day event in my case) to prevent the ReplaySubject continually growing and eventually eating all the memory.

Ideally I want to keep the same ReplaySubject as the client subscriptions are still good.

Theodor Zoulias
CodingHero

3 Answers

25

ReplaySubject doesn't offer a way to clear its buffer, but it has several constructor overloads that bound the buffer in different ways:

  • A maximum TimeSpan that items are retained for
  • A maximum item count
  • A combination of the above, which drops items as soon as either condition is met.
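As a rough sketch of those three constructor overloads (the class name, method name, buffer sizes and durations below are arbitrary choices for illustration, not part of the original answer):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class BoundedReplayExample
{
    // Returns what a late subscriber receives from a count-bounded subject.
    public static List<int> LateSubscriberSees()
    {
        // Retain at most the 3 most recent items.
        var byCount = new ReplaySubject<int>(3);
        for (var i = 1; i <= 5; i++)
        {
            byCount.OnNext(i);
        }

        var seen = new List<int>();
        using (byCount.Subscribe(x => seen.Add(x)))
        {
            // The replay happens during Subscribe, so 'seen' is already populated.
        }
        return seen; // 3, 4, 5
    }

    public static void OtherOverloads()
    {
        // Retain items for at most 24 hours.
        var byTime = new ReplaySubject<int>(TimeSpan.FromHours(24));

        // Retain at most 1000 items, each for at most 24 hours.
        var byBoth = new ReplaySubject<int>(1000, TimeSpan.FromHours(24));
    }
}
```

If a rolling time window is acceptable for the end-of-day case, the TimeSpan overload may be enough on its own and no explicit clear is needed.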

A Clearable ReplaySubject

This was quite an interesting problem - I decided to see how easy it would be to implement a variation of ReplaySubject you can clear - using existing subjects and operators (as these are quite robust). Turns out it was reasonably straightforward.

I've run this through a memory profiler to check it does the right thing. Call Clear() to flush the buffer, otherwise it works just like a regular unbounded ReplaySubject:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class RollingReplaySubject<T> : ISubject<T>
{
    private readonly ReplaySubject<IObservable<T>> _subjects;
    private readonly IObservable<T> _concatenatedSubjects;
    private ISubject<T> _currentSubject;

    public RollingReplaySubject()
    {
        _subjects = new ReplaySubject<IObservable<T>>(1);
        _concatenatedSubjects = _subjects.Concat();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void Clear()
    {
        _currentSubject.OnCompleted();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void OnNext(T value)
    {
        _currentSubject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _currentSubject.OnError(error);
    }

    public void OnCompleted()
    {
        _currentSubject.OnCompleted();
        _subjects.OnCompleted();
        // a quick way to make the current ReplaySubject unreachable
        // except to in-flight observers, and not hold up collection
        _currentSubject = new Subject<T>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _concatenatedSubjects.Subscribe(observer);
    }
}

Respect the usual rules (as with any Subject) and don't call methods on this class concurrently, including Clear(). If you need concurrent access, adding synchronization locks is trivial.
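A minimal sketch of that locking, assuming the RollingReplaySubject<T> class above is in the same project. The wrapper name SynchronizedRollingReplaySubject and the _gate field are hypothetical, and this is one possible approach rather than the version in the Gist mentioned later:

```csharp
using System;
using System.Reactive.Subjects;

// Hypothetical wrapper: serializes producer-side calls with a single lock.
public sealed class SynchronizedRollingReplaySubject<T> : ISubject<T>
{
    private readonly object _gate = new object();
    private readonly RollingReplaySubject<T> _inner = new RollingReplaySubject<T>();

    // Clear() swaps the current inner subject, so it must not interleave
    // with OnNext/OnError/OnCompleted; all four take the same lock.
    public void Clear() { lock (_gate) { _inner.Clear(); } }

    public void OnNext(T value) { lock (_gate) { _inner.OnNext(value); } }

    public void OnError(Exception error) { lock (_gate) { _inner.OnError(error); } }

    public void OnCompleted() { lock (_gate) { _inner.OnCompleted(); } }

    // Assumption: subscription is left to the underlying subjects' own
    // internal synchronization, so it is not taken under _gate here.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _inner.Subscribe(observer);
    }
}
```

Note that observers are invoked while the lock is held, so a slow observer will block other producers and Clear() for the duration of its callback.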

It works by nesting a sequence of ReplaySubjects inside a master ReplaySubject. The outer ReplaySubject (_subjects) holds a buffer of exactly one inner ReplaySubject (_currentSubject), and it is populated on construction.

The OnXXX methods call through to the _currentSubject ReplaySubject.

Observers are subscribed to a concatenated projection of the nested ReplaySubjects (held in _concatenatedSubjects). Because the buffer size of _subjects is just 1, new subscribers acquire the events of only the most recent ReplaySubject onwards.

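Whenever we need to "clear the buffer", the existing _currentSubject is completed (via OnCompleted), and a new ReplaySubject is pushed into _subjects and becomes the new _currentSubject. Existing subscribers move on to the new inner subject through the Concat, so their subscriptions stay alive.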
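To make the behaviour described above concrete, here is a small usage sketch. It assumes the RollingReplaySubject<T> class above is compiled into the same project; RollingReplayDemo and its Run method are illustrative names only:

```csharp
using System;
using System.Collections.Generic;

public static class RollingReplayDemo
{
    // Returns what an early subscriber and a post-Clear subscriber each observe.
    public static (List<int> Early, List<int> Late) Run()
    {
        var subject = new RollingReplaySubject<int>();
        var early = new List<int>();
        var late = new List<int>();

        subject.OnNext(1);
        subject.OnNext(2);

        // Subscribing before Clear(): the buffered 1 and 2 are replayed.
        using (subject.Subscribe(x => early.Add(x)))
        {
            subject.Clear();   // drops the buffer; 'early' stays subscribed
            subject.OnNext(3);

            // Subscribing after Clear(): only values since the clear are replayed.
            using (subject.Subscribe(x => late.Add(x)))
            {
                subject.OnNext(4);
            }
        }

        return (early, late); // early: 1, 2, 3, 4   late: 3, 4
    }
}
```

The early subscriber never notices the clear, while the late subscriber starts from the first value published after it.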

Enhancements

Following @Brandon's suggestion, I created a version of RollingReplaySubject that uses either a TimeSpan or an input stream to signal buffer clearing. I created a Gist for this here: https://gist.github.com/james-world/c46f09f32e2d4f338b07

James World
  • @CodingHero I've added a clearable `ReplayBuffer` implementation. – James World Mar 09 '15 at 18:13
  • Pretty cool. I'd give you a 2nd upvote if I could. That's the sort of solution I was thinking one would do, only I was thinking its constructor would consume an observable signal for clearing the buffer, similar to the `Buffer` and `Window` family of operators. Very easy to turn that into a new `Replay` overload that cleared the buffer on signal instead of time/size. – Brandon Mar 10 '15 at 16:52
  • Cheers! Good idea about refining to include a reactive signal for clearance. @DaveSexton wrote a good blog post about that as a general observation: http://davesexton.com/blog/post/Reactive-Reactive-Read-all-about-it.aspx – James World Mar 10 '15 at 18:01
  • I think you have too much time on your hands :) Very nice. If you declare the constructor to take `IObservable` for the signal you can completely eliminate the `TBufferClearing` generic parameter (which serves no real purpose) as well as the `Unit` stuff. I suspect the `On*` methods need to be guarded by `gate` also, otherwise there's a race condition if the clear signal fires and is clearing just as `On*` is called... notifications can get lost. And it should probably implement `IDisposable` since it holds a subscription to the buffer clearing. – Brandon Mar 11 '15 at 12:36
  • @Brandon I updated the gates while you were writing that.... lol. Thought about IObservable but was worried about boxing/unboxing cost? (Suspect that's why Buffer etc. have the same generic parameter). Will add the IDisposable. – James World Mar 11 '15 at 12:40
  • @Brandon - I did scrub the `Unit` projections too... just left them as unconverted `long`s – James World Mar 11 '15 at 13:17
  • Thank you @JamesWorld for the great answer!! While porting this to Java I realized that `_subjects = new ReplaySubject<IObservable<T>>(1);` DOES NOT translate to `_subjects = ReplaySubject.create(1);` but `_subjects = ReplaySubject.createWithSize(1);` .. took me quite some time :D – Philipp Feb 20 '16 at 12:33
0

It is likely that you already have an Observable source of data, in which case here is another solution. It composes existing Rx constructs rather than building your own ISubject, which I'm personally wary of.

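using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class ClearableReplaySubject<TSource, TClearTrigger> : IConnectableObservable<TSource>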
{
    private readonly IConnectableObservable<IObservable<TSource>> _underlying;
    private readonly SerialDisposable _replayConnectDisposable = new SerialDisposable();

    public ClearableReplaySubject(IObservable<TSource> src, IObservable<TClearTrigger> clearTrigger)
    {
        _underlying = clearTrigger.Select(_ => Unit.Default).StartWith(Unit.Default)
            .Select(_ =>
            {
                var underlyingReplay = src.Replay();
                _replayConnectDisposable.Disposable = underlyingReplay.Connect();
                return underlyingReplay;
            })
            .Replay(1);
    }

    public IDisposable Subscribe(IObserver<TSource> observer)
    {
        return _underlying.Switch().Subscribe(observer);
    }

    public IDisposable Connect()
    {
        return new CompositeDisposable(_underlying.Connect(), _replayConnectDisposable.Disposable);
    }
}

If you add the following extension method to your ObservableEx:

public static class ObservableEx
{
    public static IConnectableObservable<TItem> ReplayWithReset<TItem, TReset>(this IObservable<TItem> src, IObservable<TReset> resetTrigger)
    {
        return new ClearableReplaySubject<TItem, TReset>(src, resetTrigger);
    }
}

then you can take your source and add .ReplayWithReset(...) with your reset trigger Observable. This could be a timer or whatever.

var replay = sourceObservable.ReplayWithReset(triggerObservable);
var connection = replay.Connect();

Connect() behaves the same way as it would on a regular Replay().

Lee Oades
  • I've a load of unit tests here too. I'll upload the whole thing to BitBucket and expand it to include the option of a scheduler. – Lee Oades Mar 31 '15 at 09:09
  • Thanks Angshuman. I've had a quick look. I would suggest using the answer on this page as it looks better than that much older solution. – Lee Oades May 19 '16 at 12:29
0

I don't know about C#, but I managed to do this with ReplaySubject in RxDart. RxDart's ReplaySubject uses a Queue to cache events, so I modified the ReplaySubject class:

  1. I changed all the queues to List.
  2. I added an onRemove method, which removes an event from the cached list.

Original ReplaySubject :

class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final Queue<T> _queue;
final int _maxSize;

/// Constructs a [ReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ReplaySubject({
    int maxSize,
    void Function() onListen,
    void Function() onCancel,
    bool sync = false,
}) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
        onListen: onListen,
        onCancel: onCancel,
        sync: sync,
    );

    final queue = Queue<T>();

    return ReplaySubject<T>._(
        controller,
        Rx.defer<T>(
                () => controller.stream.startWithMany(queue.toList(growable: false)),
            reusable: true,
        ),
        queue,
        maxSize,
    );
}

ReplaySubject._(
    StreamController<T> controller,
    Stream<T> stream,
    this._queue,
    this._maxSize,
    ) : super(controller, stream);

@override
void onAdd(T event) {
    if (_queue.length == _maxSize) {
        _queue.removeFirst();
    }

    _queue.add(event);
}

@override
List<T> get values => _queue.toList(growable: false);
}

Modified Replay subject :

class ModifiedReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final List<T> _list;
final int _maxSize;

/// Constructs a [ModifiedReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ModifiedReplaySubject({
    int maxSize,
    void Function() onListen,
    void Function() onCancel,
    bool sync = false,
}) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
        onListen: onListen,
        onCancel: onCancel,
        sync: sync,
    );

    final queue = List<T>();

    return ModifiedReplaySubject<T>._(
        controller,
        Rx.defer<T>(
                () => controller.stream.startWithMany(queue.toList(growable: false)),
            reusable: true,
        ),
        queue,
        maxSize,
    );
}

ModifiedReplaySubject._(
    StreamController<T> controller,
    Stream<T> stream,
    this._list,
    this._maxSize,
    ) : super(controller, stream);

@override
void onAdd(T event) {
    if (_list.length == _maxSize) {
        _list.removeAt(0);
    }

    _list.add(event);
}

void onRemove(T event) {
    _list.remove(event);
}

@override
List<T> get values => _list.toList(growable: false);
}
Mukul Pathak