I have an async LoadAsync method that is triggered by an observable subscription.

I want to prevent concurrent calls of LoadAsync using the following strategy:

I'm almost sure this strategy has a name, but I can't recall it.

  1. LoadAsync won't be called if a previous call has not finished yet.

    • LoadAsync(3) has started only after LoadAsync(1) has finished
  2. LoadAsync is called only for the most recent value

    • LoadAsync(2) has never been called

I currently use this switchMap equivalent, but it cancels the running LoadAsync call in favor of new calls:
IObservable<int> sourceObservable = ...; // definition elided in the original
sourceObservable
   .Select(value => Observable.FromAsync(() => LoadAsync(value)))
   .Switch();

async Task<int> LoadAsync(int value)
{
    await Task.Delay(1000);
    return value;
}

In JavaScript (RxJS) it would be:

source$.pipe(switchMap(load));

function load(id: number): Observable<number> {
    return timer(1000).pipe(map(() => id));
}

EDIT: I wonder why nobody has asked for this flattening strategy yet. I thought it was a pretty common requirement.

This is how CI/CD pipelines often work, for example: if there are too many pushes to a branch and you don't have the capacity to build them all, you still want to ensure that the latest commit gets built.

Liero

2 Answers

In other words, you want to take one child observable from the 2nd-level observable, get all results and wait for that to finish, then repeat.

This will work in C#:

public static IObservable<T> SingleSwitch<T, U>(this IObservable<U> source, Func<U, IObservable<T>> selector)
{
    return source.Publish(_source => _source
        .Take(1)
        .SelectMany(e => selector(e))
        .Repeat()
    );
}

Please note that you have a bug in your Load method: you need to await the Task.Delay call for the delay to take effect.

Sample code:

Observable.Interval(TimeSpan.FromSeconds(.75))
    .SingleSwitch(e => Observable.FromAsync (async () => {
        await Task.Delay(1000);
        return e;
    }))
    .Dump();

The output is the even numbers: each odd value arrives while the load for the preceding even value is still running, so it is dropped.

Shlomo
  • _"get all results"_ - not exactly. Notice that #2 never triggered the Load operation – Liero May 06 '21 at 11:49
  • The `SingleSwitch` looks very similar to the [`ExhaustMap`](https://stackoverflow.com/questions/64353907/how-can-i-implement-an-exhaustmap-handler-in-rx-net) operator. – Theodor Zoulias May 06 '21 at 11:59
  • Get all results from the child observable pulled. – Shlomo May 06 '21 at 12:05
  • @Theodore, that’s exactly what it is – Shlomo May 06 '21 at 12:10
  • I haven't tested it, but from the looks of it the resulting sequence of the `SingleSwitch` operator will be infinite, even if the `source` sequence is not. But I may be wrong. – Theodor Zoulias May 06 '21 at 18:33
  • You're right. It also fails the "cache the last element during dead time" requirement. – Shlomo May 06 '21 at 21:13
  • Apparently the OP stopped providing feedback. I think that the problem of ignoring all elements except the latest one could be solved with the `MergeBounded` operator discussed in [this](https://stackoverflow.com/questions/65477010/how-to-merge-a-nested-observable-iobservableiobservablet-with-limited-concur/ "How to merge a nested observable IObservable<IObservable<T>> with limited concurrency and limited buffer capacity?") question. Replacing the `.Switch()` in the OP's example with `.MergeBounded(1, 1)` should be enough. – Theodor Zoulias May 07 '21 at 02:52
  • @TheodorZoulias: ExhaustMap might ignore the last value. See https://stackblitz.com/edit/typescript-impgfv?file=index.ts – Liero May 07 '21 at 07:58

MergeBounded(1, 1), mentioned by @TheodorZoulias, works, but it seems a little too complex.

I have written a simpler alternative, but I'm still looking for the most appropriate name:

/// <summary>
/// Concatenates most recent inner observable sequence when previous completes.
/// Similar to Concat, but it ignores out of date inner observable sequences.
/// Similar to Exhaust, but it preserves latest inner observable.
/// </summary>
public static IObservable<T> ConcatExhaust<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Defer(() =>
    {
        IObservable<T> latest = default;
        return source
            .Select(inner =>
            {
                latest = inner;
                return Observable.Defer(() => latest == inner ? inner : Observable.Empty<T>());
            })
            .Concat();
    });
}

The following test:

var source = Observable.Interval(TimeSpan.FromMilliseconds(300))
    .Take(5)
    .Do(val => Console.WriteLine($"Source: {val}"));


source.Select(val => Observable.FromAsync(() => LoadAsync(val)))
     .ConcatExhaust()
     .Wait();

returns:

Source: 0
Load Started: 0
Source: 1
Source: 2
Source: 3
Value finished: 0
Load Started: 3
Source: 4
Value finished: 3
Load Started: 4
Value finished: 4
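
The test above calls a LoadAsync that logs when it starts and when it finishes, but that version is not shown in the answer. A minimal sketch of what it might look like, assuming the 1000 ms delay from the question and the log strings from the output above. The `Loader` class name is a placeholder, and the parameter is `long` because `Observable.Interval` produces `long` values:

```csharp
using System;
using System.Threading.Tasks;

public static class Loader
{
    // Hypothetical logging variant of the question's LoadAsync.
    // Assumptions: the 1000 ms delay comes from the question, and the
    // log messages are copied from the test output.
    public static async Task<long> LoadAsync(long value)
    {
        Console.WriteLine($"Load Started: {value}");
        await Task.Delay(1000); // simulated work
        Console.WriteLine($"Value finished: {value}");
        return value;
    }
}
```

To call it unqualified as the test does, it could live in the same class as the test, or be brought into scope with `using static Loader;`.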
marc_s
Liero
  • This looks like a good solution, provided that the inner sequences are unique objects. Also, I am not sure whether race conditions exist regarding the `latest` variable that could result in subscribing twice to the same inner sequence. It depends on the implementation of the `Concat` operator. To be on the safe side you could use `lock`s or the `Interlocked` class, so that the `latest` variable is checked and updated atomically. – Theodor Zoulias May 07 '21 at 09:54
  • What about the naming? – Liero May 07 '21 at 10:21
  • I am not sure what an appropriate name could be. – Theodor Zoulias May 07 '21 at 10:35
  • I think I have removed the race condition now. Even if inner observables are not unique, this still works as expected. Or at least as I expect it to work :D – Liero May 07 '21 at 10:36
  • Your updated version is probably fine, but personally, after having read [this article](https://docs.microsoft.com/en-us/archive/msdn-magazine/2012/december/csharp-the-csharp-memory-model-in-theory-and-practice) by Igor Ostrovsky, I get nervous when I see shared state being accessed from multiple threads without synchronization. You could ask Peter Cordes, who knows [a thing or two](https://stackoverflow.com/a/66490395/11178549) about cache coherency, about the correctness of your lock-free solution, but at my level of knowledge I would add synchronization without a second thought. – Theodor Zoulias May 07 '21 at 10:49
  • @Liero, I deleted [my answer](https://stackoverflow.com/a/67433434/11178549) because the mere fact that finding a name for this operator is difficult indicates that it may not be that useful. The [`MergeBounded`](https://stackoverflow.com/questions/65477010/how-to-merge-a-nested-observable-iobservableiobservablet-with-limited-concur) is more expressive IMHO, and can be used to solve a broader range of problems. – Theodor Zoulias May 09 '21 at 07:10
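
The synchronization suggested in the comments could be sketched roughly as follows. This is not the answer author's code: the operator name `ConcatLatest`, the class name, and the idea of stamping each inner sequence with a counter are assumptions made for illustration. The counter is updated and read only through `Interlocked`, and because every inner sequence gets its own stamp, the "is this still the latest?" check no longer depends on the inner sequences being distinct objects.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class ConcatLatestExtensions
{
    // Hypothetical variant of ConcatExhaust: subscribes to inner sequences one
    // at a time and skips any inner sequence that is no longer the most recent
    // by the time Concat reaches it.
    public static IObservable<T> ConcatLatest<T>(this IObservable<IObservable<T>> source)
    {
        return Observable.Defer(() =>
        {
            long latestStamp = 0; // per-subscription state, accessed only via Interlocked

            return source
                .Select(inner =>
                {
                    // Stamp this inner sequence and record it as the most recent one.
                    long stamp = Interlocked.Increment(ref latestStamp);

                    // Decide lazily, at the moment Concat subscribes to this entry.
                    return Observable.Defer(() =>
                        Interlocked.Read(ref latestStamp) == stamp
                            ? inner
                            : Observable.Empty<T>());
                })
                .Concat();
        });
    }
}
```

Like ConcatExhaust, this sketch relies on `Concat` subscribing to one inner sequence at a time. A newer value that arrives just after the check has passed is not lost; it simply runs once the current inner sequence completes.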