Introduction
In my WPF C# .NET application I use the reactive extensions (Rx) to subscribe to events and I often have to reload something from the DB to get the values I need to update the UI, because the event objects often only contains IDs and some meta data.
I use the Rx scheduling to load the data in the background and update the UI on the dispatcher. I have made some bad experience with mixing "Task.Run" inside of a Rx sequence (when using "SelectMany" the order is no longer guaranteed and it is hard to control the scheduling in UnitTests). See also: Executing TPL code in a reactive pipeline and controlling execution via test scheduler
My problem
If I shutdown my app (or close a tab) I want to unsubscribe and then await the DB call (which is called from a Rx "Select") that still can be running after "subscription.Dispose". Until now I haven't found any good utility or easy way to do that.
Questions
Is there any framework support to await everything still running in a Rx chain?
If not, do you have any good ideas how to make a easy to use utility?
Are there any good alternative ways to achieve the same?
Example
public async Task AwaitEverythingInARxChain()
{
// In real life this is a hot observable event sequence which never completes
IObservable<int> eventSource = Enumerable.Range(1, int.MaxValue).ToObservable();
IDisposable subscription = eventSource
// Load data in the background
.ObserveOn(Scheduler.Default)
.Select(id => LoadFromDatabase(id))
// Update UI on the dispatcher
.ObserveOn(DispatcherScheduler.Current)
.SubscribeOn(Scheduler.Default) // In real life the source produces the event values on a background thread.
.Subscribe(loadedData => UpdateUi(loadedData));
Thread.Sleep(TimeSpan.FromSeconds(10));
// In real life I want to cancel (unsubscribe) here because the user has closed the Application or closed the tab and return a task which completes when everything is done.
// Unsubscribe just guarantees that no "OnNext" is called anymore, but it doesn't wait until all operations in the sequence are finished (for example "LoadFromDatabase(id)" can still be runnig here.
subscription.Dispose();
await ?; // I need to await here, so that i can be sure that no "LoadFromDatabase(id)" is running anymore.
ShutDownDatabase();
}
What I already tried (and didn't worked)
- Using the "Finally" operator to set the result of a TaskCompletionSource. The problem with this approach: Finally gets called directly after unsubscribing and "LoadFromDatabase" can still be running
UPDATE: Example with console output and TakeUntil
public async Task Main()
{
Observable
.Timer(TimeSpan.FromSeconds(5.0))
.Subscribe(x =>
{
Console.WriteLine("Cancel started");
_shuttingDown.OnNext(Unit.Default);
});
await AwaitEverythingInARxChain();
Console.WriteLine("Cancel finished");
ShutDownDatabase();
Thread.Sleep(TimeSpan.FromSeconds(3));
}
private Subject<Unit> _shuttingDown = new Subject<Unit>();
public async Task AwaitEverythingInARxChain()
{
IObservable<int> eventSource = Observable.Range(0, 10);
await eventSource
.ObserveOn(Scheduler.Default)
.Select(id => LoadFromDatabase(id))
.ObserveOn(Scheduler.Default)
.TakeUntil(_shuttingDown)
.Do(loadedData => UpdateUi(loadedData));
}
public int LoadFromDatabase(int x)
{
Console.WriteLine("Start LoadFromDatabase: " + x);
Thread.Sleep(1000);
Console.WriteLine("Finished LoadFromDatabase: " + x);
return x;
}
public void UpdateUi(int x)
{
Console.WriteLine("UpdateUi: " + x);
}
public void ShutDownDatabase()
{
Console.WriteLine("ShutDownDatabase");
}
Output (actual):
Start LoadFromDatabase: 0
Finished LoadFromDatabase: 0
Start LoadFromDatabase: 1
UpdateUi: 0
Finished LoadFromDatabase: 1
Start LoadFromDatabase: 2
UpdateUi: 1
Finished LoadFromDatabase: 2
Start LoadFromDatabase: 3
UpdateUi: 2
Finished LoadFromDatabase: 3
Start LoadFromDatabase: 4
UpdateUi: 3
Cancel started
Cancel finished
ShutDownDatabase
Finished LoadFromDatabase: 4
Start LoadFromDatabase: 5
Finished LoadFromDatabase: 5
Start LoadFromDatabase: 6
Finished LoadFromDatabase: 6
Start LoadFromDatabase: 7
Expected: I want to have a guarantee that following are the last Outputs:
Cancel finished
ShutDownDatabase