I feel like I'm reinventing the wheel, so I'd better ask.

GIVEN

  • that I have an IObservable<T> source
  • and a Task LoadAsync<T>(T value, CancellationToken cancellationToken) method

WHEN

  • I use the Select/Switch pattern to call LoadAsync whenever the source emits

    observable
       .Select(value => Observable.FromAsync(cancellationToken => LoadAsync(value, cancellationToken)))
       .Switch()
       .Subscribe(); 
    

THEN

  1. How do I add reload functionality?
  2. How do I report an IsLoading status, i.e. whether LoadAsync is currently running?
  3. How do I cancel LoadAsync when the source completes?

I want to create a reusable method or class that implements the answers to #1 and #2.

I have this so far: https://dotnetfiddle.net/0zPhBE

public class ReactiveLoader<T> : IDisposable
{
    private readonly BehaviorSubject<bool> _isLoading = new(false);
    private readonly Subject<Unit> _completes = new();
    private readonly Subject<T> _reloads = new Subject<T>();
    private readonly IDisposable _subscription;

    public bool IsLoading => _isLoading.Value;
    public IObservable<bool> IsLoadingObservable => _isLoading.Skip(1).DistinctUntilChanged(); //Not nice

    public ReactiveLoader(IObservable<T> observable, Func<T, CancellationToken, Task> handler)
    {           
        _subscription = observable
            .Finally(() => //Not nice
            {
                 _completes.OnNext(Unit.Default);
            })
            .Merge(_reloads)
            .Do(_ => _isLoading.OnNext(true))
            .Select(value => Observable.FromAsync(cancellationToken => handler(value, cancellationToken)))
            .Switch()
            .Do(_ => _isLoading.OnNext(false))
            .TakeUntil(_completes) //cancels loading when observable completes
            .Subscribe();
    }

    public void Reload()
    {
         _reloads.OnNext(??); //needs last value of source
    }

    public void Dispose()
    {
        _completes.OnNext(Unit.Default);
        _subscription.Dispose();
    }
}
Liero
  • What are the shortcomings/problems of the code that you have come up with so far? – Theodor Zoulias Jan 11 '22 at 19:01
  • For example, in order to reload, I need to remember the last value somewhere. Also, the Finally operator seems weird. Isn't there a better way to express "TakeUntil the other observable completes"? – Liero Jan 11 '22 at 19:32
  • If the source emits a new item during the loading of the previous item, the `Switch` operator will cancel the previous `LoadAsync`. Is this your intention? – Theodor Zoulias Jan 11 '22 at 22:45

1 Answer

Here is one approach:

IObservable<bool> sequence = source.Publish(published => published
    .CombineLatest(_reloads, (x, _) => x)
    .Select(x => Observable.FromAsync(ct => LoadAsync(x, ct)).Select(_ => false).Prepend(true))
    .Switch()
    .Do(_isLoading)
    .TakeUntil(published.LastOrDefaultAsync()));

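The CombineLatest operator re-emits the latest source value every time _reloads emits a signal. Note that CombineLatest emits nothing until both inputs have produced at least one value, so _reloads needs an initial signal (for example, by being a BehaviorSubject or by using StartWith) if the first load should happen without an explicit reload.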

The .Select(_ => false).Prepend(true) part converts the inner observable from an IObservable<Unit> to an IObservable<bool> that emits loading-status signals: true when a load starts and false when it finishes.

The TakeUntil(published.LastOrDefaultAsync()) terminates the sequence as soon as the source terminates, without waiting for any pending LoadAsync operation.
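
To show how these pieces might fit into the reusable class from the question, here is a minimal sketch. It is not part of the original answer. It assumes System.Reactive 4.x or later and C# 9, seeds the reload stream with StartWith so that the first source value loads immediately, and swallows errors so that IsLoading always ends on false. Those are illustrative choices, not requirements.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper around the pipeline above (illustrative only).
public sealed class ReactiveLoader<T> : IDisposable
{
    private readonly BehaviorSubject<bool> _isLoading = new(false);
    private readonly Subject<Unit> _reloads = new();
    private readonly IDisposable _subscription;

    // Current loading status followed by subsequent changes.
    public IObservable<bool> IsLoading => _isLoading.DistinctUntilChanged();

    public ReactiveLoader(IObservable<T> source, Func<T, CancellationToken, Task> handler)
    {
        _subscription = source.Publish(published => published
                // Assumption: StartWith seeds the reload stream so that CombineLatest
                // fires on the first source value without waiting for Reload().
                .CombineLatest(_reloads.StartWith(Unit.Default), (x, _) => x)
                // Each inner sequence emits true at the start and false on completion.
                .Select(x => Observable.FromAsync(ct => handler(x, ct))
                    .Select(_ => false)
                    .Prepend(true))
                // Switch unsubscribes from the previous load, which cancels its token.
                .Switch()
                // Stop (and cancel any pending load) when the source terminates.
                .TakeUntil(published.LastOrDefaultAsync()))
            // Assumption: errors are swallowed; a failed load ends the pipeline.
            .Catch((Exception _) => Observable.Empty<bool>())
            // Always finish with false, whatever the reason for termination.
            .Append(false)
            .Subscribe(_isLoading.OnNext);
    }

    // Re-runs the handler for the latest source value, if there is one.
    public void Reload() => _reloads.OnNext(Unit.Default);

    // Disposing the subscription cancels any load that is still running.
    public void Dispose() => _subscription.Dispose();
}
```

A caller would construct it with something like `new ReactiveLoader<int>(source, LoadAsync)`, subscribe to `IsLoading` to drive a spinner, and call `Reload()` from a refresh button. Because of the Catch, an exception thrown by the handler stops all further loading; if loading should continue after a failure, the error handling would have to move inside the inner sequence instead.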

Theodor Zoulias
  • Is the Publish operator there to avoid multiple subscriptions to the source? – Liero Jan 13 '22 at 19:40
  • @Liero yeap, exactly. – Theodor Zoulias Jan 13 '22 at 20:59
  • Instead of `.Do(_isLoading)` I could do `_isLoading = sequence`; right? – Liero Jan 14 '22 at 09:21
  • @Liero yes, the `sequence` is essentially the same as the `_isLoading`. You may want to filter out the propagation of exceptions, or always append a `false` value at the end of the sequence, or both: `.Catch((Exception _) => Observable.Empty<bool>()).Append(false)`. – Theodor Zoulias Jan 14 '22 at 09:28
  • Why not `.Catch((Exception _) => Observable.Return(false));`? – Liero Jan 14 '22 at 09:47
  • @Liero this will append a `false` only in case of an exception, not in case of successful completion. But if the sequence completes successfully, most likely a `false` will have been emitted by the last inner sequence, so it's probably OK. – Theodor Zoulias Jan 14 '22 at 09:51
  • Opinionated question, but do you think this (RX) is the right way to implement this requirement: async loading based on a parameter, with reload functionality, in a UI? – Liero Jan 14 '22 at 10:04
  • @Liero honestly I think that there are multiple tools that can do this job, and all of them can do it sufficiently well, so it's a matter of familiarity with the tools. Personally I would probably prefer to do it with TPL components, with which I am more familiar and comfortable. – Theodor Zoulias Jan 14 '22 at 11:36
  • Even if there was a requirement for something like debounceTime (i.e. Throttle in C#)? Angular did a great job of standardizing RX in Angular projects, so everybody is (or should be) familiar with it. In Blazor, few developers are familiar with RX, so I'm not sure whether I have gone too far :) A plus for RX is that it is cross-platform and cross-language, unlike TPL. – Liero Jan 14 '22 at 12:12
  • @Liero TBH my experience with developing front-end web applications is fairly small, and so is my experience with working in teams, so I am not really qualified to offer advice on this matter. :-) – Theodor Zoulias Jan 14 '22 at 12:20