
I'd like to reduce the number of items that go into a log file:

[Image: marble diagram of the requested operator]

I don't need all the duplicate items, but I want to keep the last item before each change. The items are numbers and are going to be plotted. I need the last item before the change to preserve the shape of the waveform; otherwise it will end up as a triangular wave instead of a square wave.

HenningNT
  • I don't think that it is possible to produce the requested marble diagram. To produce it exactly as shown, the operator would need advance knowledge that the third (A) is the last (A), so that it can emit it immediately. To make the operator feasible, you should probably postpone the emission of the last (A) until the next different element, the first (B), is produced. – Theodor Zoulias May 17 '21 at 06:11
  • As @TheodorZoulias already mentioned, the closest thing you could get would be something [like this](https://rx-marbles-online.herokuapp.com/?marblescode=marble+foo_example%0D%0A%7B%0D%0A++++source+a%3A+++++%2B--A-A-A-B-B-B-A-A-A-%7C%0D%0A++++operator+foo%3A+%2B--A-----AB----BA----A%7C%0D%0A%7D%0D%0A&scale=75&theme=default). – Oliver May 17 '21 at 06:19
  • I see, that would be acceptable. I'll update the drawing... – HenningNT May 17 '21 at 06:34
  • I just found out about CombineLatest, it looks quite promising! – HenningNT May 17 '21 at 06:39
  • Just found this interesting [blog post](https://www.zerobugbuild.com/?p=213) that goes back to [this SO answer](https://stackoverflow.com/q/2820685/1838048). So it seems this question is a duplicate. – Oliver May 17 '21 at 07:42
  • HenningNT is it possible to include in the marble diagram the case of an element that is different from both its previous and next elements? Currently it is unclear what should be the behavior of the requested operator in this case. – Theodor Zoulias May 17 '21 at 08:17
  • @TheodorZoulias: Reopened the question. While I didn't ask the question, I would say, that your question can already be answered with the given information. IMHO the marble diagram would in this case look [like this](https://rx-marbles-online.herokuapp.com/?marblescode=marble+foo_example%0D%0A%7B%0D%0A++++source+a%3A+++++%2B-A--A--A--B--C--B--A--A--A-%7C%0D%0A++++operator+foo%3A+%2B-A--------AB-BC-CB-BA----A-%7C%0D%0A%7D%0D%0A&scale=75&theme=default). So whenever current and last differ, emit both. – Oliver May 17 '21 at 09:00
  • Also IMHO this operator should take either a `Func` or `IEqualityComparer` to provide an individual comparison (like an ID property in the given object or case insensitive comparisons, etc.). – Oliver May 17 '21 at 09:01
  • @Oliver yep, the built-in Rx [`DistinctUntilChanged`](https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh229045(v=vs.103)) operator has overloads that accept a comparer, a selector, or both. Regarding the behavior of single elements, you are probably right, but I would like confirmation from the OP before attempting to answer the question. – Theodor Zoulias May 17 '21 at 09:35

3 Answers


My approach would be similar to @Oliver's comment above: 'So whenever current and last differ, emit both'.

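// Requires the System.Reactive, Microsoft.Reactive.Testing and NUnit packages.
using System;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

// The members below are assumed to live inside a test class.
private static IObservable<T> NullableDistinctWithChange<T>(IObservable<T> source)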
    where T : struct
{
    return source
        .Scan(
            (Value: default(T?), Result: Array.Empty<T>()),
            (last, current) => last.Value switch
            {
                null => (Value: current, Result: new T[] { current }),
                T x when x.Equals(current) => (Value: current, Result: Array.Empty<T>()),
                T x => (Value: current, Result: new[] { x, current })
            })
        .SelectMany(tuple => tuple.Result);
}

private static IObservable<T> DistinctWithChange<T>(IObservable<T> source)
    where T : class
{
    return source
        .Scan(
            (Value: default(T), Result: Array.Empty<T>()),
            (last, current) => last.Value switch
            {
                null => (Value: current, Result: new T[] { current }),
                T x when x.Equals(current) => (Value: current, Result: Array.Empty<T>()),
                T x => (Value: current, Result: new[] { x, current })
            })
        .SelectMany(tuple => tuple.Result);
}


private static readonly Recorded<Notification<string>>[] Xs = new Recorded<Notification<string>>[]
{
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(5).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(15).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(65).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(95).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(155).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(185).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks, Notification.CreateOnNext("C")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(215).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(225).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(235).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(250).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(255).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(260).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(265).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(270).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(280).Ticks, Notification.CreateOnCompleted<string>())
};

private static readonly Recorded<Notification<string>>[] Expected = new Recorded<Notification<string>>[]
{
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(5).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("C")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("C")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(280).Ticks + ReactiveTest.Subscribed, Notification.CreateOnCompleted<string>())
};

[Test]
public void ShouldPerformDistinctWithChange()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateColdObservable(Xs);

    var observed = scheduler.Start(() => DistinctWithChange(xs), TimeSpan.FromSeconds(300).Ticks);

    Assert.That(observed.Messages, Is.EqualTo(Expected));
}

I included a NullableDistinctWithChange version for value types, since the OP mentioned that the items are numbers.

Both functions could be unified and improved with an Option<T> type but I didn't want to overcomplicate the answer.

Furthermore, it would be trivial to avoid emitting a single-item run twice (by introducing a count value into the Scan tuple), and to use an IEqualityComparer to decide when a value has changed.
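As a rough illustration of the last two remarks, here is a minimal sketch of a unified variant. It is not part of the tested code above: the class name `ObservableExtensions`, the extension-method form, the `HasValue` flag (standing in for an Option<T> type) and the `Count` field are all assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    // Hypothetical unified variant; works for both value and reference types.
    public static IObservable<T> DistinctWithChange<T>(
        this IObservable<T> source,
        IEqualityComparer<T> comparer = null)
    {
        // Fall back to the default comparer when none is supplied.
        comparer ??= EqualityComparer<T>.Default;

        return source
            .Scan(
                // HasValue replaces the null check; Count is the length of the current run.
                (HasValue: false, Value: default(T), Count: 0, Result: Array.Empty<T>()),
                (last, current) =>
                {
                    // First element: emit it and start a run of length 1.
                    if (!last.HasValue)
                        return (true, current, 1, new[] { current });

                    // Same as the previous element: extend the run, emit nothing.
                    if (comparer.Equals(last.Value, current))
                        return (true, current, last.Count + 1, Array.Empty<T>());

                    // Change detected: re-emit the previous value only if its run
                    // had more than one element, then emit the new value.
                    var result = last.Count > 1
                        ? new[] { last.Value, current }
                        : new[] { current };
                    return (true, current, 1, result);
                })
            .SelectMany(state => state.Result);
    }
}
```

Under these assumptions, a call such as `source.DistinctWithChange(StringComparer.OrdinalIgnoreCase)` would treat "a" and "A" as the same value, and the input A, A, A, B, C, B would come out as A, A, B, C, B. Like the versions above, this sketch does not emit the final element of the last run when the source completes.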

Dharman
ibebbs

Here's a query that does what you want:

IObservable<string> query =
    subject
        .Publish(ss =>
            Observable
                .Concat(
                    ss.Take(1),
                    ss
                        .DistinctUntilChanged()
                        .Publish(dss => dss.Zip(dss.Skip(1), (m, n) => (m, n)))
                        .SelectMany(z => new[] { z.m, z.n })));

The Publish operators ensure that there's only one subscription to the original source.

With this test:

subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnCompleted();

I get these values:

A 
A 
B 
B 
A 
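
To illustrate the remark about Publish keeping a single subscription, here is a small sketch that counts subscriptions to a cold source with and without Publish. The `PublishDemo` class, the `subscriptions` counter and the `Observable.Range` source are made up for this example and are not part of the query above.

```csharp
using System;
using System.Reactive.Linq;

public static class PublishDemo
{
    public static void Main()
    {
        var subscriptions = 0;

        // A cold source that counts how many times it is subscribed to.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        // Without Publish: Zip subscribes to each of its two inputs separately.
        source.Zip(source.Skip(1), (m, n) => (m, n)).Subscribe(_ => { });
        Console.WriteLine($"Without Publish: {subscriptions} subscription(s)");

        subscriptions = 0;

        // With Publish: both inputs of Zip share one underlying subscription.
        source
            .Publish(shared => shared.Zip(shared.Skip(1), (m, n) => (m, n)))
            .Subscribe(_ => { });
        Console.WriteLine($"With Publish: {subscriptions} subscription(s)");
    }
}
```

One would expect the first count to be 2 and the second to be 1. With a Subject as the source the difference is less visible, but for a cold or side-effecting source the shared subscription matters.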
Enigmativity

I've found a solution that meets the (rather limited) requirements:

    var subject = new Subject<string>();
    var distinct = subject.DistinctUntilChanged();

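    // Selector is not shown here; a selector returning its first argument,
    // e.g. (d, s) => d, would produce the output below (assumption).
    var combinedLatesDistinct = Observable.CombineLatest(distinct, subject, Selector).DistinctUntilChanged();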

    Observable.Merge(distinct, combinedLatesDistinct).Subscribe(i => Console.WriteLine($"Result: {i}"));

    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("C");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnCompleted();

    Console.ReadLine();

The result is:

Result: A
Result: A
Result: B
Result: B
Result: C
Result: C
Result: B
Result: B
Result: A
Result: A

Items occurring only once will be duplicated, but this is OK for my use.

HenningNT