Recently I became aware that the Rx `Finally` operator behaves in a way which, at least for me, is unexpected. My expectation was that any error thrown by the `finallyAction` would be propagated to the operator's observers downstream. Alas, this is not what happens. In reality the operator first propagates the completion (or the failure) of the antecedent sequence to its observers, and then invokes the `action`, at a point in time when it's no longer possible to propagate a potential error thrown by the action. So it throws the error on the `ThreadPool` and crashes the process, which is not only unexpected but also highly problematic. Below is a minimal demonstration of this behavior:

    Observable
        .Timer(TimeSpan.FromMilliseconds(100))
        .Finally(() => throw new ApplicationException("Oops!"))
        .Subscribe(_ => { }, ex => Console.WriteLine(ex.Message),
            () => Console.WriteLine("Completed"));

    Thread.Sleep(1000);

Outcome: Unhandled exception (Fiddle)

The exception thrown by the `Finally` lambda is not handled by the `Subscribe`'s `onError` handler, as would be desirable.

This feature (I am tempted to call it a flaw) severely limits the usefulness of the `Finally` operator in my eyes. Essentially I can only use it when I want to invoke an action that is expected to never fail, and whose failure would indicate a catastrophic corruption of the application's state, with no recovery possible. I could use it, for example, to `Release` a `SemaphoreSlim` (like I've done here for example), which can only fail if my code has a bug. I am OK with my app crashing in this case. But I've also used it recently to invoke an unknown action supplied by the caller, an action that could potentially fail, and crashing the app in this case is unacceptable. Instead, the error should be propagated downstream. So what I am asking here is how to implement a `Finally` variant (let's call it `FinallySafe`) with an identical signature, and the behavior specified below:

    public static IObservable<TSource> FinallySafe<TSource>(
        this IObservable<TSource> source, Action finallyAction);

- The `finallyAction` should be invoked after the `source` sequence has emitted an `OnCompleted` or an `OnError` notification, but before this notification is propagated to the observer.
- If the `finallyAction` invocation completed successfully, the original `OnCompleted`/`OnError` notification should be propagated to the observer.
- If the `finallyAction` invocation failed, an `OnError` notification should be propagated to the observer, containing the error that just occurred. In this case the previous error, the one that may have caused the `source` to complete with failure, should be ignored (not propagated).
- The `finallyAction` should also be invoked when the `FinallySafe` is unsubscribed before the completion of the `source`. When a subscriber (observer) disposes a subscription, the `finallyAction` should be invoked synchronously, and any error should be propagated to the caller of the `Dispose` method.
- If the `FinallySafe` is subscribed by multiple observers, the `finallyAction` should be invoked once per subscription, independently for each subscriber, following the rules above. Concurrent invocations are OK.
- The `finallyAction` should never be invoked more than once per subscriber.

Validation: replacing the `Finally` with the `FinallySafe` in the code snippet above should result in a program that doesn't crash with an unhandled exception.

Alternative: I am also willing to accept an answer that provides a reasonable explanation of why the behavior of the built-in `Finally` operator is better than the behavior of the custom `FinallySafe` operator, as specified above.
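
To make the rules above concrete, here is a rough sketch of one possible shape for `FinallySafe`, built on `Observable.Create` from the System.Reactive package. It is an illustration under assumptions, not a verified solution: the class name `FinallySafeExtensions` and the helper `InvokeOnce` are made up for this example, and the sketch makes no attempt to coordinate a `Dispose` call that races with a concurrent `OnCompleted`/`OnError` from the source.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical container class; the name is an assumption for this sketch.
public static class FinallySafeExtensions
{
    public static IObservable<TSource> FinallySafe<TSource>(
        this IObservable<TSource> source, Action finallyAction)
    {
        // Observable.Create runs this delegate once per subscription,
        // so each subscriber gets its own "invoked" flag.
        return Observable.Create<TSource>(observer =>
        {
            int invoked = 0; // 0 = not invoked yet, 1 = already invoked

            // Runs finallyAction at most once for this subscription.
            void InvokeOnce()
            {
                if (Interlocked.Exchange(ref invoked, 1) == 0) finallyAction();
            }

            var subscription = source.Subscribe(
                observer.OnNext,
                error =>
                {
                    // Invoke the action before forwarding the notification.
                    try { InvokeOnce(); }
                    catch (Exception ex) { observer.OnError(ex); return; }
                    observer.OnError(error);
                },
                () =>
                {
                    try { InvokeOnce(); }
                    catch (Exception ex) { observer.OnError(ex); return; }
                    observer.OnCompleted();
                });

            // On unsubscription the action runs synchronously (if it has not
            // run already); an exception thrown here escapes from Dispose.
            return Disposable.Create(() =>
            {
                try { InvokeOnce(); }
                finally { subscription.Dispose(); }
            });
        });
    }
}
```

With this sketch, replacing `Finally` with `FinallySafe` in the snippet above is expected to route the `ApplicationException` to the `onError` handler instead of leaving it unhandled on the `ThreadPool`. Whether it satisfies every rule in the list, particularly under concurrent disposal, would still need to be checked.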