I need an Rx.NET observable that accumulates items while there are no active subscribers, and emits the whole accumulated sequence (and any future items) to new subscribers as soon as there are any.

It differs from ReplaySubject in that it doesn't keep items that have already been replayed to a subscriber. Once a queued item has been observed by the current subscribers, it is removed from the queue and won't be seen by any future subscribers.

Can something like that be composed using standard Rx.NET operators?

I need it to tackle race conditions in the following scenario. There is a looping async workflow, RunWorkflowAsync, which needs to perform the ResetAsync task whenever it observes a specific ResetRequestedEvent message.

Here's the whole thing as a .NET 6 console app:

using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Channels;

try
{
    await TestAsync();
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
}

async Task TestAsync()
{
    var resetRequestsSubject = new Subject<ResetRequestedEvent>();

    using var cts = new CancellationTokenSource(20000);
    await Task.WhenAll(
        SimulateResetRequests(cts.Token), 
        RunWorkflowAsync(resetRequestsSubject, cts.Token));

    // simulate emitting reset requests
    async Task SimulateResetRequests(CancellationToken cancelToken)
    {
        async Task Raise(int n, int delay)
        {
            var ev = new ResetRequestedEvent(n);
            Console.WriteLine($"{ev} issued");
            resetRequestsSubject!.OnNext(ev);
            await Task.Delay(delay, cancelToken);
        }

        await Raise(1, 50);
        await Raise(2, 50);
        await Raise(3, 50);
        await Raise(4, 1000);
        await Raise(5, 5000);
        await Raise(6, 4000);
        await Raise(7, 3000);
        resetRequestsSubject.OnCompleted();
    }

    // simulate the reset task
    async Task ResetAsync(CancellationToken cancelToken)
    {
        await Task.Delay(1000, cancelToken);
        Console.WriteLine("Reset done");
    }

    // simulate the work task
    async Task DoWorkAsync(CancellationToken cancelToken)
    {
        await Task.Delay(2000, cancelToken);
        Console.WriteLine("Work done");
    }


    // do reset, then work in a loop until cancelled
    async Task RunWorkflowAsync(IObservable<ResetRequestedEvent> resetRequests, CancellationToken externalCancelToken)
    {
        // from this point, make sure reset requests don't go unobserved
        var accumulatedResetRequests = resetRequests.Accumulate(externalCancelToken);
        using var auto1 = accumulatedResetRequests.Connect();

        while (true)
        {
            externalCancelToken.ThrowIfCancellationRequested(); // stops the whole workflow

            using var internalCts = CancellationTokenSource.CreateLinkedTokenSource(externalCancelToken);
            var internalCancelToken = internalCts.Token;

            // signal cancellation upon the most recent reset request
            using var auto2 = accumulatedResetRequests
                .Do(ev => Console.WriteLine($"{ev} seen"))
                .Throttle(TimeSpan.FromMilliseconds(100))
                .Do(ev => Console.WriteLine($"{ev} acted upon"))
                .Subscribe(_ => internalCts.Cancel());

            try
            {
                // start with a reset
                await ResetAsync(internalCancelToken);
                
                // do work until another reset is requested
                while (true)
                {
                    await DoWorkAsync(internalCancelToken);
                }
            }
            catch (OperationCanceledException)
            {
            }
        }
    }
}

record ResetRequestedEvent(int Number);

public static class RxExt
{
    class CumulativeObservable<T> : IConnectableObservable<T>
    {
        readonly IObservable<T> _source;
        readonly Channel<T> _channel;
        readonly CancellationToken _cancelToken;

        public CumulativeObservable(IObservable<T> source, CancellationToken cancellationToken)
        {
            _source = source;
            _channel = Channel.CreateUnbounded<T>();
            _cancelToken = cancellationToken;
        }

        public IDisposable Connect() =>
            _source.Subscribe(
                onNext: item => _channel.Writer.TryWrite(item),
                onError: ex => _channel.Writer.Complete(ex),
                onCompleted: () => _channel.Writer.Complete());

        public IDisposable Subscribe(IObserver<T> observer) =>
            _channel.Reader.ReadAllAsync(_cancelToken).ToObservable().Subscribe(observer);
    }
    public static IConnectableObservable<T> Accumulate<T>(
        this IObservable<T> @this, 
        CancellationToken cancellationToken) => 
        new CumulativeObservable<T>(@this, cancellationToken);
}

The idea is to stop all pending tasks inside RunWorkflowAsync and perform ResetAsync whenever a ResetRequestedEvent message arrives.

I realize there's more than one way to cook an egg (and to implement RunWorkflowAsync), but I like this approach because I don't need to think about thread safety when I use and recycle the internalCts cancellation token source (to stop all pending tasks before the next iteration).

Above, CumulativeObservable does what I want, but it's a very naive implementation which only supports one concurrent observer (unlike, say, ReplaySubject) and lacks any safety checks.
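
To make the intended multi-subscriber semantics concrete, here is a minimal sketch that uses only the BCL `IObservable<T>`/`IObserver<T>` interfaces. The names `BufferWhileUnobservedSubject` and `CollectingObserver` are hypothetical and exist only for this illustration. It is a hand-rolled helper, not a composition of standard Rx.NET operators, and it assumes that calling observers while holding a lock is acceptable:

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not part of Rx.NET): queues items while there are no
// observers and flushes the queue to the first observer that subscribes.
public sealed class BufferWhileUnobservedSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly object _gate = new();
    private readonly List<IObserver<T>> _observers = new();
    private readonly Queue<T> _pending = new();
    private bool _done;
    private Exception? _error;

    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (_done) return;
            // Nobody is listening: keep the item for the next subscriber.
            if (_observers.Count == 0) { _pending.Enqueue(value); return; }
            // Iterate over a copy so an observer may unsubscribe from its own callback.
            foreach (var o in _observers.ToArray()) o.OnNext(value);
        }
    }

    public void OnError(Exception error) => Terminate(error);
    public void OnCompleted() => Terminate(null);

    private void Terminate(Exception? error)
    {
        lock (_gate)
        {
            if (_done) return;
            _done = true;
            _error = error;
            foreach (var o in _observers.ToArray()) Signal(o);
            _observers.Clear();
        }
    }

    private void Signal(IObserver<T> o)
    {
        if (_error != null) o.OnError(_error); else o.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate)
        {
            // Each queued item is delivered once and then forgotten.
            while (_pending.Count > 0) observer.OnNext(_pending.Dequeue());
            if (_done) { Signal(observer); return new Unsubscriber(() => { }); }
            _observers.Add(observer);
        }
        return new Unsubscriber(() => { lock (_gate) _observers.Remove(observer); });
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action? _dispose;
        public Unsubscriber(Action dispose) => _dispose = dispose;
        // Runs the unsubscribe action at most once.
        public void Dispose() => Interlocked.Exchange(ref _dispose, null)?.Invoke();
    }
}

// Hypothetical observer that records what it receives, for demonstration only.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Items { get; } = new();
    public bool Completed { get; private set; }
    public void OnNext(T value) => Items.Add(value);
    public void OnError(Exception error) => Completed = true;
    public void OnCompleted() => Completed = true;
}
```

With this sketch, items pushed before the first `Subscribe` go to that first observer only, a second concurrent observer sees only later items, and queuing resumes once every observer has been disposed.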

I'd prefer a composition that can be built using standard operators.

noseratio
  • https://stackoverflow.com/a/24805489/161739 for ideas perhaps. However, I would never go as far as implementing IObservable; standard operators with synchronized Subjects that obey the Rx contracts can model every case out there. – Apostolis Bekiaris May 23 '22 at 03:49
  • Values that have never been observed are emitted by this operator: https://stackoverflow.com/questions/60483181/observe-values-not-seen-in-other-observers – Apostolis Bekiaris May 23 '22 at 03:52
  • *"Thus, once a queued item has been observed by the current subscribers, it gets removed from the queue and won't be seen by any new future subscribers."* -- If I understand correctly, the queue will only be used when there are zero subscribers. As soon as the first subscriber subscribes, it will consume the whole queue immediately, and after that moment the queue could be discarded. It should be instantiated once again when the last subscriber unsubscribes and the number of subscribers drops to zero again. Is my understanding correct? – Theodor Zoulias May 23 '22 at 12:10
  • @TheodorZoulias, correct! The (empty) queue can be discarded until there are zero subscribers again. Then the queue becomes relevant again. In the code snippet I got it working like that for a single subscriber (using a channel). I imagine it'd be a lot more involved for a more general scenario where multiple subscribers can connect and disconnect dynamically :) – noseratio May 23 '22 at 12:53
  • I see. What you want is probably quite close to a `ReplayOnce` operator that I have posted [here](https://stackoverflow.com/questions/65494884/how-to-make-a-lightweight-replay-operator-that-can-be-subscribed-only-once "How to make a lightweight Replay operator that can be subscribed only once?"). Have we discussed that in the past? Somehow this topic sounds familiar to me! – Theodor Zoulias May 23 '22 at 13:40
  • @TheodorZoulias, thanks, I think I came across your `ReplayOnce` earlier today. Does it also support only one concurrent subscriber? I'll take a closer look, thanks again. I think in the past we had a discussion about a prototype of `DistinctSubject`; it turned out to be pretty useful! :) – noseratio May 23 '22 at 13:48
  • I found my déjà vu! :-) [An Rx observable that would act as ReplaySubject but only for the 1st subscriber?](https://stackoverflow.com/questions/69385181/an-rx-observable-that-would-act-as-replaysubject-but-only-for-the-1st-subscriber) – Theodor Zoulias May 23 '22 at 15:57
  • @TheodorZoulias, I think I've got it now: https://stackoverflow.com/a/72390574/1768303. A wasted bounty :-) – noseratio May 26 '22 at 10:48
  • At least you had the opportunity to implement the `IConnectableObservable` interface. Not many people on Earth can rightfully claim this feat in their resume! :-) – Theodor Zoulias May 26 '22 at 11:31
  • Not sure I got it entirely right :) Anyhow, I don't think it would matter on my resume, as the whole Rx tech is IMHO very underrated and under-used (at least in the .NET ecosystem). I hope I'm wrong! :) – noseratio May 26 '22 at 13:10
  • Sadly, AsyncRx.NET (`IAsyncObservable` etc.) hasn't even materialized as a NuGet package: https://github.com/dotnet/reactive/issues/1118#issuecomment-700254325 – noseratio May 26 '22 at 13:13
  • Yeah, Rx is not exactly hot right now. Judging from the activity [on GitHub](https://github.com/dotnet/reactive/issues/), it looks like an abandoned project. No one there has given official feedback to questions from the community for almost a year. As for AsyncRx.NET, it's not clear to me what problem this library is intended to solve. The semantics of Rx are already quite ambiguous, and throwing asynchrony into the mix can only increase this ambiguity. It's not obvious which of the many ways to implement asynchrony is the correct one. – Theodor Zoulias May 26 '22 at 14:48
  • @TheodorZoulias, I think maybe the effort shifted to Reaqtive: https://github.com/reaqtive/reaqtor Also MS Orleans: https://learn.microsoft.com/en-us/dotnet/api/orleans.streams.iasyncobservable-1?view=orleans-3.0 – noseratio May 26 '22 at 21:30
