1

Recently I become aware that the Rx Finally operator behaves in a way which, at least for me, is unexpected. My expectation was that any error thrown by the finallyAction would be propagated to the operator's observers downstream. Alas this is not what happens. In the reality the operator first propagates the completion (or the failure) of the antecedent sequence to its observers, and then invokes the action, at a point in time when it's not possible to propagate a potential error thrown by the action. So it throws the error on the ThreadPool, and crashes the process. Which is not only unexpected, but also highly problematic. Below is a minimal demonstration of this behavior:

Observable
    .Timer(TimeSpan.FromMilliseconds(100))
    .Finally(() => throw new ApplicationException("Oops!"))
    .Subscribe(_ => { }, ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Completed"));

Thread.Sleep(1000);

Outcome: Unhandled exception (Fiddle)

The exception thrown by the Finally lambda is not handled by the Subscribe:onError handler, as it would be desirable.

This feature (I am tempted to call it a flaw) limits severely the usefulness of the Finally operator in my eyes. Essentially I can only use it when I want to invoke an action that is expected to never fail, and if it fails it would indicate a catastrophic corruption of the application's state, when no recovery is possible. I could use it for example to Release a SemaphoreSlim (like I've done here for example), which can only fail if my code has a bug. I am OK with my app crashing in this case. But I've also used it recently to invoke an unknown action supplied by the caller, an action that could potentially fail, and crashing the app in this case is unacceptable. Instead, the error should be propagated downstream. So what I am asking here is how to implement a Finally variant (let's call it FinallySafe) with identical signature, and the behavior specified below:

public static IObservable<TSource> FinallySafe<TSource>(
    this IObservable<TSource> source, Action finallyAction);
  1. The finallyAction should be invoked after the source sequence has emitted an OnCompleted or an OnError notification, but before this notification is propagated to the observer.
  2. If the finallyAction invocation completed successfully, the original OnCompleted/OnError notification should be propagated to the observer.
  3. If the finallyAction invocation failed, an OnError notification should be propagated to the observer, containing the error that just occurred. In this case the previous error, the one that may have caused the source to complete with failure, should be ignored (not propagated).
  4. The finallyAction should also be invoked when the FinallySafe is unsubscribed before the completion of the source. When a subscriber (observer) disposes a subscription, the finallyAction should by invoked synchronously, and any error should be propagated to the caller of the Dispose method.
  5. If the FinallySafe is subscribed by multiple observers, the finallyAction should be invoked once per subscription, independently for each subscriber, following the rules above. Concurrent invocations are OK.
  6. The finallyAction should never be invoked more than once per subscriber.

Validation: replacing the Finally with the FinallySafe in the code snippet above, should result to a program that doesn't crash with an unhandled exception.

Alternative: I am also willing to accept an answer that provides a reasonable explanation about why the behavior of the built-in Finally operator is better than the behavior of the custom FinallySafe operator, as specified above.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • I'm not sure, but should the finally-block not called after the exception? Something like "read a file and finally close it". Here is a exception inside the finally-block, which is not the way to use it. – akop Nov 02 '21 at 19:48
  • @akop this is a good point. In the Rx world it is possible though to chain [`Catch`](https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh211765(v=vs.103)) and `Finally` operators is any order. It's not a fixed order imposed by the language syntax, like in C#. – Theodor Zoulias Nov 02 '21 at 19:53
  • "My expectation was that any error thrown by the `finallyAction` would be propagated to the operator's observers downstream." - That shouldn't be the expectation as the contract for Rx is that there can be only one `OnError` or `OnCompleted` - so it can't have another error after the sequence completes. – Enigmativity Nov 03 '21 at 00:52
  • "to invoke an action that is expected to never fail" - isn't this just a case of writing robust code? – Enigmativity Nov 03 '21 at 00:55
  • @Enigmativity writing fail-proof methods is certainly not a requirement for writing robust code. Otherwise half of the built-in .NET APIs, the APIs that are documented to throw `ArgumentException`s, `InvalidOperationException`s, `OperationCanceledException`s etc would be considered non-robust APIs. It's just not realistic to expect that every single method can handle all errors that may happen during its invocation. – Theodor Zoulias Nov 03 '21 at 07:22
  • @Enigmativity regarding the Rx contract, nowhere I am suggesting that it should be violated. What I am suggesting is to invoke the `finallyAction` *before* propagating the `OnComplete`/`OnError` notifications downstream. The timing of invoking the `finallyAction` is not part of the Rx contract. – Theodor Zoulias Nov 03 '21 at 07:27
  • @TheodorZoulias - Have a read of [this Eric Lippert blog entry](https://ericlippert.com/2008/09/10/vexing-exceptions). It's where I've formed my views on exception handling. – Enigmativity Nov 03 '21 at 09:43
  • @Enigmativity yeap, [that](https://ericlippert.com/2008/09/10/vexing-exceptions/) blog post be Eric Lippert is well known to me too. The exceptions that I am dealing with are (following the Lippertian terminology) **exogenous**, since they can be thrown by external (user-supplied) code, which is outside of my control. – Theodor Zoulias Nov 03 '21 at 09:54
  • @TheodorZoulias - But your code can catch them and meaningfully deal with them. – Enigmativity Nov 03 '21 at 10:53
  • @Enigmativity trust me, I have no meaningful way to handle the exception. If I had, I wouldn't ask this question. I am authoring a library, that accepts user-supplied actions. I am not writing code in the `Click` event of a `Button` or something. – Theodor Zoulias Nov 03 '21 at 11:17

3 Answers3

2

Finally gets called after the sequence has ended, and since the Rx contract only allows one OnError or OnCompleted it can't issue a second one.

But, if you replace the Finally with Do you can get the behaviour that you want.

Try this code:

Observable
    .Timer(TimeSpan.FromMilliseconds(100))
    .Do(_ => { }, () => throw new ApplicationException("Oops!"))
    .Subscribe
        (_ => { },
        ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Completed"));

Thread.Sleep(TimeSpan.FromMilliseconds(1000));

That operates as you expect it to.

I get this output:

Oops!

If you want to run something at unsubscribe, then use this extension method:

public static class Ext
{
    public static IObservable<T> Unsubscribed<T>(this IObservable<T> source, Action unsubscribed) =>
        Observable.Create<T>(o =>
            new CompositeDisposable(source.Subscribe(o), Disposable.Create(unsubscribed)));
}

Here's an example of its use:

var source = Observable.Never<int>();

var subscription =
    source
        .Unsubscribed(() => Console.WriteLine("Unsubscribed"))
        .Subscribe();

subscription.Dispose();

That outputs:

Unsubscribed
Enigmativity
  • 113,464
  • 11
  • 89
  • 172
  • Thanks Enigmativity for the answer. The `Do` operator is indeed very close to what I am searching for. I wish this operator had an overload accepting an `onCompletedOrOnError` action, so that I can put my post-completion action in one place, when I am not interested whether the `source` completed successfully or with an error. I can't accept your answer though, because it's not covering the requirement of invoking the `finallyAction` when the source is unsubscribed. – Theodor Zoulias Nov 03 '21 at 07:41
  • 1
    @TheodorZoulias - I've updated my answer with an `Unsubscribed` extension method. – Enigmativity Nov 03 '21 at 09:05
  • It seems that the `Unsubscribed` operator has identical behavior with the `Finally` operator. See [this](https://dotnetfiddle.net/T295bW) fiddle. Just as the `Finally` it causes the process to crash, which is exactly the behavior that I am trying to avoid. – Theodor Zoulias Nov 03 '21 at 09:29
  • @TheodorZoulias - `Unsubscribed` is outside of the Rx contract so you have to write robust code. – Enigmativity Nov 03 '21 at 09:41
  • Enigmativity I am invoking unknown (user-supplied) code, so I can't guarantee that it's fail-proof, unless I wrap it in a big fat error-swallowing try-catch block. Which is something that I would like to avoid. Hence my request for the `FinallySafe` operator. – Theodor Zoulias Nov 03 '21 at 09:59
1

Here is an implementation of the FinallySafe operator, having the behavior specified in the question:

/// <summary>
/// Invokes a specified action after the source observable sequence terminates
/// successfully or exceptionally. The action is invoked before the propagation
/// of the source's completion, and any exception thrown by the action is
/// propagated to the observer. The action is also invoked if the observer
/// is unsubscribed before the termination of the source sequence.
/// </summary>
public static IObservable<T> FinallySafe<T>(this IObservable<T> source,
    Action finallyAction)
{
    return Observable.Create<T>(observer =>
    {
        var finallyOnce = Disposable.Create(finallyAction);
        var subscription = source.Subscribe(observer.OnNext, error =>
        {
            try { finallyOnce.Dispose(); }
            catch (Exception ex) { observer.OnError(ex); return; }
            observer.OnError(error);
        }, () =>
        {
            try { finallyOnce.Dispose(); }
            catch (Exception ex) { observer.OnError(ex); return; }
            observer.OnCompleted();
        });
        return new CompositeDisposable(subscription, finallyOnce);
    });
}

The finallyAction is assigned as the Dispose action of a Disposable.Create disposable instance, in order to ensure that the action will be invoked at most once. This disposable is then combined with the disposable subscription of the source, by using a CompositeDisposable instance.

As a side note, I would like to address the question if we could go even further, and propagate downstream a possible error of the finallyAction during the unsubscription. This could be desirable in some cases, but unfortunately it's not possible. First and foremost doing so would violate a guideline, found in The Observable Contract document, that states:

When an observer issues an Unsubscribe notification to an Observable, the Observable will attempt to stop issuing notifications to the observer. It is not guaranteed, however, that the Observable will issue no notifications to the observer after an observer issues it an Unsubscribe notification.

So such an implementation would be non-conforming. Even worse, the Observable.Create method enforces this guideline, by muting the observer immediately after the subscription is disposed. It does so by encapsulating the observer inside an AutoDetachObserver wrapper. And even if we tried to circumvent this limitation by implementing an IObservable<T> type from scratch, any built-in operator that could be attached after our non-conforming Finally operator would mute our post-unsubscription OnError notification anyway. So it's just not possible. An error during the unsubscription cannot be propagated to the subscriber that just requested to unsubscribe.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
0

I read the documentation and now I'm sure. The finally-operator will be called after the completition and should not throw any exception.

Compared to non-reactive programming:

StreamReader file = new StreamReader("file.txt");
string ln;  

try {  
   while ((ln = file.ReadLine()) != null) {  
      Console.WriteLine(ln);
   }
}
finally {
   // avoid to throw an exception inside of finally!
   if (file != null) {
      file.close();
   }
}

It is important to not throw an exception inside of finally.

Here is an example howto use it correctly (fiddle):

using System;
using System.Reactive.Linq;
using System.Threading;

public class Program
{
    public static void Main()
    {
        Observable
            .Range(1,5) // simulates stream-reader
            .Finally(() => Console.WriteLine("Close streamreader"))
            .Do(i => {
                if (i == 5) {
                    throw new ApplicationException("Oops!"); // simulates IO-error
                }
                
                Console.WriteLine("Read " + i);
            })
            .Subscribe(_ => { }, ex => Console.WriteLine(ex.Message),
                () => Console.WriteLine("Completed"));

        Thread.Sleep(1000);
    }
}

I'm not sure what you are trying to do (and I'm pretty new to c# reactive), but I think you are using not the right operator.

Edit

But you can patch it, if you want. In this article, they do something familar.
http://introtorx.com/Content/v1.0.10621.0/11_AdvancedErrorHandling.html

akop
  • 5,981
  • 6
  • 24
  • 51
  • akop surely I am using the wrong operator, since the behavior I get (application crash) is not the behavior I want (error propagation). Do you know which is the right operator? Because I don't. Hence my desire to invent an operator, the `FinallySafe`, that behaves exactly as I want. – Theodor Zoulias Nov 03 '21 at 16:51
  • I'm also don't know the right one. In the article, which is linked in my answer, they wrote a new finally-operator. Maybe this helps you, to wrote this for yourself. – akop Nov 03 '21 at 17:02
  • akop well ... I know that! The whole point of this question is to provide me with a working implementation of the requested operator. I know that it can be done. Actually I have already a rough implementation at hand, that I intend to publish as soon as I test that all corner cases are covered. – Theodor Zoulias Nov 03 '21 at 17:15
  • Ah, I see. So you want a implementation from us? I have a proposal: Open a Github project and link it in a own answer (with a small conclusion). I will try my best to help you on Github. I know a lot about RxJS, but Rx.Net is a little bit new for me. – akop Nov 03 '21 at 20:19
  • 1
    Nahh, I'll just post it here as an answer. It's ~20 lines of code, nothing special. – Theodor Zoulias Nov 03 '21 at 21:05