12

I have something similar to this in my code:

Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
    Process(item);
});

The thing is that I do a bunch of things inside the Process() method (connect to a file share, parse a file, save to the database, etc.), and I worry that something might go wrong along the way and cause an iteration to never finish. Could this ever happen?

Is there a way to set a timeout for the Process() method to avoid ending up having zombie threads?

UPDATE:

The easiest ways I've found to set a timeout are passing a timeout in milliseconds to the CancellationTokenSource constructor or calling the Wait() method on a task with a timeout.

Option #1

Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
    var cts = new CancellationTokenSource(2000);
    Task task = Task.Factory.StartNew(() => Process(item), cts.Token);
});

Option #2

Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
    Task task = Task.Factory.StartNew(() => Process(item));
    task.Wait(2000);
});

The problem is that neither of those options actually cancels the Process() method. Do I need to check for something inside Process()?

Santiago Aceñolaza
  • 1
    "could this ever happen?" Absolutely... If you have a process that doesn't ever finish (waits for IO that never happens, infinite loop, etc...), then the loop will never complete. – poy Mar 26 '14 at 15:01
  • So I'll need to find a way to avoid having never ending iterations ... maybe by surrounding the Process method call with a Threading.Timer? – Santiago Aceñolaza Mar 26 '14 at 15:24
  • Are you stuck with .NET 4, or can you move to .NET 4.5? If you can, then you have better options. – poy Mar 26 '14 at 18:18
  • I'm using .NET 4.5 ... any ideas are appreciated! – Santiago Aceñolaza Mar 26 '14 at 19:22
  • 1
    Just wondering why you are creating a Task inside Parallel.ForEach? I thought Parallel.ForEach does that already for you... – usefulBee Oct 15 '14 at 19:41
  • @usefulBee just to be able to pass a CancellationToken for canceling the loop from the inner function – Santiago Aceñolaza Oct 15 '14 at 22:26
  • There are `Task.WaitAll` overloads with timeout and cancellation token parameters: https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task.waitall#overloads – Trass3r Jan 12 '21 at 00:23

3 Answers

3

I ended up combining both options. It works but I don't know if this is the proper way to do this.

Solution:

    Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
    {
        var tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        var token = tokenSource.Token;

        Task task = Task.Factory.StartNew(() => Process(item, token), token);
        task.Wait();
    });

and in Process() I check for cancellation multiple times:

    private void Process(MyItem item, CancellationToken token)
    {
        try
        {
            // ThrowIfCancellationRequested() already checks IsCancellationRequested,
            // so no separate if test is needed.
            token.ThrowIfCancellationRequested();

            ...sentences

            token.ThrowIfCancellationRequested();

            ...more sentences

            token.ThrowIfCancellationRequested();

            ...etc
        }
        catch (OperationCanceledException)
        {
            // Catch only the cancellation exception so that real failures
            // are not reported as cancellations.
            Console.WriteLine("Operation cancelled");
        }
    }

Santiago Aceñolaza
  • 2
    There are many things I don't like about this implementation but forcing each iteration of the ForEach method to run in a separate Task is what I found ugliest. – Santiago Aceñolaza Mar 27 '14 at 15:10
2

Consider adding CancellationToken to your code. This way, at any point you can properly cancel all the operations.

Then, you can use the CancelAfter() method.
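
To illustrate the suggestion above, here is a minimal sketch of a per-item timeout using CancelAfter(). It assumes Process() can be written to check the token between small units of work; the int items, the simulated Thread.Sleep work, and the helper name ProcessWithTimeout are hypothetical stand-ins, not the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CancelAfterSketch
{
    // Hypothetical stand-in for the real Process(): does its work in small
    // steps and checks the token between them. Returns false if cancelled.
    public static bool Process(int item, CancellationToken token)
    {
        try
        {
            for (int step = 0; step < item; step++)
            {
                token.ThrowIfCancellationRequested();
                Thread.Sleep(50); // simulated unit of work
            }
            return true;
        }
        catch (OperationCanceledException)
        {
            return false;
        }
    }

    // Runs Process() with its own deadline; the token is cancelled
    // automatically once the timeout elapses.
    public static bool ProcessWithTimeout(int item, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource())
        {
            cts.CancelAfter(timeout);
            return Process(item, cts.Token);
        }
    }

    public static void Main()
    {
        var myList = new List<int> { 1, 2, 100 };
        Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
        {
            bool finished = ProcessWithTimeout(item, TimeSpan.FromMilliseconds(500));
            Console.WriteLine("Item {0}: {1}", item, finished ? "completed" : "timed out");
        });
    }
}
```

Because the token is checked cooperatively, this only helps if no single step inside Process() blocks for longer than the timeout; a call that never returns will not notice the cancellation.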

poy
1

I ended up with a slightly different implementation. In my case, checking the CancellationToken within Process() wouldn't work because of potentially long-running statements between checks. For example, if my timeout was 5 seconds and a single statement took, say, 100 seconds, the cancellation wouldn't be noticed until that statement completed and the next if (token.IsCancellationRequested) check ran.

Here's what I ended up doing:

     Parallel.ForEach(myList, (item) =>
     {
         Task task = Task.Factory.StartNew(() =>
         {
             Process(item);
         });

         if(task.Wait(10000)) // Specify overall timeout for Process() here
             Console.WriteLine("Didn't Time out. All's good!"); 
         else
             Console.WriteLine("Timed out. Leave this one and continue with the rest"); 
     });

Then, within the Process() method, I added further timeouts on potentially long-running statements so that it handles timeouts gracefully (as much as possible). So it's only in the worst case that Process() has to be abandoned by Task.Wait() above. Note that Task.Wait() with a timeout only stops waiting; the task itself keeps running in the background until Process() returns.

    private void Process(MyItem item)
    {
      ...
        cmd.CommandTimeout = 5; // The timeouts within Process() are set so that
                                // their total is less than the Task.Wait duration.
                                // Unfortunately, some potentially long-running methods
                                // don't have a built-in timeout.
      ...
    }
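
As a small illustration of the Task.Wait() behaviour used above, the sketch below shows that a timed-out Wait() returns false while the task itself is still running. The class name, the helper name, and the use of Thread.Sleep in place of Process(item) are assumptions made for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitTimeoutSketch
{
    // Returns true if the task was still running when Wait() gave up.
    public static bool StillRunningAfterWaitTimeout(int workMs, int waitMs)
    {
        // Thread.Sleep stands in for a long-running Process(item) call.
        Task task = Task.Factory.StartNew(() => Thread.Sleep(workMs));

        // Wait(int) returns false when the timeout elapses first;
        // it does not cancel or abort the task.
        bool finishedInTime = task.Wait(waitMs);

        return !finishedInTime && !task.IsCompleted;
    }

    public static void Main()
    {
        Console.WriteLine(StillRunningAfterWaitTimeout(1000, 100));
    }
}
```

This is why the inner timeouts (such as CommandTimeout) matter: they are what actually ends the work, while the outer Wait() only lets the loop move on to the next item.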
perNalin