Well I don't know about c# but I managed to get it done in replay subject rxdart .
As for replaysubject it uses queue for caching the events so I
modified the replaysubject class.
- I changed all the queues
to List
- Added onRemove method which will remove event from
the chached list.
Original ReplaySubject :
class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final Queue<T> _queue;
final int _maxSize;
/// Constructs a [ReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ReplaySubject({
int maxSize,
void Function() onListen,
void Function() onCancel,
bool sync = false,
}) {
// ignore: close_sinks
final controller = StreamController<T>.broadcast(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
final queue = Queue<T>();
return ReplaySubject<T>._(
controller,
Rx.defer<T>(
() => controller.stream.startWithMany(queue.toList(growable: false)),
reusable: true,
),
queue,
maxSize,
);
}
ReplaySubject._(
StreamController<T> controller,
Stream<T> stream,
this._queue,
this._maxSize,
) : super(controller, stream);
@override
void onAdd(T event) {
if (_queue.length == _maxSize) {
_queue.removeFirst();
}
_queue.add(event);
}
@override
List<T> get values => _queue.toList(growable: false);
}
Modified Replay subject :
class ModifiedReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final List<T> _list;
final int _maxSize;
/// Constructs a [ModifiedReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ModifiedReplaySubject({
int maxSize,
void Function() onListen,
void Function() onCancel,
bool sync = false,
}) {
// ignore: close_sinks
final controller = StreamController<T>.broadcast(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
final queue = List<T>();
return ModifiedReplaySubject<T>._(
controller,
Rx.defer<T>(
() => controller.stream.startWithMany(queue.toList(growable: false)),
reusable: true,
),
queue,
maxSize,
);
}
ModifiedReplaySubject._(
StreamController<T> controller,
Stream<T> stream,
this._list,
this._maxSize,
) : super(controller, stream);
@override
void onAdd(T event) {
if (_list.length == _maxSize) {
_list.removeAt(0);
}
_list.add(event);
}
void onRemove(T event) {
_list.remove(event);
}
@override
List<T> get values => _list.toList(growable: false);
}