1

What's an elegant way of composing an Rx observable which would resemble ReplaySubject, but only emit the accumulated sequence once and for the first subscriber only (when that subscriber is connected)? After the 1st subscription, it should act just as a regular Subject.

This for a .NET project, but I'd equally appreciate JavaScript/RxJS answers.

I did google for potential solutions, and I'm about to roll out my own, similar to how I approached DistinctSubject, eventually.

noseratio
  • 59,932
  • 34
  • 208
  • 486
  • What's the use-case here? – Enigmativity Sep 30 '21 at 02:06
  • @Enigmativity, I have a case where I buffer some events, and multiple consumers can be racing for the first batch, as they start observing. So it's like ReplaySubject, but replaying only to the one which wins the race. That's probably the best way I could describe it. I think I could solve it with bare TPL, but I'm interested in Rx solution from the learning perspective as well. – noseratio Sep 30 '21 at 02:38
  • 1
    I have asked something similar a few months ago: [How to make a lightweight `Replay` operator that can be subscribed only once?](https://stackoverflow.com/questions/65494884/how-to-make-a-lightweight-replay-operator-that-can-be-subscribed-only-once) – Theodor Zoulias Sep 30 '21 at 06:55
  • @TheodorZoulias, tks, makes me think, maybe this scenario isn't so exotic :) In my case, I also need to make sure other subscribers will be receiving the fresh new events, but not before the 1st subscribed has observed all of the initial buffer (and then no more buffering). I'll get to it, I have a TPL solution in place so I'm not in a hurry. – noseratio Sep 30 '21 at 08:10

1 Answers1

1

I modified slightly the implementation found in a similar question, and changed the name of the class from ReplayOnceSubject to ReplayFirstSubscriberOnlySubject:

public class ReplayFirstSubscriberOnlySubject<T> : ISubject<T>
{
    private readonly object _locker = new object();
    private ISubject<T> _subject = new ReplaySubject<T>();

    public void OnNext(T value) { lock (_locker) _subject.OnNext(value); }
    public void OnError(Exception error) { lock (_locker) _subject.OnError(error); }
    public void OnCompleted() { lock (_locker) _subject.OnCompleted(); }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));
        lock (_locker)
        {
            if (_subject is ReplaySubject<T> replaySubject)
            {
                var subject = new Subject<T>();
                var subscription = subject.Subscribe(observer);
                // Now replay the buffered notifications
                replaySubject.Subscribe(subject).Dispose();
                replaySubject.Dispose();
                _subject = subject;
                return subscription;
            }
            else
                return _subject.Subscribe(observer);
        }
    }
}

This is probably not the most efficient solution, since two different locks are acquired on every operation (the _locker and the internal _gate), but it shouldn't be very bad either.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • 1
    It's good! The only thing is, the lock will block event producers (those calling `OnNext`), while the buffer is played back to the 1st subscribers. Then again, that's how [`ReplaySubject` behaves](https://github.com/dotnet/reactive/blob/1ff45fbb832cf722dee875a83886dca25a923be3/Rx.NET/Source/src/System.Reactive/Subjects/ReplaySubject.cs#L350) as well. Thanks! – noseratio Sep 30 '21 at 11:04
  • 1
    @noseratio when I implemented the `ReplayOnceSubject` I started with a lock-less implementation (`Interlocked.CompareExchange`). But things weren't going smooth, I don't remember the reason, and so I finally settled for the trusty `lock`. – Theodor Zoulias Sep 30 '21 at 11:34
  • 1
    BTW do you know using `lock { ... }` can lead to nasty reentrancy issues on STA threads (WinForms, WPF etc). I used to have a mental picture that `Monitor.Enter/Exit` are wrappers for Win32 Critical Sections, but it turns they aren't: https://github.com/mono/SkiaSharp/issues/1383#issuecomment-927750884 – noseratio Sep 30 '21 at 12:03
  • 1
    @noseratio I was not aware about the specific STA nuances, but I am very aware that if I `Cancel` a `CancellationTokenSource` or `TrySetResult` a `TaskCompletionSource` inside a `lock` protected region, it's likely that all hell will break loose. – Theodor Zoulias Sep 30 '21 at 12:22
  • 1
    @noseratio btw I implemented hastily the `IDisposable` interface, without checking what happens if the `Subscribe` is invoked on a disposed subject. You may want to take a look at it, if you are planning to put this class in production. – Theodor Zoulias Sep 30 '21 at 12:28
  • 1
    @noseratio I just remembered what was the problem with omitting the lock. Without locking there was no guarantee that the first subscriber will receive all the buffered notifications before receiving any future notification. Which was quite a big problem. – Theodor Zoulias Sep 30 '21 at 15:16
  • 1
    Yep exactly, and we also don't want new subscribers to receive anything before that. So while playback is happening, the producer of new events is blocked. That might be not too exciting but that's how the original `ReplaySubject` works too. – noseratio Sep 30 '21 at 20:43
  • 1
    Hi @noseratio. To keep the answer simple, I removed the half-baked implementation of the `IDisposable` interface. Also I've uploaded [here](https://gist.github.com/theodorzoulias/06d39354ab211d27ecdc65f91ec943b7) a `ReplayLimitedSubscriptionsSubject` custom subject that discards the replay buffer after a specified number of subscriptions, instead of after the first subscription. It's not thoroughly tested. I wrote it with the intention to solve another problem, but the solution didn't go well for other reasons (probably). – Theodor Zoulias Oct 01 '21 at 08:51
  • Theo, a bit off topic, I wonder if with your Rx background you ever you stumbled upon a proper .NET implementation of what is [`throttle` in RxJS](https://rxjs.dev/api/operators/throttle)? There's a bit of terminology confusion here, I believe .NET `Throttle` is what is [`debounce`](https://rxjs.dev/api/operators/debounce) in RxJS and other implementations, while actual throttle operator is missing in Rx .NET (the closest to it is probably `Sample`). – noseratio Oct 02 '21 at 04:27
  • 1
    @noseratio I think we've discussed this [here](https://stackoverflow.com/questions/21400514/how-to-throttle-the-speed-of-an-event-without-using-rx-framework/65553763#65553763). I haven't learned anything new about the throttle/debounce family of operators since then. :-) – Theodor Zoulias Oct 02 '21 at 05:24
  • 1
    OMG, I had a dejavu feeling but didn't even bother to search ‍♂️Thank you! Pretty sure I'll ask one more time when I actually need it, so I apologies in advance – noseratio Oct 02 '21 at 05:30