
I observed a weird behavior while experimenting with a PLINQ query. Here is the scenario:

  • There is a source IEnumerable<int> sequence that contains the two items 1 and 2.
  • A Parallel LINQ Select operation is applied on this sequence, projecting each item to itself (x => x).
  • The resulting ParallelQuery<int> query is consumed immediately with a foreach loop.
  • The selector lambda of the Select successfully projects the item 1.
  • The consuming foreach loop throws an exception for the item 1.
  • The selector lambda throws an exception for the item 2, after a small delay.

What happens next is that the consuming exception is lost! Apparently it is shadowed by the exception thrown afterwards in the Select. Here is a minimal demonstration of this behavior:

ParallelQuery<int> query = Enumerable.Range(1, 2)
    .AsParallel()
    .Select(x =>
    {
        if (x == 2) { Thread.Sleep(500); throw new Exception($"Oops!"); }
        return x;
    });

try
{
    foreach (int item in query)
    {
        Console.WriteLine($"Consuming item #{item} started");
        throw new Exception($"Consuming item #{item} failed");
    }
}
catch (AggregateException aex)
{
    Console.WriteLine($"AggregateException ({aex.InnerExceptions.Count})");
    foreach (Exception ex in aex.InnerExceptions)
        Console.WriteLine($"- {ex.GetType().Name}: {ex.Message}");
}
catch (Exception ex)
{
    Console.WriteLine($"{ex.GetType().Name}: {ex.Message}");
}

Output:

Consuming item #1 started
AggregateException (1)
- Exception: Oops!


Chronologically the consuming exception happens first, and the PLINQ exception happens later. So my understanding is that the consuming exception is more important, and it should be propagated with priority. Nevertheless the only exception that is surfaced is the one that occurs inside the PLINQ code.

My question is: why is the consuming exception lost, and is there any way that I can fix the query so that the consuming exception is propagated with priority?

The desirable output is this:

Consuming item #1 started
Exception: Consuming item #1 failed
Theodor Zoulias
  • Theoretically, PLINQ could execute many loops before returning a single item, and I don't think there is any documentation stating when an exception will be thrown (i.e. if item #3 throws then the `foreach` could throw even on the very first execution). – Charlieface Nov 30 '22 at 12:16
  • @Charlieface in the question's example the value 1 is emitted immediately by the PLINQ query, before completing the `selector` of the item 2. I could have configured the query with [`WithMergeOptions(ParallelMergeOptions.NotBuffered)`](https://learn.microsoft.com/en-us/dotnet/api/system.linq.parallelenumerable.withmergeoptions) to make this behavior even more likely, but apparently it is not required in this case. – Theodor Zoulias Nov 30 '22 at 17:08
  • Actually, your problem is that you think the execution of the parallel query has anything to do with the order of the `foreach`. If you add a write for `x` in the `Select`, you will see that the parallel query is completely evaluated before the first consuming occurs, sometimes in the order 2, 1. `NotBuffered` makes this obvious, but the default `AutoBuffered` can cause the order to swap all around and you might Select 2,3,1 then consume 3 first... there is no order guarantee once you use `AsParallel`. – NetMage Nov 30 '22 at 21:58
  • @NetMage my expectation is that the items are emitted by the PLINQ query in (roughly) the same order that the associated `selector`s are completing execution, unless I have requested ordered emission with the [`AsOrdered`](https://learn.microsoft.com/en-us/dotnet/api/system.linq.parallelenumerable.asordered) operator. But my question is not based on this expectation. In the demo the item 1 has been emitted, the consuming `foreach` code failed with an exception, and this exception has been lost. That's the weird thing. Don't you agree that the exception should be surfaced? – Theodor Zoulias Nov 30 '22 at 22:27
  • I think what you are seeing is the result of the compiler translation of the `foreach` into a `while (MoveNext())` with a `try` / `finally` to dispose of the enumerator. When the inner exception is thrown, it is caught by the `finally` and the `Dispose` of the enumerator causes all the `Select` threads to finish, which causes an exception inside the `finally` block, which throws away the initial exception as discussed [here](https://stackoverflow.com/a/2911229/2557128). You need to use your own loop and a `CancellationTokenSource` if you want to prevent this. – NetMage Nov 30 '22 at 22:53
  • @NetMage that's the kind of explanation and solution that I am looking for! Could you show an example of the `CancellationTokenSource`-based solution? Currently I am not able to visualize it. – Theodor Zoulias Nov 30 '22 at 23:07
  • See also [PLINQ Exceptions](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/introduction-to-plinq#exceptions) where it explains that some items may be processed after an exception is thrown, and [Exceptions in Parallel Loops](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-handle-exceptions-in-parallel-loops): "an unhandled exception causes the loop to terminate as soon as all currently running iterations finish" – NetMage Nov 30 '22 at 23:08
  • @NetMage from a quick look, the two links [1](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/introduction-to-plinq#exceptions), [2](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-handle-exceptions-in-parallel-loops) don't seem to be helpful for explaining or solving this question's problem. I am not expecting that the `ParallelQuery` will complete immediately after an exception happens in the parallel `Select`. But I do expect that an exception thrown in the consuming `foreach` will not be lost! – Theodor Zoulias Nov 30 '22 at 23:11
  • The links explain how an exception in a parallel query consumption might not stop all processing of the parallel query. The link in my first comment about the `finally` block explains how an exception during `finally` processing will throw away the first exception. – NetMage Nov 30 '22 at 23:31
  • @NetMage you mean [this](https://stackoverflow.com/questions/2911215/what-happens-if-a-finally-block-throws-an-exception/2911229#2911229) link? Yeah, this gives useful information about the behavior of the `finally` in general. But there is no hint in the other two links that PLINQ propagates exceptions via the `Dispose` route, which can cause consuming exceptions to be lost. Not even the smartest person that ever lived (Archimedes!) could deduce this behavior by just reading these two documentation pages, I think. – Theodor Zoulias Nov 30 '22 at 23:39
  • 1
    Not necessarily the smartest person, but I read a lot of C# internals stuff (programming languages and their implementations being both a hobby and something I studied/worked with in college) and immediately thought that the compiler implementation of the `foreach` would yield an explanation :) – NetMage Dec 01 '22 at 00:23

2 Answers


I think what you are seeing is the result of the compiler translation of the foreach into a while (MoveNext()) loop with a try/finally to dispose of the enumerator. When the consuming exception is thrown, the finally block runs, and the Dispose() of the enumerator waits for the running Select threads to finish, which causes an exception to be thrown inside the finally block. That second exception throws away the initial one, as discussed here: https://stackoverflow.com/a/2911229/2557128. You need to use your own loop and a try/catch if you want to prevent this, though I think the Microsoft recommendation would be to use a try/catch in the Select to be closer to the source of the exception.
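To see the mechanism in isolation, here is a minimal sketch that takes PLINQ out of the picture. It assumes a hypothetical iterator named Source whose finally block throws, standing in for a PLINQ enumerator that throws on Dispose, and it is written as a .NET 6+ console program with top-level statements. Both the plain foreach and a hand-written approximation of its expansion surface only the Dispose exception:

```csharp
using System;
using System.Collections.Generic;

string caughtWithForeach = "";
try
{
    foreach (int item in Source())
        throw new Exception($"Consuming item #{item} failed"); // the consuming exception
}
catch (Exception ex)
{
    caughtWithForeach = ex.Message;
}
Console.WriteLine(caughtWithForeach); // prints "Oops!"

// Roughly what the compiler generates for the foreach above (simplified).
string caughtWithExpansion = "";
try
{
    IEnumerator<int> e = Source().GetEnumerator();
    try
    {
        while (e.MoveNext())
        {
            int item = e.Current;
            throw new Exception($"Consuming item #{item} failed");
        }
    }
    finally
    {
        // An exception thrown by Dispose replaces the one that is already propagating.
        e.Dispose();
    }
}
catch (Exception ex)
{
    caughtWithExpansion = ex.Message;
}
Console.WriteLine(caughtWithExpansion); // prints "Oops!"

// Hypothetical source: its finally block runs on Dispose and throws,
// mimicking a PLINQ enumerator that reports a worker error on Dispose.
static IEnumerable<int> Source()
{
    try
    {
        yield return 1;
        yield return 2;
    }
    finally
    {
        throw new Exception("Oops!");
    }
}
```

The real expansion differs in details (for example it may add a null check before Dispose), but the try/finally around Dispose is the part that loses the first exception.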

Here is a modification of your existing code, replacing the foreach with the compiler-generated expansion of foreach using an enumerator. (I use LINQPad to see the C# 1.0 equivalent code / IL code from the compiler.)

You can capture any exceptions during the Dispose of the enumerator and then bundle them up with the original exception into an AggregateException when you catch them.

I wrapped the boilerplate into an extension method to replace the normal foreach:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Threading;

var b = true;
var query = Enumerable.Range(1, 3)
    .AsParallel()
    .Select(x => {
        Thread.Sleep(50 * (x - 1));
        Console.WriteLine($"Select({x})");
        if (x >= 2) {
            throw new Exception($"Oops {x}!");
        }
        return x;
    });

try {
    query.ForEachAggregatingExceptions(item => {
        Console.WriteLine($"Consuming item #{item} started");
        if (b) {
            throw new Exception($"Consuming item #{item} failed");
        }
    });
}
catch (AggregateException aex) {
    Console.WriteLine($"AggregateException ({aex.InnerExceptions.Count})");
    foreach (Exception ex in aex.InnerExceptions)
        Console.WriteLine($"- {ex.GetType().Name}: {ex.Message}");
}
catch (Exception ex) {
    Console.WriteLine($"{ex.GetType().Name}: {ex.Message}");
}

public static class ParallelQueryExt {
    public static void ForEachAggregatingExceptions<T>(this ParallelQuery<T> pq, Action<T> processFn) {
        Exception firstException = null;
        IEnumerator<T> e = pq.GetEnumerator();
        try {
            while (e.MoveNext())
                processFn(e.Current);
        }
        catch (Exception ex) {
            // the consuming exception (or a PLINQ exception thrown by MoveNext)
            firstException = ex;
        }
        finally {
            try {
                // PLINQ may throw here the errors of Select workers that were still running
                e.Dispose();
            }
            catch (AggregateException aex) when (firstException != null) {
                // combine exceptions from Dispose with firstException
                throw new AggregateException(aex.InnerExceptions.Prepend(firstException));
            }
            catch (Exception ex) when (firstException != null) {
                // combine a single exception from Dispose with firstException
                // (never pass a null inner exception to AggregateException)
                throw new AggregateException(firstException, ex);
            }
            // if firstException is null, an exception from Dispose propagates unchanged
        }
        if (firstException != null) // re-throw firstException if Dispose did not throw
            ExceptionDispatchInfo.Capture(firstException).Throw(); // preserves the original stack trace
    }
}

PS: The b variable and the if prevent the compiler from optimizing the while loop into an if, since otherwise it can figure out that the throw prevents the loop from executing more than one pass.

NetMage
  • Thanks NetMage for the answer! This seems to solve the problem, albeit with lots of boilerplate code in the consuming site. But I am not sure that I understand the need for the `CancellationTokenSource`. – Theodor Zoulias Nov 30 '22 at 23:23
  • @TheodorZoulias The `CancellationTokenSource` is an option if you uncomment out the `cts.Cancel()` and instead comment out the `try {` `} catch { }` lines around the `e.Dispose()`. Then the threads will be cancelled, and the exception in the `Select` won't get thrown. But sometimes it might be, depending on who wins the race, so I prefer the `try`/`catch` instead. – NetMage Nov 30 '22 at 23:33
  • @TheodorZoulias I modified my answer to bundle up any exceptions that happen during the `Dispose` and return them all if necessary. Note that since this is parallel multi-threaded code, not all exceptions that would occur if every `Select` ran to completion will necessarily occur since they may be at different points during execution, or not yet scheduled to execute at all. – NetMage Nov 30 '22 at 23:37
  • Ah, now I understand what you want to do with the `CancellationTokenSource`. You want to prevent PLINQ's worker threads from starting more work after the failure of the consuming `foreach`. I would guess that the PLINQ knows how to do it by itself. Calling `Dispose` should have the same effect as cancelling the cancellation token. – Theodor Zoulias Nov 30 '22 at 23:49
  • Btw your idea about combining the caught `FirstException` with the `e.Dispose()` exception is pretty good! But it might be better if the propagated exception is always an `AggregateException`, in order to simplify the outermost handling code that logs the errors in the console. – Theodor Zoulias Nov 30 '22 at 23:56
  • You could consider including [this comment](https://stackoverflow.com/questions/74623812/exception-is-lost-while-consuming-a-plinq-query/74635235?noredirect=1#comment131739551_74623812) inside the answer, so that it answers also the "why" part of the question. Currently it answers only the "how to fix" part! – Theodor Zoulias Dec 01 '22 at 00:04
  • @TheodorZoulias Good suggestion. I also wrapped the boilerplate into an extension method which made me realize Microsoft would probably suggest putting the `try` in the `Select` body and using a regular `foreach`, but that has its down sides as well. – NetMage Dec 01 '22 at 00:21
  • @TheodorZoulias Also, when I tested with the `CancellationTokenSource`, it definitely prevented the `Dispose` from throwing. `Dispose` waits for the currently running threads to finish instead of immediately cancelling them (I saw that documented but I lost the reference), which means your `throw` for 2 gets executed during the `Dispose` but not when cancelled, assuming it is still in the `Sleep`, hence my conclusion that it is too risky to count on, as it sets up a race condition. – NetMage Dec 01 '22 at 00:27
  • Now it's much better! Btw I doubt that canceling the `CancellationTokenSource` has the effect of aborting or [interrupting](https://learn.microsoft.com/en-us/dotnet/api/system.threading.thread.interrupt) `ThreadPool` threads. I might have to experiment with it, and see if I can reproduce the effect of preventing the `Dispose` from throwing. – Theodor Zoulias Dec 01 '22 at 00:48
  • 1
    @TheodorZoulias You may be right, further testing seems to indicate I may have had the `throw` in the `Select` commented out when testing the cancellation token. I took out the suggestion that a `CancellationTokenSource` might help :) – NetMage Dec 01 '22 at 00:55

NetMage's answer explains that the observed behavior is caused by the error thrown on Dispose of the PLINQ enumerator. PLINQ here violates the common wisdom about Dispose, which is to avoid throwing unless the error is critical. My guess as to why is that the library was introduced in .NET 4.0. In that .NET version an unobserved faulted Task resulted in the termination of the process: the process crashed when the faulted Task was garbage collected, after raising the TaskScheduler.UnobservedTaskException event. So the PLINQ designers had to choose between throwing on Dispose, swallowing the exception completely, or crashing the process, and they chose what seemed like the lesser evil of the available options. That's my guess.

Had the library been authored for .NET 4.5, they might have decided differently. Starting from that .NET version, the process no longer crashes when an unobserved faulted Task is garbage collected. Reverting to the .NET 4.0 policy is still possible through a configuration setting, but I doubt that anyone ever used this setting to revert to the original irrational behavior.

My approach for fixing PLINQ's error-losing behavior is a bit different from NetMage's approach. Instead of bundling all errors in an AggregateException, I prefer to suppress the exception that is thrown by PLINQ on Dispose, and propagate it through the TaskScheduler.UnobservedTaskException mechanism. This can be achieved easily by creating a faulted task with the Task.FromException method, and leaving it unobserved:

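using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SuppressDisposeExceptionExtensions // container class name is illustrative
{
    /// <summary>
    /// Suppresses the error that might be thrown by the enumerator on Dispose.
    /// The error triggers the TaskScheduler.UnobservedTaskException event.
    /// </summary>
    public static IEnumerable<TSource> SuppressDisposeException<TSource>(
        this IEnumerable<TSource> source)
    {
        ArgumentNullException.ThrowIfNull(source);
        IEnumerator<TSource> enumerator = source.GetEnumerator();
        try
        {
            while (enumerator.MoveNext()) yield return enumerator.Current;
            // Reached only from MoveNext: let a Dispose error propagate normally.
            try { enumerator.Dispose(); } finally { enumerator = null; }
        }
        finally
        {
            // Reached with a non-null enumerator when the consumer abandons the enumeration.
            try { enumerator?.Dispose(); }
            catch (Exception ex) { _ = Task.FromException(ex); } // leave the faulted task unobserved
        }
    }
}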

The finally block of the iterator can be invoked either by the MoveNext or by the Dispose of the autogenerated enumerator. In case it is invoked by MoveNext, we want an exception thrown by the Dispose of the source enumerator to propagate normally. We only want to suppress it if it happens during the autogenerated Dispose. That's the reason for disposing the source enumerator preferentially in the try block, and only as a fallback in the finally block.
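The two paths can be observed with a small sketch, assuming a .NET 6+ console program with top-level statements and a hypothetical iterator named Numbers that only records when its finally block runs. When the enumeration runs to completion, the finally executes inside the last MoveNext call; when the consumer stops early, it executes inside Dispose:

```csharp
using System;
using System.Collections.Generic;

var log = new List<string>();

// Case 1: enumerate to the end. The iterator's finally runs inside the final MoveNext call.
using (IEnumerator<int> e = Numbers(log).GetEnumerator())
{
    while (e.MoveNext()) log.Add($"item {e.Current}");
    log.Add("MoveNext returned false");
} // Dispose does nothing more here: the iterator has already finished

log.Add("---");

// Case 2: stop after the first item. The iterator's finally runs inside Dispose.
using (IEnumerator<int> e = Numbers(log).GetEnumerator())
{
    e.MoveNext();
    log.Add($"item {e.Current}");
    log.Add("disposing early");
} // Dispose executes the pending finally block

Console.WriteLine(string.Join(Environment.NewLine, log));

// Hypothetical iterator that records when its finally block executes.
static IEnumerable<int> Numbers(List<string> sink)
{
    try
    {
        yield return 1;
        yield return 2;
    }
    finally
    {
        sink.Add("iterator finally");
    }
}
```

In the first case "iterator finally" is logged before "MoveNext returned false"; in the second case it is logged after "disposing early". SuppressDisposeException relies on exactly this distinction.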

Usage example:

IEnumerable<int> query = Enumerable.Range(1, 2)
    .AsParallel()
    .Select(x => /* ... */ x)
    .SuppressDisposeException();

In order to watch the TaskScheduler.UnobservedTaskException event being triggered, you might have to call GC.Collect as part of the test.
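A minimal sketch of such a test, assuming a .NET 6+ console program with top-level statements. The helper LeaveFaultedTaskUnobserved is hypothetical and mimics what SuppressDisposeException does with a Dispose error. Finalization timing is up to the GC, so the forced collection is a testing aid, not something to rely on in production code:

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

string observedMessage = "";

// Raised on the finalizer thread when a faulted task is collected without being observed.
TaskScheduler.UnobservedTaskException += (sender, e) =>
{
    observedMessage = e.Exception.InnerException?.Message ?? "";
    Console.WriteLine($"UnobservedTaskException: {observedMessage}");
    e.SetObserved(); // mark it as handled
};

LeaveFaultedTaskUnobserved();

// Force the orphaned task to be collected and its exception holder to be finalized.
GC.Collect();
GC.WaitForPendingFinalizers();

// Kept in a separate, non-inlined method so that no live local still references the task.
[MethodImpl(MethodImplOptions.NoInlining)]
static void LeaveFaultedTaskUnobserved()
{
    // Same trick as in SuppressDisposeException: create a faulted task and drop it.
    _ = Task.FromException(new Exception("Oops!"));
}
```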

My justification for removing the Dispose exception from the synchronous execution flow is that I consider the parallel nature of PLINQ as a form of speculative execution. The PLINQ engine might do more work than what the consumer of the query is interested in receiving. So in case the consumer abandons the enumeration prematurely, either voluntarily by breaking the foreach loop, or unwillingly because it suffered an exception, PLINQ should not bother the consumer with anything that might happen past the point where the consumer lost interest in the enumeration.

Theodor Zoulias