
I'm not sure whether I'm stopping a Parallel.ForEach loop the way I intend to, so let me outline the problem.

The loop uses a database driver with a limited number of available connections, so the open connections have to be tracked to keep the driver from throwing an exception. The tracking has been implemented manually (this should be refactored, by writing a wrapper or using AutoResetEvent, but some other things need to be taken care of first). So I need to keep track of the open connections and, in particular, handle the case of an exception:

Parallel.ForEach(hugeLists, parallelOptions, currentList => {
    WaitForDatabaseConnection();
    try {
        Interlocked.Increment(ref numOfOpenConnections);
        DoDatabaseCallAndInsertions();
    } catch (Exception ex) {
        // logging
        throw;
    } finally {
        // Runs on success and on failure, so the counter stays balanced
        Interlocked.Decrement(ref numOfOpenConnections);
    }
});

This is the simplified version of the loop without cancellation. To avoid wasted work, the loop should be cancelled as soon as possible once an exception is thrown: if one iteration fails, the whole loop should stop.

How can I achieve that while making sure that numOfOpenConnections is updated correctly?

What I have tried so far (is this behaving like I want it to or am I missing something?):

Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) => {
    parallelOptions.CancellationToken.ThrowIfCancellationRequested();
    WaitForDatabaseConnection();
    try {
        Interlocked.Increment(ref numOfOpenConnections);
        DoDatabaseCallAndInsertions();
    } catch (Exception ex) {
        // logging
        cancellationTokenSource.Cancel();
        parallelLoopState.Stop();
        throw; // still want to preserve the original exception information
    } finally {
        Interlocked.Decrement(ref numOfOpenConnections);
    }
});

I could wrap this code in a try-catch block and catch the AggregateException.
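
For illustration, catching the AggregateException might look roughly like the sketch below. The class name, the integer items standing in for hugeLists, and the deliberately failing item are assumptions made up for the example, not part of the original code.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class AggregateExceptionSketch
{
    // Runs a parallel loop whose body fails for exactly one item and returns
    // the messages of the exceptions that Parallel.ForEach collected.
    public static string[] RunAndCollectErrors()
    {
        int[] items = Enumerable.Range(0, 100).ToArray(); // stand-in for hugeLists
        try
        {
            Parallel.ForEach(items, item =>
            {
                if (item == 42)
                    throw new InvalidOperationException("item 42 failed");
            });
            return Array.Empty<string>();
        }
        catch (AggregateException aex)
        {
            // Flatten() unwraps nested AggregateExceptions, if there are any
            return aex.Flatten().InnerExceptions.Select(e => e.Message).ToArray();
        }
    }

    public static void Main()
    {
        foreach (string message in RunAndCollectErrors())
            Console.WriteLine(message); // prints "item 42 failed"
    }
}
```

The original exceptions stay available through InnerExceptions, so rethrowing with `throw;` inside the loop body keeps their stack traces intact.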

Matthias Herrmann
  • Why are you calling `parallelLoopState.Stop` and also using a `CancellationTokenSource`? Does it make the parallel loop terminate any faster than just letting the method terminate by itself? AFAIK an unhandled exception in the lambda will cause the parallel loop to terminate as soon as all currently running lambdas have completed. – Theodor Zoulias Jun 09 '20 at 12:01
  • @TheodorZoulias you're right, the `WaitForDatabaseConnection` is placed outside the for loop - I overlooked this when simplifying it for stackoverflow. – Matthias Herrmann Jun 09 '20 at 12:06
  • @TheodorZoulias I thought that even with `parallelLoopState.Stop` new tasks could still enter the loop body - but actually this can't happen - which is why I thought I needed a cancellation token... Do I always get an exception of type `AggregateException` if an exception is thrown in the loop body? – Matthias Herrmann Jun 09 '20 at 12:10
  • Yep, according to the [documentation](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.for) any exception inside the lambda will result in a final `AggregateException`. There is a possibility for other exceptions to be thrown (`OperationCanceledException`, `ArgumentNullException` and `ObjectDisposedException`), but these are unrelated to what happens inside the lambda. – Theodor Zoulias Jun 09 '20 at 12:15
  • Ty, the other exceptions should not occur because I don't need cancellation tokens – Matthias Herrmann Jun 09 '20 at 12:21
  • You could consider passing the `parallelLoopState` as a parameter to the `DoDatabaseCallAndInsertions` method, checking the property [`ShouldExitCurrentIteration`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallelloopstate.shouldexitcurrentiteration) frequently there, and exiting fast in case it has become `true`. This should increase the responsiveness of the parallel loop in case of an exception. – Theodor Zoulias Jun 09 '20 at 12:26
  • @TheodorZoulias I'll consider that, do you think it is feasible to do this with a timer? E.g. if there is an expensive database query I need to stop the query and therefore I would need some asynchronous timer to check for `DoDatabaseCallAndInsertions` – Matthias Herrmann Jun 09 '20 at 12:34
  • Is this expensive database query cancelable? If not, are you OK with exiting the parallel loop early (in case of an exception) while some queries are still running? – Theodor Zoulias Jun 09 '20 at 12:38
  • Yes the database query inserts into temporary tables and uses transactions - so it is safe to abort the query at any given time - the database system just aborts the transaction and uses rollbacks – Matthias Herrmann Jun 09 '20 at 12:40
  • Aborting the transaction and rolling back may take some time. Do you want to wait for the abort-rollback to complete, or just exit the parallel loop as fast as possible, and let the database do its job unattended? – Theodor Zoulias Jun 09 '20 at 12:43
  • The second one - just exit as fast as possible and let the database do the clean up – Matthias Herrmann Jun 09 '20 at 12:44
  • OK. One more question: how are you communicating the request for a transaction rollback to the database? Is it enough for example to pass a `CancellationToken` to the data access layer `Execute` method? – Theodor Zoulias Jun 09 '20 at 12:48
  • Sadly there is no real DataAccessLayer - there are only direct calls like `executeGeneric` on the oracle database without abstraction (ODP https://www.oracle.com/webfolder/technetwork/tutorials/obe/db/dotnet/GettingStartedNETVersion/GettingStartedNETVersion.htm) – Matthias Herrmann Jun 09 '20 at 12:52
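
The `ShouldExitCurrentIteration` suggestion from the comments above could look roughly like the following sketch. The `DoWork` method, its step count, and the sleep that stands in for database work are hypothetical; the only real API used for the early exit is `ParallelLoopState.ShouldExitCurrentIteration`, which becomes `true` once another iteration has thrown an unhandled exception.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ShouldExitSketch
{
    // Hypothetical long-running work split into small steps. The loop state
    // is polled between steps so the iteration can bail out early.
    // Returns true only if every step ran.
    public static bool DoWork(ParallelLoopState state, int steps)
    {
        for (int i = 0; i < steps; i++)
        {
            if (state.ShouldExitCurrentIteration) return false;
            Thread.Sleep(1); // placeholder for one unit of database work
        }
        return true;
    }

    // Runs 50 iterations, one of which fails immediately, and returns how
    // many iterations ran to completion.
    public static int CountCompleted()
    {
        int completed = 0;
        try
        {
            Parallel.ForEach(Enumerable.Range(0, 50), (item, state) =>
            {
                if (item == 0) throw new InvalidOperationException("boom");
                if (DoWork(state, 200)) Interlocked.Increment(ref completed);
            });
        }
        catch (AggregateException)
        {
            // The failure of item 0 ends up here; logging omitted in this sketch
        }
        return completed;
    }

    public static void Main()
    {
        Console.WriteLine("Iterations completed: " + CountCompleted());
    }
}
```

How many iterations complete depends on timing, but the running ones stop at their next check instead of finishing all 200 steps.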

1 Answer


You could call the DoDatabaseCallAndInsertions method in a way that waits for its completion only while the state of the loop is not exceptional, and otherwise forgets about it and allows the parallel loop to complete immediately. Using a cancelable wrapper is probably the simplest way to achieve this. Here is a method RunAsCancelable that waits for a function to complete, or for a CancellationToken to become canceled, whichever comes first:

public static TResult RunAsCancelable<TResult>(Func<TResult> function,
    CancellationToken token)
{
    token.ThrowIfCancellationRequested();
    Task<TResult> task = Task.Run(function, token);
    try
    {
        // Wait for the function to complete, or the token to become canceled
        task.Wait(token);
    }
    catch { } // Prevent an AggregateException from being thrown

    token.ThrowIfCancellationRequested();
    // Propagate the result, or the original exception unwrapped
    return task.GetAwaiter().GetResult();
}

public static void RunAsCancelable(Action action, CancellationToken token)
    => RunAsCancelable<object>(() => { action(); return null; }, token);

The RunAsCancelable method throws an OperationCanceledException if the token was canceled before the action completed, propagates the exception that occurred in the action if it failed, or returns normally if the action completed successfully.

Usage example:

using (var failureCTS = new CancellationTokenSource()) // Communicates failure
{
    Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) =>
    {
        WaitForDatabaseConnection();
        try
        {
            Interlocked.Increment(ref numOfOpenConnections);
            RunAsCancelable(() => DoDatabaseCallAndInsertions(failureCTS.Token),
                failureCTS.Token);
        }
        catch (OperationCanceledException ex)
            when (ex.CancellationToken == failureCTS.Token)
        {
            // Do nothing (an exception occurred in another thread)
        }
        catch (Exception ex)
        {
            Log.Error(ex);
            failureCTS.Cancel(); // Signal failure to the other threads
            throw; // Inform the parallel loop that an error has occurred
        }
        finally
        {
            Interlocked.Decrement(ref numOfOpenConnections);
        }
    });
}

The DoDatabaseCallAndInsertions method can inspect the property IsCancellationRequested of the CancellationToken parameter at various points, and perform a transaction rollback if needed.
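
As a rough sketch of that idea, the method below checks the token between batches. Everything here is hypothetical: the batch structure, the `afterBatch` callback (which only exists to trigger the cancellation in the demo), and the placeholders for insert, rollback and commit. The real method would issue the actual ODP.NET calls.

```csharp
using System;
using System.Threading;

public static class CooperativeCancellationSketch
{
    // Hypothetical stand-in for DoDatabaseCallAndInsertions: "inserts" rowCount
    // rows in batches and checks the token before each batch.
    // Returns the number of rows committed (0 after a rollback).
    public static int InsertInBatches(int rowCount, int batchSize,
        CancellationToken token, Action<int> afterBatch)
    {
        int inserted = 0;
        while (inserted < rowCount)
        {
            if (token.IsCancellationRequested)
            {
                // A real implementation would roll back its transaction here
                return 0;
            }
            inserted += Math.Min(batchSize, rowCount - inserted); // placeholder insert
            afterBatch(inserted);
        }
        return inserted; // a real implementation would commit here
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            // Simulate a failure elsewhere once 20 rows have been inserted
            int committed = InsertInBatches(100, 10, cts.Token,
                inserted => { if (inserted == 20) cts.Cancel(); });
            Console.WriteLine(committed); // prints 0
        }
    }
}
```

The smaller the batches, the sooner a forgotten operation notices the failure and stops doing work that will be discarded anyway.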

It should be noted that the RunAsCancelable method is quite wasteful regarding the usage of ThreadPool threads. One extra thread must be blocked in order to make each supplied action cancelable, so two threads are needed for each execution of the lambda. To prevent a possible starvation of the ThreadPool, it is probably a good idea to increase the minimum number of threads that the thread pool creates on demand before switching to the create-one-every-500-msec algorithm, by using the ThreadPool.SetMinThreads method at the startup of the application.

ThreadPool.SetMinThreads(100, 10);

Important: The above solution makes no attempt to log the possible exceptions of the operations that have been forgotten. Only the exception of the first failed operation will be logged.

Theodor Zoulias
  • Ty - I will think about it. For the time being I'll just use `Stop` because I don't want to complicate things too much, and I'll have to decide whether this additional performance gain is worth the cost of complicating the code base - maybe after some more refactoring work – Matthias Herrmann Jun 10 '20 at 06:11
  • @MatthiasHerrmann yep, it's a bit complicated indeed. In case you eventually make the effort to refactor the code, I would suggest taking a look at the [TPL Dataflow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library) library. It is a much more powerful alternative to the `Parallel` class. [Unlike](https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda) `Parallel.ForEach`, it supports asynchronous lambdas, so the cancelable wrapper could be made asynchronous (without wasting threads). – Theodor Zoulias Jun 10 '20 at 06:48
  • @MatthiasHerrmann the TPL Dataflow has a learning curve though. It may take 1-2 days of study before you can be productive with it. Its selling point is its ability to perform task-parallelism on top of data-parallelism. In other words you could configure different parts of the work to be processed independently from the other parts, with a different degree of parallelism for each part. This is done by forming a "pipeline" consisting of linked processing "blocks", with the data flowing from one block to the next. – Theodor Zoulias Jun 10 '20 at 06:59