Is there a Subject implementation in Rx.NET that functionally resembles BehaviorSubject but emits the next value only if it has changed?

I'm rather new to Reactive Extensions and I can't seem to find anything like that, although this pattern feels like a natural replacement for INotifyPropertyChanged.

My naive implementation is to encapsulate BehaviorSubject<T> like below. Are there any disadvantages to this, compared to creating a composable observable with Observable.DistinctUntilChanged?

    using System;
    using System.Collections.Generic;
    using System.Reactive.Subjects;

    public class DistinctSubject<T> : SubjectBase<T>
    {
        private readonly BehaviorSubject<T> _subject;

        public DistinctSubject(T initialValue) =>
            _subject = new BehaviorSubject<T>(initialValue);

        public T Value 
        { 
            get => _subject.Value;
            set => this.OnNext(value);
        }

        public override bool HasObservers => _subject.HasObservers;

        public override bool IsDisposed => _subject.IsDisposed;

        public override void Dispose() => _subject.Dispose(); 

        public override void OnCompleted() => _subject.OnCompleted();   

        public override void OnError(Exception error) => _subject.OnError(error);

        public override void OnNext(T value)
        {
            if (!EqualityComparer<T>.Default.Equals(value, _subject.Value))
            {
                _subject.OnNext(value);
            }
        }

        public override IDisposable Subscribe(IObserver<T> observer) =>
            _subject.Subscribe(observer);
    }
noseratio
  • Why do you want this? – Shlomo Sep 18 '21 at 23:39
  • @Shlomo, to replace POCO properties + `INotifyPropertyChanged` with something like `DistinctSubject`. – noseratio Sep 19 '21 at 00:06
  • Don't implement your own `IObservable`, `IObserver`, or `ISubject` objects. You are likely to get them wrong and cause yourself grief. You never want to expose the observer part of a subject to the outside world. That's why there's an `AsObservable()` operator. If you let external code call `OnCompleted` your observable dies. – Enigmativity Sep 19 '21 at 00:20
  • Won't `subject.DistinctUntilChanged()` work for you? – Enigmativity Sep 19 '21 at 00:23
  • @Enigmativity - tks, I'm not trying to implement those, rather just forwarding to `BehaviorSubject`. I think `DistinctUntilChanged` would work, but I'd need to wrap it with something like @TheodorZoulias has [suggested](https://stackoverflow.com/a/69239301/1768303). I'm replacing normal properties with observables and I need a reusable class for that. – noseratio Sep 19 '21 at 00:36
  • @Enigmativity, from [the thread](https://stackoverflow.com/a/14406666/1768303) you pointed me to: *"Subjects are the stateful components of Rx. They are useful for when you need to create an event-like observable as a field or a local variable"*. I think this is exactly what I'm trying to do here, a basic observable store for my app's main state. In a way, it's similar to what React folks do with [Recoil's atoms](https://recoiljs.org/docs/basic-tutorial/atoms/). – noseratio Sep 19 '21 at 00:44
  • @noseratio - They are useful for that, but just because they are useful for something doesn't negate the warning about not using them. You need to be cognisant of what the pitfalls are. – Enigmativity Sep 19 '21 at 00:48
  • @noseratio - Theo's implementation is pretty good, but it should also implement `IDisposable`, since its internal field does too. – Enigmativity Sep 19 '21 at 00:54
  • @Enigmativity when professionals use dangerous tools consciously for their intended purpose, reminding them that the dangerous tools are dangerous becomes a bit of a redundancy IMHO. These warnings serve a purpose when kids play with the knife of the Chef, not when the Chef uses the knife themselves! – Theodor Zoulias Sep 22 '21 at 07:50
  • @TheodorZoulias - When professionals use tools new to them they need to be informed. – Enigmativity Sep 22 '21 at 08:10
  • @noseratio - Please don't edit questions to show answers. Add your own answer. – Enigmativity Sep 30 '21 at 01:49
  • @Enigmativity, un-done. Maybe you could help me with [this one](https://stackoverflow.com/q/69385181/1768303) - tks. – noseratio Sep 30 '21 at 01:55
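
The advice in the comments above (keep the observer side of the subject private and hand out only an observable) might be sketched as follows for the "replace properties with observables" use case. The names `ObservableProperty<T>` and `Changes` are hypothetical and not part of Rx.NET; this is a minimal sketch under those assumptions, not a definitive implementation.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical wrapper: the BehaviorSubject stays private, so external
// code cannot call OnCompleted or OnError and terminate the sequence.
public sealed class ObservableProperty<T> : IDisposable
{
    private readonly BehaviorSubject<T> _subject;

    public ObservableProperty(T initialValue)
    {
        _subject = new BehaviorSubject<T>(initialValue);
        // DistinctUntilChanged already returns a new observable; AsObservable
        // is kept only to make the "hide the subject" intent explicit.
        Changes = _subject.DistinctUntilChanged().AsObservable();
    }

    // Read-only stream of distinct consecutive values, starting with the current one.
    public IObservable<T> Changes { get; }

    public T Value
    {
        get => _subject.Value;
        set => _subject.OnNext(value);
    }

    public void Dispose() => _subject.Dispose();
}
```

Subscribers see the current value on subscription and then only values that differ from the previous one; they have no way to push values or complete the sequence themselves.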

1 Answer

After glancing a bit at the source code of the BehaviorSubject<T> class, it seems that your DistinctSubject<T> implementation will behave differently in case an OnError is followed by an OnNext:

    var subject = new DistinctSubject<int>(2021);
    subject.OnError(new ApplicationException());
    subject.OnNext(2022); // throws ApplicationException

This will throw, while doing the same with the BehaviorSubject<T> will not throw (the OnNext is just ignored).
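
For comparison, here is a minimal sketch of the same sequence of calls against a plain `BehaviorSubject<int>`. The helper class and method names are hypothetical; the sketch assumes, based on a reading of the Rx.NET source, that `OnNext` on a stopped subject returns without touching the stored error, whereas the question's wrapper reads `_subject.Value` first, and that getter appears to rethrow the error.

```csharp
using System;
using System.Reactive.Subjects;

public static class BehaviorSubjectAfterError // hypothetical helper
{
    // Returns true when OnNext after OnError completes without throwing.
    public static bool OnNextAfterErrorIsIgnored()
    {
        var subject = new BehaviorSubject<int>(2021);
        subject.OnError(new ApplicationException());
        try
        {
            subject.OnNext(2022); // the subject is already stopped
            return true;
        }
        catch (ApplicationException)
        {
            return false;
        }
    }
}
```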

My suggestion is to use the DistinctUntilChanged operator in the implementation, like this:

    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public class DistinctSubject<T> : ISubject<T>, IDisposable
    {
        private readonly BehaviorSubject<T> _subject;
        private readonly IObservable<T> _distinctUntilChanged;

        public DistinctSubject(T initialValue, IEqualityComparer<T> comparer = default)
        {
            _subject = new BehaviorSubject<T>(initialValue);
            _distinctUntilChanged = _subject.DistinctUntilChanged(
                comparer ?? EqualityComparer<T>.Default);
        }

        public T Value => _subject.Value;
        public void OnNext(T value) => _subject.OnNext(value);
        public void OnError(Exception error) => _subject.OnError(error);
        public void OnCompleted() => _subject.OnCompleted();

        // Observers subscribe to the filtered sequence, not to the raw subject.
        public IDisposable Subscribe(IObserver<T> observer) =>
            _distinctUntilChanged.Subscribe(observer);

        public void Dispose() => _subject.Dispose();
    }

If you are worried about the needless allocation of an object, then you are not yet familiar with the spirit of Rx. This library is about features and ease of use, not about performance or efficiency!
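
The filtering that the class above delegates to `DistinctUntilChanged` can be seen in isolation with a plain `BehaviorSubject<int>`. This is only an illustrative sketch; the `DistinctUntilChangedDemo` class and its `Run` method are hypothetical names introduced here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DistinctUntilChangedDemo // hypothetical helper
{
    public static List<int> Run()
    {
        var received = new List<int>();
        using var subject = new BehaviorSubject<int>(1);
        using var subscription = subject
            .DistinctUntilChanged()
            .Subscribe(received.Add); // the initial value 1 arrives on subscription

        subject.OnNext(1); // same as the current value, suppressed
        subject.OnNext(2); // changed, emitted
        subject.OnNext(2); // repeated, suppressed
        subject.OnNext(1); // changed back, emitted

        return received;   // contains 1, 2, 1
    }
}
```

Note that only consecutive duplicates are dropped: the final `1` is emitted again because the previous value was `2`.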

Theodor Zoulias
  • Thanks much Theodor! Let me absorb this and maybe run through a debugger :) I'm taking a chance to use Rx for a new project, to implement a central observable state store for WinForms UI views. I find Rx amazingly useful, it allowed me to cut a lot of boilerplate state management code. As it's a desktop app, I'm not worried about extra allocations at all :) I considered [ReactiveUI](https://reactiveui.net/) as well but decided it'd be overkill for this particular project. – noseratio Sep 19 '21 at 00:23
  • One other thing, I think my forwarding implementation has a race condition while comparing old and new values inside `OnNext(T value)`, while `DistinctUntilChanged` probably doesn't. This isn't an issue for me right now because it all takes place on the main thread, but it's another pro point for `DistinctUntilChanged`. – noseratio Sep 19 '21 at 00:50
  • Nice implementation, but one small thing - since `BehaviorSubject` implements `IDisposable` then so too should this class. – Enigmativity Sep 19 '21 at 00:53
  • @Enigmativity fixed! – Theodor Zoulias Sep 19 '21 at 01:08
  • @noseratio most Rx operators do not have internal synchronization, and instead they rely on the source observable respecting the [Rx contract](http://reactivex.io/documentation/contract.html). The [`DistinctUntilChanged`](https://github.com/dotnet/reactive/blob/main/Rx.NET/Source/src/System.Reactive/Linq/Observable/DistinctUntilChanged.cs) is one of these simple operators that don't need to use locks. So I don't think that your implementation is any different regarding thread-safety than the `DistinctUntilChanged`-based implementation. – Theodor Zoulias Sep 19 '21 at 01:27
  • @noseratio on closer inspection, yes, there is a race condition in the question's implementation. If the `OnNext` is invoked concurrently on two different threads at the same time, with the same value, it is possible that this value will be emitted twice by the `_subject` to its observers, and the "distinct" guarantee will be violated. With the `DistinctUntilChanged`-based implementation, the `_subject` will send the identical values to the `DistinctUntilChanged` operator one at a time (using a `lock`), so the duplicate value will be emitted only once. – Theodor Zoulias Sep 19 '21 at 01:44
  • @Enigmativity in retrospect the `BehaviorSubject` does not use the `IDisposable` for its intended purpose (releasing unmanaged resources), so disposing it is not mandatory IMHO. The designers of the `BehaviorSubject` class decided that forcefully unsubscribing all subscribers and making the class unusable was a must-have functionality, and so critical that it should be done in a `finally` block. So they implemented the `IDisposable` to enable the `using` statement. Based on my small experience with Rx, that was probably a questionable decision. I've never disposed any subject so far! – Theodor Zoulias Sep 19 '21 at 08:49
  • @TheodorZoulias, I went ahead and implemented it based on the current Rx's `BehaviorSubject` source, just to mimic its behavior as closely as possible (including thread safety). FWIW, [gist](https://gist.github.com/noseratio/79268e55fb441d92ec28cffb2bbe72b8). I need to add tests, but I just plugged it in as a replacement for the code in my q, and it works well so far. – noseratio Sep 19 '21 at 09:18
  • @noseratio ha ha! I was sure that you would *think* about doing it, but actually doing it is borderline insane! You really want to avoid this pesky allocation. – Theodor Zoulias Sep 19 '21 at 09:20
  • @TheodorZoulias - The whole reactive framework doesn't use `IDisposable` for its intended purpose, but it does shut down the observable pipeline which could have unmanaged resources. So, at the end of the day, disposing subjects could well be important. You should always dispose all disposables - there are very few exceptions. – Enigmativity Sep 19 '21 at 09:56
  • Yeah I thought to myself, I'm gonna replace most of the app's state properties with observables, what do I have to lose anyway! But then again, it helps to understand the edge cases, like subscribing to a subject that has already stopped or ended with an error. – noseratio Sep 19 '21 at 10:26
  • @Enigmativity the more that the `IDisposable` is used outside of its intended purpose (which is to release *unmanaged* resources), the more exceptions to the rule "always dispose your disposables" are created. And that's why the `IObservable.Subscribe` method should not hijack the `IDisposable`: to avoid dulling the rule. It could return an `ISubscription` with an `Unsubscribe` method instead. That said, in [this](https://stackoverflow.com/questions/2431488/when-should-i-implement-idisposable) question the `IDisposable` is suggested as a way to cleanup event subscriptions. Lots of opinions. – Theodor Zoulias Sep 19 '21 at 10:38
  • @TheodorZoulias IMHO these rules have become much more relaxed these days. Lots of folks (me included) use `IDisposable` for scoping with `using`, much like the `lock` pattern in C#. Often it's baked into .NET itself, e.g. [`TransactionScope`](https://learn.microsoft.com/en-us/dotnet/api/system.transactions.transactionscope?view=net-5.0). – noseratio Sep 19 '21 at 11:13
  • @noseratio ha ha! A rule that has been overly relaxed, is not a rule any more. It may have become "best practice", "guideline", "recommendation", "suggestion", or may have just evaporated into nothingness. – Theodor Zoulias Sep 19 '21 at 11:27
  • Btw let's not forget that Rx itself breaks the rule about disposing the disposables, by [leaving undisposed all the `CancellationTokenSource`s](https://stackoverflow.com/questions/66382339/does-the-rx-library-omits-disposing-of-the-cancellationtokensources-it-creates) it creates internally! – Theodor Zoulias Sep 19 '21 at 11:47
  • @noseratio since you went to the trouble of implementing the `DistinctSubject` from scratch, you may as well remove all the thread-synchronization features (the `_lock` field), to avoid slowing down your single-threaded application. Having to take the `lock` on each and every operation is pure overhead, when a single thread does all the work. – Theodor Zoulias Sep 19 '21 at 12:09
  • I might indeed, especially given that it'd take a lot more effort to cover all these potential multi-threaded race conditions with unit tests than to code the API itself :) But I also recall that `lock` is really cheap (and reentrant), as long as it's the same thread all along: https://stackoverflow.com/q/4673618/1768303 – noseratio Sep 19 '21 at 12:58
  • @Enigmativity the funny thing is that not even Rx itself disposes the subjects it creates. You can see in the [source code](https://github.com/dotnet/reactive/blob/1ff45fbb832cf722dee875a83886dca25a923be3/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs#L40) of the `Publish` operator, that a `new BehaviorSubject` is created and passed as an `ISubject` to the internal [`ConnectableObservable`](https://github.com/dotnet/reactive/blob/main/Rx.NET/Source/src/System.Reactive/Subjects/ConnectableObservable.cs) class, where it's not disposed. – Theodor Zoulias Oct 01 '21 at 08:30
  • @TheodorZoulias - Sounds like a bug to me. – Enigmativity Oct 01 '21 at 09:46
  • @Enigmativity maybe the devs didn't know when it is safe to dispose it. I am guessing that the rationale was: *if they* (us) *want to dispose their subjects, let them create the subject manually, use the `Multicast` operator instead of the `Publish`/`PublishLast`/`Replay`, and dispose it when they are done*. – Theodor Zoulias Oct 01 '21 at 12:29