
This question concerns waiting for tasks to complete and thread synchronisation.

I currently have an iteration that I have enclosed within a Parallel.ForEach. In the example below, I have posed some questions in the comments about how best to handle graceful termination of the loop (.NET 4.0):

private void myFunction()
    {

        IList<string> iListOfItems = new List<string>();
        // populate iListOfItems

        CancellationTokenSource cts = new CancellationTokenSource();

        ParallelOptions po = new ParallelOptions();
        po.MaxDegreeOfParallelism = 20; // max threads
        po.CancellationToken = cts.Token;

        // declared outside the try block so that it is in scope in the finally block
        var myWcfProxy = new myWcfClientSoapClient();

        try
        {

            if (Parallel.ForEach(iListOfItems, po, (item, loopsate) =>
            {
                try
                {
                    if (_requestedToStop)
                        loopsate.Stop();
                    // long running blocking WS call, check before and after
                    var response = myWcfProxy.ProcessIntervalConfiguration(item);
                    if (_requestedToStop)
                        loopsate.Stop();

                    // perform some local processing of the response object
                }
                catch (Exception ex)
                {
                    // cannot continue game over.
                    if (myWcfProxy.State == CommunicationState.Faulted)
                    {
                        loopsate.Stop();
                        throw;
                    }
                }

                // else carry on..
                // raise some events and other actions that could all risk an unhandled error.

            }
            ).IsCompleted)
            {
                RaiseAllItemsCompleteEvent();
            }
        }
        catch (Exception ex)
        {
            // If an unhandled error is raised within one of the Parallel.ForEach threads, do all threads in the
            // ForEach abort, or run to completion? Is loopsate.Stop (or equivalent) called as soon as the framework raises an exception?
            // Do I need to call cts.Cancel here?

            // I want to wait for all the threads to terminate before I continue at this point. How do we achieve that?

            // Do I need to call cts.Dispose()?

            MessageBox.Show(Logging.FormatException(ex));
        }
        finally
        {

            if (myWcfProxy != null)
            {
            // possible race condition with the for-each threads here unless we wait for them to terminate.
                if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted)
                    myWcfProxy.Abort();

                myWcfProxy.Close();
            }

            // possible race condition with the for-each threads here unless we wait for them to terminate.
            _requestedToStop = false;

        }

    }

Any help would be most appreciated. The MSDN documentation talks about ManualResetEventSlim and CancellationToken.WaitHandle, but I am not sure how to wire them into this. I am struggling to understand the MSDN examples, as most of them don't apply.

Microsoft Developer

1 Answer


I have mocked up some code below that may answer your question. The basic point is that Parallel.ForEach gives you fork/join parallelism, so you don't need to worry about race conditions outside of the parallel task: the calling thread blocks until the tasks have completed, successfully or otherwise. You just need to use the ParallelLoopState argument (the second parameter of the lambda) to control the loop.

If any iteration of the loop throws an unhandled exception, the loop as a whole raises an AggregateException, which the catch block at the end of my code handles.
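
As a rough illustration of the two points above, here is a minimal, self-contained sketch. The class name `ForkJoinDemo`, the 100-item range, and the `Thread.Sleep` stand-in for the web service call are all assumptions made for the demo, not part of the question's code. It counts the iterations that are still executing when the catch block is entered.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original question or answer.
public static class ForkJoinDemo
{
    // Returns the number of iterations still executing when the
    // AggregateException reaches the caller.
    public static int Run()
    {
        int inFlight = 0; // iterations currently inside the loop body

        try
        {
            Parallel.ForEach(Enumerable.Range(0, 100), i =>
            {
                Interlocked.Increment(ref inFlight);
                try
                {
                    Thread.Sleep(10); // stand-in for the blocking web service call
                    if (i == 5)
                        throw new InvalidOperationException("simulated failure");
                }
                finally
                {
                    Interlocked.Decrement(ref inFlight);
                }
            });
        }
        catch (AggregateException aggEx)
        {
            // Parallel.ForEach only throws once every started iteration has returned.
            Console.WriteLine("Inner exceptions: " + aggEx.InnerExceptions.Count);
            return Interlocked.CompareExchange(ref inFlight, 0, 0); // atomic read
        }

        return -1; // not reached in this demo, because item 5 always throws
    }
}
```

Calling `ForkJoinDemo.Run()` should return 0, because no iteration is still inside the loop body by the time the exception surfaces.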

Other links that mention this topic:

Parallel.ForEach throws exception when processing extremely large sets of data

http://msdn.microsoft.com/en-us/library/dd460720.aspx

Does Parallel.ForEach limits the number of active threads?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.ServiceModel;

namespace Temp
{
    public class Class1
    {
        private class MockWcfProxy
        {
            internal object ProcessIntervalConfiguration(string item)
            {
                return new Object();
            }

            public CommunicationState State { get; set; }
        }

        private void myFunction()
        {

            IList<string> iListOfItems = new List<string>();
            // populate iListOfItems

            CancellationTokenSource cts = new CancellationTokenSource();

            ParallelOptions po = new ParallelOptions();
            po.MaxDegreeOfParallelism = 20; // max threads
            po.CancellationToken = cts.Token;

            try
            {
                var myWcfProxy = new MockWcfProxy();

                if (Parallel.ForEach(iListOfItems, po, (item, loopState) =>
                    {
                        try
                        {
                            if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
                                loopState.Stop();

                            // long running blocking WS call, check before and after
                            var response = myWcfProxy.ProcessIntervalConfiguration(item);

                            if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
                                loopState.Stop();

                            // perform some local processing of the response object
                        }
                        catch (Exception ex)
                        {
                            // cannot continue game over.
                            if (myWcfProxy.State == CommunicationState.Faulted)
                            {
                                loopState.Stop();
                                throw;
                            }

                            // FYI you are swallowing all other exceptions here...
                        }

                        // else carry on..
                        // raise some events and other actions that could all risk an unhandled error.
                    }
                ).IsCompleted)
                {
                    RaiseAllItemsCompleteEvent();
                }
            }
            catch (AggregateException aggEx)
            {
                // This section is entered if any iteration threw an unhandled exception.
                // Because the WCF exception is re-thrown above, you can inspect
                // aggEx.InnerExceptions here to see it (if you want).
            }
            // Execution will not get to this point until all of the iterations have completed (or one 
            // has failed, and all that were running when that failure occurred complete).
        }

        private void RaiseAllItemsCompleteEvent()
        {
            // Everything completed...
        }
    }
}
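
If you also want to keep the `_requestedToStop` flag from the question, one possible variant is sketched below. The `StoppableWorker` class, its members, and the `Thread.Sleep` stand-in are hypothetical names introduced only for illustration. The point it tries to show is that `Stop()` only signals the other iterations; the current iteration keeps running unless it returns by itself.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper class, not part of the original question or answer.
public class StoppableWorker
{
    private volatile bool _requestedToStop; // set from another thread, e.g. the UI
    private int _processed;

    public int Processed { get { return _processed; } }

    public void RequestStop() { _requestedToStop = true; }

    // Returns true only if every item ran to completion.
    public bool ProcessAll(IEnumerable<string> items)
    {
        ParallelLoopResult result = Parallel.ForEach(items, (item, loopState) =>
        {
            if (_requestedToStop)
                loopState.Stop(); // ask the loop not to start further iterations

            // True after Stop(), after an exception in another iteration, or after cancellation.
            if (loopState.ShouldExitCurrentIteration)
                return; // leave this iteration before making the expensive call

            Thread.Sleep(5); // stand-in for the blocking web service call
            Interlocked.Increment(ref _processed);
        });

        return result.IsCompleted; // false if Stop() was called
    }
}
```

With this shape, `ProcessAll` returns false after a stop request, so the caller can decide whether to raise the "all items complete" event.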
Chris Shain
  • Thanks for the insight. I should say that at the point where you rightly point out "swallowing all other exceptions here", I am making a logging call that logs the web service or client-side WCF exception. The intention is for the loop to continue if the exception does not invalidate the WCF proxy. I anticipate time-out errors or server-side fault exceptions, none of which, for this particular functionality, need any mitigation in the catch. However, we would be reviewing the log file, and any such exception would be investigated. – Microsoft Developer Jan 13 '11 at 14:12
  • What confused me about Parallel.ForEach was that I too assumed it should be a blocking call until all threads in the pool complete (exceptions caught or not). However, at a breakpoint set within, for example, your catch (AggregateException aggEx) block, the VS 2010 thread viewer would report 20 threads as running. So I got Sysinternals out and looked at the vshost executable being debugged, and it also showed 22 threads, including the UI and message pump. – Microsoft Developer Jan 13 '11 at 14:16
  • Further, the events that were raised within the loop were throwing further exceptions after the function had executed and the finally block had run. – Microsoft Developer Jan 13 '11 at 14:19
  • Regarding the number of threads running after the loop exits: the threads used for parallel are thread pool threads, and so should remain running after the loop exits. – Chris Shain Jan 13 '11 at 14:43
  • I see, but not running my code. Just suspended? They do eventually terminate after about 5 to 20 seconds however. Just doing some testing by the way. – Microsoft Developer Jan 13 '11 at 15:02
  • Yes, they should be suspended. The thread pool will eventually terminate them if you don't queue any more work, which is exactly what you are seeing. – Chris Shain Jan 13 '11 at 15:06
  • Thanks for the advice. Looks like it's running as expected. – Microsoft Developer Jan 13 '11 at 15:38
  • ShouldExitCurrentIteration is always true when IsExceptional is true (see http://www.blackwasp.co.uk/CancelParallelLoopIteration_2.aspx). Also, if the cancellation token source is canceled, an OperationCanceledException is thrown and the parallel loop body is not executed. – Der_Meister Mar 09 '17 at 14:33
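
The cancellation behaviour mentioned in the last comment can be sketched as follows. The class and method names are hypothetical, and the token is cancelled before the loop starts purely to keep the demo deterministic. It also shows one way to dispose the CancellationTokenSource, which implements IDisposable, with a using block.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original question or answer.
public static class CancellationDemo
{
    // Returns true if the loop threw OperationCanceledException
    // without executing the body for any item.
    public static bool RunWithCancelledToken()
    {
        using (var cts = new CancellationTokenSource()) // disposed when the block exits
        {
            var po = new ParallelOptions { CancellationToken = cts.Token };
            int executed = 0;

            cts.Cancel(); // cancelled up front, for a deterministic demo only

            try
            {
                Parallel.ForEach(Enumerable.Range(0, 10), po, i =>
                {
                    Interlocked.Increment(ref executed);
                });
            }
            catch (OperationCanceledException)
            {
                // Cancellation surfaces as OperationCanceledException,
                // not wrapped in an AggregateException.
                return executed == 0;
            }

            return false; // the loop ran to completion, which is not expected here
        }
    }
}
```

In real code the call to `cts.Cancel()` would typically come from another thread, such as a UI handler, while the loop is running. The loop then stops starting new iterations and throws once the running ones have returned.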