5

Introduction

In my WPF C# .NET application I use the reactive extensions (Rx) to subscribe to events and I often have to reload something from the DB to get the values I need to update the UI, because the event objects often only contains IDs and some meta data.

I use the Rx scheduling to load the data in the background and update the UI on the dispatcher. I have made some bad experience with mixing "Task.Run" inside of a Rx sequence (when using "SelectMany" the order is no longer guaranteed and it is hard to control the scheduling in UnitTests). See also: Executing TPL code in a reactive pipeline and controlling execution via test scheduler

My problem

If I shutdown my app (or close a tab) I want to unsubscribe and then await the DB call (which is called from a Rx "Select") that still can be running after "subscription.Dispose". Until now I haven't found any good utility or easy way to do that.

Questions

Is there any framework support to await everything still running in a Rx chain?

If not, do you have any good ideas how to make a easy to use utility?

Are there any good alternative ways to achieve the same?

Example

public async Task AwaitEverythingInARxChain()
{
    // In real life this is a hot observable event sequence which never completes
    IObservable<int> eventSource = Enumerable.Range(1, int.MaxValue).ToObservable();

    IDisposable subscription = eventSource
        // Load data in the background
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))

        // Update UI on the dispatcher
        .ObserveOn(DispatcherScheduler.Current)
        .SubscribeOn(Scheduler.Default) // In real life the source produces the event values on a background thread.
        .Subscribe(loadedData => UpdateUi(loadedData));

    Thread.Sleep(TimeSpan.FromSeconds(10));
// In real life I want to cancel (unsubscribe) here because the user has closed the Application or closed the tab and return a task which completes when everything is done.

    // Unsubscribe just guarantees that no "OnNext" is called anymore, but it doesn't wait until all operations in the sequence are finished (for example "LoadFromDatabase(id)" can still be runnig here.
    subscription.Dispose();

    await ?; // I need to await here, so that i can be sure that no "LoadFromDatabase(id)" is running anymore.

    ShutDownDatabase();
}

What I already tried (and didn't worked)

  • Using the "Finally" operator to set the result of a TaskCompletionSource. The problem with this approach: Finally gets called directly after unsubscribing and "LoadFromDatabase" can still be running

UPDATE: Example with console output and TakeUntil

public async Task Main()
{
    Observable
        .Timer(TimeSpan.FromSeconds(5.0))
        .Subscribe(x =>
        {
            Console.WriteLine("Cancel started");
            _shuttingDown.OnNext(Unit.Default);
        });

    await AwaitEverythingInARxChain();
    Console.WriteLine("Cancel finished");
    ShutDownDatabase();
    Thread.Sleep(TimeSpan.FromSeconds(3));
}

private Subject<Unit> _shuttingDown = new Subject<Unit>();

public async Task AwaitEverythingInARxChain()
{
    IObservable<int> eventSource = Observable.Range(0, 10);

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(Scheduler.Default)
        .TakeUntil(_shuttingDown)
        .Do(loadedData => UpdateUi(loadedData));
}

public int LoadFromDatabase(int x)
{
    Console.WriteLine("Start LoadFromDatabase: " + x);
    Thread.Sleep(1000);
    Console.WriteLine("Finished LoadFromDatabase: " + x);

    return x;
}

public void UpdateUi(int x)
{
    Console.WriteLine("UpdateUi: " + x);
}

public void ShutDownDatabase()
{
    Console.WriteLine("ShutDownDatabase");
}

Output (actual):

Start LoadFromDatabase: 0
Finished LoadFromDatabase: 0
Start LoadFromDatabase: 1
UpdateUi: 0
Finished LoadFromDatabase: 1
Start LoadFromDatabase: 2
UpdateUi: 1
Finished LoadFromDatabase: 2
Start LoadFromDatabase: 3
UpdateUi: 2
Finished LoadFromDatabase: 3
Start LoadFromDatabase: 4
UpdateUi: 3
Cancel started
Cancel finished
ShutDownDatabase
Finished LoadFromDatabase: 4
Start LoadFromDatabase: 5
Finished LoadFromDatabase: 5
Start LoadFromDatabase: 6
Finished LoadFromDatabase: 6
Start LoadFromDatabase: 7

Expected: I want to have a guarantee that following are the last Outputs:

Cancel finished
ShutDownDatabase
Community
  • 1
  • 1
Jonas Benz
  • 503
  • 3
  • 12
  • Is `LoadFromDatabase` async or return an `IObservable`? – Shlomo Mar 07 '17 at 20:45
  • No, LoadFromDatabase is not async and it does not return a IObservable. Its just a synchronous call to the DB which returns a data object. Therefore I use ObserveOn(Scheduler.Default) to load the data in the background – Jonas Benz Mar 07 '17 at 20:49
  • So in your actual code, eventSource is hot? Does this hot observable handle the Database calls? – Brandon Kramer Mar 14 '17 at 13:17
  • @Brandon Kramer: Yes it is hot. It provides server events, which the client subscribe to. Sometimes the client has to load additional data, so it will call a repository to get them. The event source uses the EventLoopScheduler to forward the events. When we subscribe to it we want to load the additional data on the thread pool, so that we don't block the EventLoopThread. (I am not sure if I understood your second question) – Jonas Benz Mar 14 '17 at 17:54
  • I just meant that the eventSource is not going to call `LoadFromDatabase` correct? That will be done by the subscriber? – Brandon Kramer Mar 14 '17 at 17:55
  • Yes you are right. Because not all clients need the same data to react to the same event. – Jonas Benz Mar 14 '17 at 17:57
  • Ok, the only thing that I can see, is that `Console.WriteLine("Cancel finished");` and `ShutDownDatabase();` should be moved into a Finally on the observable in `AwaitEverythingInARxChain()`. This will allow you to just await the observable as @Enigmativity shows in his answer. The `await` will not be dependent on `eventSource` completing, because `TakeUntil()` will cause the subscription to be completed as soon as `_shuttingDown` publishes a value, and once the observable completes, you can be certain that both `Console.WriteLine();` and `ShutDownDatabase()` have been called. – Brandon Kramer Mar 14 '17 at 18:02
  • I want to await that the last call to LoadFromDatabase is completely finished ("Finished LoadFromDatabase" is written to the console). When the observable completes (even when the finally action is called) "LoadFromDatabase" can still be runnig (try it and look at the outputs). With all the solution provided until now there is no guarantee that the last LoadFromDatabase is finished completely after the await (befor shutting down the DB). But this guarantee is what I am trying to achive and what this whole StackOverflow article is all about. – Jonas Benz Mar 14 '17 at 18:24

3 Answers3

7

This is easier than you think. You can await observables. So simply do this:

public async Task AwaitEverythingInARxChain()
{
    IObservable<int> eventSource = Enumerable.Range(1, 10).ToObservable();

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(DispatcherScheduler.Current)
        .Do(loadedData => UpdateUi(loadedData), () => ShutDownDatabase());
}

With a bit of Console.WriteLine action in your methods, and a little thread sleeping in the db call to simulate network delay, I get this output:

LoadFromDatabase: 1
LoadFromDatabase: 2
UpdateUi: 1
LoadFromDatabase: 3
UpdateUi: 2
LoadFromDatabase: 4
UpdateUi: 3
LoadFromDatabase: 5
UpdateUi: 4
LoadFromDatabase: 6
UpdateUi: 5
LoadFromDatabase: 7
UpdateUi: 6
LoadFromDatabase: 8
UpdateUi: 7
LoadFromDatabase: 9
UpdateUi: 8
LoadFromDatabase: 10
UpdateUi: 9
UpdateUi: 10
ShutDownDatabase

If you need to end the query, just create a shuttingDown subject:

private Subject<Unit> _shuttingDown = new Subject<Unit>();

...and then modify the query like this:

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(DispatcherScheduler.Current)
        .Do(
            loadedData => UpdateUi(loadedData),
            () => ShutDownDatabase())
        .TakeUntil(_shuttingDown);

You just need issue a _shuttingDown.OnNext(Unit.Default); to unsubscribe the observable.


Here's my complete working test code:

async Task Main()
{
    Observable
        .Timer(TimeSpan.FromSeconds(5.0))
        .Subscribe(x => _shuttingDown.OnNext(Unit.Default));

    await AwaitEverythingInARxChain();
}

private Subject<Unit> _shuttingDown = new Subject<Unit>();

public async Task AwaitEverythingInARxChain()
{
    IObservable<int> eventSource = Observable.Range(0, 10);

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(DispatcherScheduler.Current)
        .Finally(() => ShutDownDatabase())
        .TakeUntil(_shuttingDown)
        .Do(loadedData => UpdateUi(loadedData));
}

public int LoadFromDatabase(int x)
{
    Console.WriteLine("LoadFromDatabase: " + x);
    Thread.Sleep(1000);
    return x;
}

public void UpdateUi(int x)
{
    Console.WriteLine("UpdateUi: " + x);
}

public void ShutDownDatabase()
{
    Console.WriteLine("ShutDownDatabase");
}

I get this output:

LoadFromDatabase: 0
LoadFromDatabase: 1
UpdateUi: 0
LoadFromDatabase: 2
UpdateUi: 1
LoadFromDatabase: 3
UpdateUi: 2
LoadFromDatabase: 4
UpdateUi: 3
LoadFromDatabase: 5
UpdateUi: 4
ShutDownDatabase

Note that the observable tries to produce 10 values over 10 seconds, but it is cut short by the OnNext.

Enigmativity
  • 113,464
  • 11
  • 89
  • 172
  • Thanks for your answer. My problem is that eventSource actually never completes. I can't wait until it completes. I want to unsubscribe (cancel) and await that LoadFromDatabase is finished. – Jonas Benz Mar 08 '17 at 07:56
  • @JonasBenz - How do you know when to shut down the database then? – Enigmativity Mar 08 '17 at 07:58
  • I just tried to simplify the problem in my example. In real life the user is closing the application (or the system test is in the tear down) and on reaction to this I want to cancel the subscription and return a task that completes when I am sure that nothing is runnig anymore. – Jonas Benz Mar 08 '17 at 08:14
  • @JonasBenz - I've added a way to shut down the observable. – Enigmativity Mar 08 '17 at 09:57
  • Your way to shut down looks promising. I tried it in my example (I introduced the "TakeUntil" in the sequence and called "_shuttingDown.OnNext" instead of "subscription.Dispose"), but when I then await the observable, then it waits until the eventSource completes (even if I call "_shuttingDown.OnCompleted"). I don't understand what I am doing wrong... Can you provide a full example with the "_shuttingDown" and the console output included? – Jonas Benz Mar 08 '17 at 12:17
  • @JonasBenz - I'll get back to my code later today and have a go. In the meanwhile try calling `OnNext` **after** you await the observable. Calling `OnCompleted` doesn't issue a value so it doesn't stopped the `TakeUntil` - it has to be `OnNext`. – Enigmativity Mar 09 '17 at 03:04
  • I made some small changes to your example (some more WriteLines, a thread sleep at the end and I call ShutDownDatabase after the await). In real life the DB shutdown is done by an other class, I cant do it in the finally block. Like you can see from my output, "LoadFromDatabase" is still running after the await. – Jonas Benz Mar 09 '17 at 12:25
  • @JonasBenz - You have a design issue. DB connections shouldn't be left open like that. They should be built up during the query and closed as soon as the query completes. It would be correct to initiate the shutdown in the query. – Enigmativity Mar 10 '17 at 00:39
  • I think this is a misunderstanding. The DB connection is NOT left open after the query. I am working on a big project with a lot of SystemTests... When a System Test shuts down it calls and awaits a method called FinishAsync on the MainController, which do the same for hes subcontrollers. After the SystemTest has awaited the finishing it will shut down the whole System - it drops the Test Database, etc. Therefor it is important that no DB calls are running anymore, which would lead to error logs, which we want to avoid in good case test scenarios. – Jonas Benz Mar 10 '17 at 07:12
3

I finally found a solution myself. You can use TakeWhile to achive it. TakeUntil does not work, because the main observable sequence immediately completes when the second observable sequence produces the first value.

Here is a example of the working solution:

     public async Task Main_Solution()
    {
        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        Observable
            .Timer(TimeSpan.FromSeconds(4))
            .Subscribe(x =>
            {
                Console.WriteLine("Cancel startedthread='{0}'", Thread.CurrentThread.ManagedThreadId);
                cancellationTokenSource.Cancel();
            });

        await AwaitEverythingInARxChain(cancellationTokenSource.Token);
        Console.WriteLine("Cancel finished thread='{0}'", Thread.CurrentThread.ManagedThreadId);
        ShutDownDatabase();
        Thread.Sleep(TimeSpan.FromSeconds(10));
    }

    public async Task AwaitEverythingInARxChain(CancellationToken token)
    {
        IObservable<int> eventSource = Observable.Range(0, 10);

        await eventSource
            .ObserveOn(Scheduler.Default)
            .Select(id => LoadFromDatabase(id))
            .TakeWhile(_ => !token.IsCancellationRequested)
            .ObserveOn(Scheduler.Default) // Dispatcher in real life
            .Do(loadedData => UpdateUi(loadedData)).LastOrDefaultAsync();
    }

    public int LoadFromDatabase(int x)
    {
        Console.WriteLine("Start LoadFromDatabase: {0} thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(TimeSpan.FromSeconds(3));
        Console.WriteLine("Finished LoadFromDatabase: {0} thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);

        return x;
    }

    public void UpdateUi(int x)
    {
        Console.WriteLine("UpdateUi: '{0}' thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);
    }

    public void ShutDownDatabase()
    {
        Console.WriteLine("ShutDownDatabase thread='{0}'", Thread.CurrentThread.ManagedThreadId);
    }

And the output:

Start LoadFromDatabase: 0 thread='9'
Finished LoadFromDatabase: 0 thread='9'
Start LoadFromDatabase: 1 thread='9'
UpdateUi: '0' thread='10'
Cancel startedthread='4'
Finished LoadFromDatabase: 1 thread='9'
Cancel finished thread='10'
ShutDownDatabase thread='10'

Note that "ShutDownDatabase" is the last output (as expected). It waits until "LoadFromDatabase" is finished for the second value, even if its produced value is not further processed. This is exactly what I want.

Jonas Benz
  • 503
  • 3
  • 12
0

You need to have something to await on. You can't await on a subscription disposal. The easiest way to do it would be to turn your disposal logic into part of the observable itself:

var observable = eventSource
    // Load data in the background
    .ObserveOn(Scheduler.Default)
    .Select(id => LoadFromDatabase(id))
    .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(10))) //This replaces your Thread.Sleep call
    .Publish()
    .RefCount();

var subscription = observable.ObserveOn(DispatcherScheduler.Current)
    .Subscribe(loadedData => UpdateUi(loadedData));

//do whatever you want here.

await observable.LastOrDefault();
Shlomo
  • 14,102
  • 3
  • 28
  • 43
  • I already tried to await LastOrDefault(). But the task completes when the IObservable completes. But this does not guarantee that "LoadFromDatabase" is not still running after the completion – Jonas Benz Mar 07 '17 at 22:10
  • It actually does guarantee that. If `LoadFromDatabase` isn't asyncronous, then `LastOrDefault` shouldn't return until the last `LoadFromDatabase` call is complete. – Shlomo Mar 07 '17 at 22:11
  • Ah yes sorry you are right, when the IObservable completes. But my problem is that i am unsubscribing from a hot observable which never completes. And after the unsubscribing I want to await that nothing is runnig anymore. – Jonas Benz Mar 07 '17 at 22:15
  • Updated answer. – Shlomo Mar 07 '17 at 22:22
  • Hi Shlomo. AFAIK the `LastOrDefault` is not awaitable. Did you mean the `LastOrDefaultAsync`? – Theodor Zoulias Nov 20 '20 at 03:27