I want a generic way to convert an asynchronous method to an observable. In my case, I'm dealing with methods that use HttpClient to fetch data from an API.

Let's say we have the method Task<string> GetSomeData() that needs to become a single IObservable<string> whose values are generated by a combination of:

  • Repeated periodic calls to GetSomeData() (for example every x seconds)
  • Manually triggered calls to GetSomeData() at any given time (for example when the user hits refresh).

Since there are two ways to trigger execution of GetSomeData(), concurrency can be an issue. To avoid requiring GetSomeData() to be thread-safe, I want to limit the concurrency so that only one thread executes the method at a time. As a consequence, I need to handle overlapping requests with some strategy. I made a (kind of) marble diagram to describe the problem and the desired outcome:

[Image: marble diagram]

My instinct tells me there is a simple way to achieve this, so please give me some insights :)

This is the solution I've got so far. It unfortunately doesn't solve the concurrency problem.

    public class ObservableCreationWrapper<T>
    {
        private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
        private Func<Task<T>> _methodToCall;
        private IObservable<T> _manualCalls;

        public IObservable<T> Stream { get; private set; }

        public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
        {
            _methodToCall = methodToCall;
            _manualCalls = _manualCallsSubject.AsObservable()
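                // The inner lambda takes a CancellationToken (unused here); it is named
                // differently from the outer parameter to avoid a name clash.
                .Select(_ => Observable.FromAsync(ct => methodToCall()))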
                .Merge(1);

            Stream = Observable.FromAsync(() => _methodToCall())
                .DelayRepeat(period)
                .Merge(_manualCalls);
        }

        public void TriggerAdditionalCall()
        {
            _manualCallsSubject.OnNext(Unit.Default);
        }
    }

Extension method for repeating with delay:

static class Extensions
{
    public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
}

An example of a service containing the method from which the observable is generated:

class SomeService
{
    private int _ticks = 0;

    public async Task<string> GetSomeValueAsync()
    {
        //Just a hack to determine whether the request was triggered manually or by the timer
        var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";

        //Here we have a data race! We would like to limit access to this method 
        var valueToReturn = $"{_ticks} ({initiatationWay})";

        await Task.Delay(500);
        _ticks += 1; 
        return valueToReturn;
    }
}

Used like this (data race will occur):

static async Task Main(string[] args)
{
    //Running this program will yield non-deterministic results due to the data race in GetSomeValueAsync
    var someService = new SomeService();
    var stopwatch = Stopwatch.StartNew();
    var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
    observableWrapper.Stream
        .Take(6)
        .Subscribe(x => 
            {
                Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} finished");
            });

    await Task.Delay(4000);
    observableWrapper.TriggerAdditionalCall();
    observableWrapper.TriggerAdditionalCall();
    Console.ReadLine();
}
Theodor Zoulias
figursagsmats
  • You're right that it's simple, but please give us some code to work with. What do the "regular async endpoints" look like? What does a "signal from the application" look like? A button click? A timer? – Enigmativity Nov 04 '20 at 11:18
  • @Enigmativity I've added example code of what I've got so far. It works, but does not fulfill my requirements entirely. Also, I suspect there are some improvements to be made. – figursagsmats Nov 05 '20 at 11:15
  • I have a hard time understanding the requirements for an acceptable solution. Could you design a [marble diagram](https://rxmarbles.com) showing sample input and output data? You don't need to draw it in Photoshop, you can just use plain ASCII characters like this: `Source: +--1-2-3--4--|`, `Result: +--A-B-C--D--|`. – Theodor Zoulias Nov 11 '20 at 15:41
  • @TheodorZoulias Thanks for the feedback! I rewrote the question to make it clearer and added a (kind of) marble diagram. – figursagsmats Nov 11 '20 at 21:04
  • Nice marble diagram! The question is much clearer now IMHO. I don't know why it was downvoted. – Theodor Zoulias Nov 12 '20 at 05:11
  • What happens with two manually triggered calls close to each other, so that the second is triggered before the completion of the first? Does it cancel the asynchronous method that is currently running? – Theodor Zoulias Nov 12 '20 at 06:35
  • What should happen in case the `methodToCall` fails with an exception? Should the `IObservable Stream` terminate in a failed state too? – Theodor Zoulias Nov 12 '20 at 07:22
  • "I need to handle overlapping requests with some strategy". I think your overlapping requests strategy could have been better. I wouldn't cancel the previous unresolved request when a new request is placed either automatically or manually. Your strategy decides at the time a request is made but it's better to decide at the time a response is received. Think about requests A2 and M1. You should cancel A2 only if M1 resolves before A1 does in order to guarantee receiving the most recent state from the API. – Redu Dec 31 '21 at 15:57

3 Answers

Here is my take on this problem:


Update: I was able to greatly simplify my suggested solution by borrowing ideas from Enigmativity's answer. The Observable.StartAsync method handles the messy business of cancellation automatically¹, and the requirement of non-overlapping execution can be enforced simply by using a SemaphoreSlim.

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations, and restart the timer.
/// </summary>
public static IObservable<T> PeriodicAndManual<T>(
    Func<bool, CancellationToken, Task<T>> functionAsync,
    TimeSpan period,
    out Action manualInvocation)
{
    // Arguments validation omitted
    var manualSubject = new Subject<bool>();
    manualInvocation = () => manualSubject.OnNext(true);
    return Observable.Defer(() =>
    {
        var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping
        return Observable
            .Interval(period)
            .Select(_ => false) // Not manual
            .Merge(manualSubject)
            .TakeUntil(isManual => isManual) // Stop on first manual
            .Repeat() // ... and restart the timer
            .Prepend(false) // Skip the initial interval delay
            .Select(isManual =>
            {
                if (isManual)
                {
                    // Triggered manually
                    return Observable.StartAsync(async ct =>
                    {
                        await semaphore.WaitAsync(ct);
                        try { return await functionAsync(isManual, ct); }
                        finally { semaphore.Release(); }
                    });
                }
                else if (semaphore.Wait(0))
                {
                    // Triggered by the timer and semaphore acquired synchronously
                    return Observable
                        .StartAsync(ct => functionAsync(isManual, ct))
                        .Finally(() => semaphore.Release());
                }
                return null; // Otherwise ignore the signal
            })
            .Where(op => op != null)
            .Switch(); // Pending operations are unsubscribed and canceled
    });
}

The out Action manualInvocation argument is the mechanism that triggers a manual invocation.

Usage example:

int ticks = 0;
var subscription = PeriodicAndManual(async (isManual, token) =>
{
    var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
    await Task.Delay(500, token);
    return id;
}, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
.Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
.Subscribe();

await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);

subscription.Dispose();

Output:

19:52:43.684 Begin 1 periodic
19:52:44.208 Received #1 periodic
19:52:44.731 Begin 2 periodic
19:52:45.235 Received #2 periodic
19:52:45.729 Begin 3 periodic
19:52:46.232 Received #3 periodic
19:52:46.720 Begin 4 periodic
19:52:46.993 Begin 5 manual
19:52:47.220 Begin 6 manual
19:52:47.723 Received #6 manual
19:52:48.223 Begin 7 periodic
19:52:48.728 Received #7 periodic
19:52:49.227 Begin 8 periodic
19:52:49.730 Received #8 periodic
19:52:50.226 Begin 9 periodic

The technique of using the Scan and the DistinctUntilChanged operators in order to drop elements while the previous asynchronous operation is running, is borrowed from this question.

¹ It seems that the Rx library does not handle this messy business satisfactorily though, since it omits disposing of the CancellationTokenSources it creates.

Theodor Zoulias
  • The implementation could become even simpler if the `Observable.StartAsync` returned a type that is both a `Task` and an `IObservable`. But alas it doesn't. Creating such a type is possible, but [tricky](https://stackoverflow.com/questions/64837163/how-can-i-create-a-class-that-is-both-a-taskt-and-an-iobservablet). – Theodor Zoulias Nov 16 '20 at 13:17
  • Seriously impressive answer! I've tested it, and it works and solves my problem. I will most likely mark it as the correct answer, but first I want to understand all the suggested answers entirely. @TheodorZoulias is the CancellationToken necessary for preventing overlap in this version? – figursagsmats Nov 17 '20 at 20:19
  • @figursagsmats thanks mate! No, observing the `CancellationToken` is optional, and will not affect the enforcement of the single-concurrent-execution restriction. But it is preferable that the `functionAsync` reacts promptly to a cancellation signal. Otherwise a discarded periodic operation may keep running in the background, preventing a manually requested operation from starting in a timely manner. – Theodor Zoulias Nov 17 '20 at 20:37

Here's the query that you need:

var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(1.0);

IObservable<string> query =
    subject
        .StartWith(Unit.Default)
        .Select(x => Observable.Timer(TimeSpan.Zero, delay))
        .Switch()
        .SelectMany(x => Observable.FromAsync(() => GetSomeData()));

Any time you call subject.OnNext(Unit.Default), it will immediately trigger a call to GetSomeData and then repeat the call at the interval set in delay.

The use of .StartWith(Unit.Default) sets the query going as soon as there is a subscriber.

The use of .Switch() cancels any pending operations based on a new subject.OnNext(Unit.Default) being called.

This should match your marble diagram.


The above version didn't introduce the delay between values.

Version 2 should.

var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(5.0);

var source = Observable.FromAsync(() => GetSomeData());

IObservable<string> query =
    subject
        .StartWith(Unit.Default)
        .Select(x => source.Expand(n => Observable.Timer(delay).SelectMany(y => source)))
        .Switch();

I've used the Expand operator to introduce a delay between values. As long as source only produces a single value (which FromAsync does) this should work just fine.

Enigmativity
  • There are some really neat and clever tricks in this implementation! But it seems that it doesn't prevent overlapping executions (caused by tasks having a duration larger than `delay`), and also that it doesn't cancel the running operations when a manual invocation is triggered. So the results of all invocations eventually appear in the resulting stream. – Theodor Zoulias Nov 12 '20 at 13:13
  • @TheodorZoulias - It certainly does cancel with the `.Switch()`, but with the signature of `Task GetSomeData()` there is no way to cancel that unless it is changed to `Task GetSomeData(CancellationToken ct)` - then the final line of the query becomes `.SelectMany(x => Observable.FromAsync(ct => GetSomeData(ct)));`. – Enigmativity Nov 12 '20 at 23:22
  • @TheodorZoulias - I've updated my answer with a query that should add the required delay. It worked in my testing. – Enigmativity Nov 13 '20 at 00:36
  • I experimented with the `Switch` and the cancelable version of `Observable.FromAsync`. Very interesting! It requires though that the `Switch` affects the `FromAsync` observables, which is not the case in your Version 1 example. Instead it affects the `Timer` observables. The `FromAsync` are created after the `Switch`, and their cancellation occurs immediately after their normal completion (which is too late and ineffective). Also the cancellation is "best-effort", meaning that it is cooperative. The old tasks are not awaited before starting the new tasks, so the overlapping is still possible. – Theodor Zoulias Nov 13 '20 at 02:18
  • The Version 2 is also interesting. On the plus side the cancellation is functional and the overlapping is avoided (provided that the `GetSomeData` honors the token and completes synchronously when it receives the notification). The downsides are that it uses an `Experimental` operator, and that the invocation is not periodic. There is a constant delay between finishing one action and starting the next, instead of a constant delay between the starting points of subsequent actions (which is what the OP's marble diagram indicates). – Theodor Zoulias Nov 13 '20 at 03:47
  • @TheodorZoulias - It's not recursive so I don't think it works. How do you see it working? – Enigmativity Nov 14 '20 at 11:31

I'd suggest not trying to cancel an already started call. Things will get too messy. If the logic in GetSomeValueAsync involves a database call and/or a web API call, you simply cannot really cancel the call.

I think the key here is to make sure all the calls to GetSomeValueAsync are serialized.

I created the following solution based on Enigmativity's Version 1. I tested it on a Blazor WebAssembly page on ASP.NET Core 3.1, and it works fine.

private int _ticks = 0; //simulate a resource you want serialized access

//for manual event, trigger will be 0; for Timer event, trigger will be 1,2,3...
protected async Task<string> GetSomeValueAsync(string trigger)
{
    var valueToReturn = $"{DateTime.Now.Ticks.ToString()}: {_ticks.ToString()} | ({trigger})";

    await Task.Delay(1000);
    _ticks += 1;
    return valueToReturn;
}

//define two subjects
private Subject<string> _testSubject = new Subject<string>();
private Subject<string> _getDataSubject = new Subject<string>();

//driving observable, based on Enigmativity's Version 1
var delay = TimeSpan.FromSeconds(3.0);
IObservable<string> getDataObservable =
    _testSubject
   .StartWith("Init")
   .Select(x => Observable.Timer(TimeSpan.Zero, delay).Select(i => i.ToString()))
   .Switch()
   .WithLatestFrom(_getDataSubject.AsObservable().StartWith("IDLE"))
   .Where(a => a.Second == "IDLE")
   .Select(a => a.First);

//_disposables is CompositeDisposable defined in the page
_disposables.Add(getDataObservable.Subscribe(async t =>
{
     _getDataSubject.OnNext("WORKING");
     //_service.LogToConsole is my helper function to log data to console
     await _service.LogToConsole(await GetSomeValueAsync(t)); 
     _getDataSubject.OnNext("IDLE");
}));

That is it. I used a button to trigger manual events. The _ticks values in the output are always in sequence; that is, no overlapping happened.
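
The serialization idea can also be sketched without the extra status subject. This is only a minimal sketch, and it assumes that overlapping calls may wait in a queue instead of being dropped (which differs from the "IDLE"/"WORKING" filter above). The Serialized.Wrap helper name is hypothetical; it is not part of Rx or of the code in this answer:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: wraps an async function so that its invocations run one at a time.
static class Serialized
{
    public static Func<Task<T>> Wrap<T>(Func<Task<T>> function)
    {
        // One permit: at most one invocation of 'function' is in flight at any moment.
        var gate = new SemaphoreSlim(1, 1);
        return async () =>
        {
            // Later callers wait here asynchronously until the current call has finished.
            await gate.WaitAsync();
            try { return await function(); }
            finally { gate.Release(); }
        };
    }
}
```

The wrapped delegate could then be handed to whatever triggers the calls, for example `var getSomeValue = Serialized.Wrap<string>(someService.GetSomeValueAsync);`. Because callers queue up, a burst of manual triggers would run all of its calls back to back, so this fits only when dropping overlapping requests is not required.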

ch_g
  • Just add a button on page and click it - – ch_g Nov 14 '20 at 15:44