My approach would be similar to @Oliver's comment above: 'So whenever current and last differ, emit both'.
/// <summary>
/// Suppresses consecutive duplicates and, whenever an element differs from its
/// predecessor, emits both the previous element and the new one.
/// </summary>
/// <typeparam name="T">Value type of the elements.</typeparam>
/// <param name="source">Sequence to watch for changes.</param>
/// <returns>
/// A sequence that yields the first element once, nothing for an element equal to
/// its predecessor, and the pair (previous, current) for an element that differs.
/// </returns>
private static IObservable<T> NullableDistinctWithChange<T>(IObservable<T> source)
    where T : struct
{
    // EqualityComparer<T>.Default dispatches to IEquatable<T> when T implements it,
    // which avoids boxing both operands through Equals(object) on every element.
    var comparer = System.Collections.Generic.EqualityComparer<T>.Default;

    return source
        .Scan(
            // A null Value means that no element has been observed yet.
            (Value: default(T?), Result: Array.Empty<T>()),
            (last, current) =>
            {
                if (!last.Value.HasValue)
                {
                    // First element: emit it on its own.
                    return (Value: current, Result: new[] { current });
                }

                var previous = last.Value.Value;
                if (comparer.Equals(previous, current))
                {
                    // Unchanged: remember the element but emit nothing.
                    return (Value: current, Result: Array.Empty<T>());
                }

                // Changed: emit the old value followed by the new one.
                return (Value: current, Result: new[] { previous, current });
            })
        // Flatten the per-element result arrays into the output sequence.
        .SelectMany(state => state.Result);
}
/// <summary>
/// Suppresses consecutive duplicates and, whenever an element differs from its
/// predecessor, emits both the previous element and the new one.
/// </summary>
/// <typeparam name="T">Reference type of the elements.</typeparam>
/// <param name="source">Sequence to watch for changes.</param>
/// <returns>
/// A sequence that yields the first element once, nothing for an element equal to
/// its predecessor, and the pair (previous, current) for an element that differs.
/// </returns>
private static IObservable<T> DistinctWithChange<T>(IObservable<T> source)
    where T : class
{
    return source
        .Scan(
            // HasValue tracks whether an element has been seen. Using a null Value
            // as the sentinel would confuse "nothing seen yet" with a null element.
            (HasValue: false, Value: default(T), Result: Array.Empty<T>()),
            (last, current) =>
            {
                if (!last.HasValue)
                {
                    // First element: emit it on its own.
                    return (HasValue: true, Value: current, Result: new[] { current });
                }

                // object.Equals is null-safe on both sides.
                if (object.Equals(last.Value, current))
                {
                    // Unchanged: remember the element but emit nothing.
                    return (HasValue: true, Value: current, Result: Array.Empty<T>());
                }

                // Changed: emit the old value followed by the new one.
                return (HasValue: true, Value: current, Result: new[] { last.Value, current });
            })
        // Flatten the per-element result arrays into the output sequence.
        .SelectMany(state => state.Result);
}
// Input notifications: a run of "A", a run of "B", a single "C", another run of "B",
// a final run of "A", and completion at 280 seconds.
private static readonly Recorded<Notification<string>>[] Xs =
{
    ReactiveTest.OnNext(TimeSpan.FromSeconds(5).Ticks, "A"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(15).Ticks, "A"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(65).Ticks, "A"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(95).Ticks, "A"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(125).Ticks, "B"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(155).Ticks, "B"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(185).Ticks, "B"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(195).Ticks, "C"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(205).Ticks, "B"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(215).Ticks, "B"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(225).Ticks, "B"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(235).Ticks, "B"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(245).Ticks, "A"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(250).Ticks, "A"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(255).Ticks, "A"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(260).Ticks, "A"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(265).Ticks, "A"),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(270).Ticks, "A"),
    ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(280).Ticks)
};
// Expected output: the first "A" alone, then an (old, new) pair at every change.
// Every timestamp is offset by ReactiveTest.Subscribed.
private static readonly Recorded<Notification<string>>[] Expected =
{
    ReactiveTest.OnNext(ReactiveTest.Subscribed + TimeSpan.FromSeconds(5).Ticks, "A"),
    ReactiveTest.OnNext(ReactiveTest.Subscribed + TimeSpan.FromSeconds(125).Ticks, "A"),
    ReactiveTest.OnNext(ReactiveTest.Subscribed + TimeSpan.FromSeconds(125).Ticks, "B"),
    ReactiveTest.OnNext(ReactiveTest.Subscribed + TimeSpan.FromSeconds(195).Ticks, "B"),
    ReactiveTest.OnNext(ReactiveTest.Subscribed + TimeSpan.FromSeconds(195).Ticks, "C"),
    ReactiveTest.OnNext(ReactiveTest.Subscribed + TimeSpan.FromSeconds(205).Ticks, "C"),
    ReactiveTest.OnNext(ReactiveTest.Subscribed + TimeSpan.FromSeconds(205).Ticks, "B"),
    ReactiveTest.OnNext(ReactiveTest.Subscribed + TimeSpan.FromSeconds(245).Ticks, "B"),
    ReactiveTest.OnNext(ReactiveTest.Subscribed + TimeSpan.FromSeconds(245).Ticks, "A"),
    ReactiveTest.OnCompleted<string>(ReactiveTest.Subscribed + TimeSpan.FromSeconds(280).Ticks)
};
/// <summary>
/// Replays <c>Xs</c> as a cold observable through <c>DistinctWithChange</c> on a
/// test scheduler and checks the recorded messages against <c>Expected</c>.
/// </summary>
[Test]
public void ShouldPerformDistinctWithChange()
{
    // Arrange
    var testScheduler = new TestScheduler();
    var coldSource = testScheduler.CreateColdObservable(Xs);
    long disposeAt = TimeSpan.FromSeconds(300).Ticks;

    // Act
    var observer = testScheduler.Start(() => DistinctWithChange(coldSource), disposeAt);

    // Assert
    Assert.That(observer.Messages, Is.EqualTo(Expected));
}
I included a NullableDistinctWithChange version because the OP suggested that this solution should also work with numbers.
Both functions could be unified and improved with an Option<T>
type but I didn't want to overcomplicate the answer.
Furthermore, it would be trivial to prevent single-item duplicates (by introducing a count value into the Scan tuple), and to use an IEqualityComparer to delegate the decision of whether a value has changed.