In one of my applications I'm using the ExecutorService class to create a fixed thread pool and a CountDownLatch to wait for the threads to complete. This works fine as long as the process doesn't throw an exception. If an exception occurs in any of the threads, I need to stop all the running threads and report the error to the main thread. Can anyone please help me solve this?

This is the sample code I'm using for executing multiple threads.

    private void executeThreads()
    {
        int noOfThreads = 10;
        ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);
        try
        {
            CountDownLatch latch = new CountDownLatch(noOfThreads);
            for (int i = 0; i < noOfThreads; i++) {
                executor.submit(new ThreadExecutor(latch));
            }
            latch.await();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            executor.shutdown();
        }
    }

This is the Executor Class

    public class ThreadExecutor implements Callable<String> {
        CountDownLatch latch;

        public ThreadExecutor(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public String call() throws Exception
        {
            doMyTask(); // process logic goes here!
            this.latch.countDown(); // never reached if doMyTask() throws
            return "Success";
        }
    }

=============================================================================

Thank you all :)

I have corrected my class as given below and that is working now.

private void executeThreads()
{
    int noOfThreads = 10;
    ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);
    // ThreadExecutor is a Callable<String>, so submit() returns Future<String>
    ArrayList<Future<String>> futureList = new ArrayList<Future<String>>(noOfThreads);
    try
    {
        userContext = BSF.getMyContext();
        CountDownLatch latch = new CountDownLatch(noOfComponentsToImport);

        for (ImportContent artifact : artifactList) {
            futureList.add(executor.submit(new ThreadExecutor(latch)));
        }

        latch.await();

        for (Future<String> future : futureList)
        {
            try
            {
                future.get();
            }
            catch (ExecutionException e)
            {
                // handle it
            }
        }
    }
    catch (Exception e) {
        // handle it
    }
    finally
    {
        executor.shutdown();

        try
        {
            executor.awaitTermination(90000, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            // handle it
        }
    }
}

Executor Class :

public class ThreadExecutor implements Callable<String> {
    // shared by all instances: set once any task has failed
    private static volatile boolean isAnyError;
    CountDownLatch latch;

    public ThreadExecutor(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public String call() throws Exception
    {
        try {
            if (!isAnyError)
            {
                doMyTask(); // process logic goes here!
            }
        }
        catch (Exception e)
        {
            isAnyError = true;
            throw e;
        }
        finally
        {
            this.latch.countDown();
        }
        return "Success";
    }
}
Achu S

4 Answers

5

Use an ExecutorCompletionService, complete with an ExecutorService that outlives the duration of the tasks (i.e. it doesn't get shut down afterwards):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class Threader {

    static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        new Threader().start();
        service.shutdown();
    }

    private void start() {
        CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
                service);
        /*
         * Holds all the futures for the submitted tasks
         */
        List<Future<Void>> results = new ArrayList<Future<Void>>();

        for (int i = 0; i < 3; i++) {
            final int callableNumber = i;

            results.add(completionService.submit(new Callable<Void>() {
                 @Override
                 public Void call() throws Exception {
                     System.out.println("Task " + callableNumber
                             + " in progress");
                     try {
                         Thread.sleep(callableNumber * 1000);
                     } catch (InterruptedException ex) {
                         System.out.println("Task " + callableNumber
                                 + " cancelled");
                         return null;
                     }
                     if (callableNumber == 1) {
                         throw new Exception("Wrong answer for task "
                                 + callableNumber);
                     }
                     System.out.println("Task " + callableNumber + " complete");
                     return null;
                 }
             }));
        }

        boolean complete = false;
        while (!complete) {
            complete = true;
            Iterator<Future<Void>> futuresIt = results.iterator();
            while (futuresIt.hasNext()) {
                if (futuresIt.next().isDone()) {
                    futuresIt.remove();
                } else {
                    complete = false;
                }
            }

            if (!results.isEmpty()) {
                try {
                    /*
                     * Examine results of next completed task
                     */
                    completionService.take().get();
                } catch (InterruptedException e) {
                    /*
                     * Give up - interrupted.
                     */
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                    /*
                     * The task threw an exception
                     */
                    System.out.println("Execution exception " + e.getMessage());
                    complete = true;
                    for (Future<Void> future : results) {
                        if (!future.isDone()) {
                            System.out.println("Cancelling " + future);
                            future.cancel(true);
                        }
                    }
                }
            }
        }

    }
}

Output is something like:

Task 0 in progress
Task 2 in progress
Task 1 in progress
Task 0 complete
Execution exception java.lang.Exception: Wrong answer for task 1
Cancelling java.util.concurrent.FutureTask@a59698
Task 2 cancelled

where Task 2 got cancelled due to the failure of Task 1.
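
One caveat with the polling loop above: a task that is already done when the `isDone()` sweep runs is removed from `results` without its future ever being taken from the completion service, so its failure can go unnoticed (if every task finishes before the first sweep, `take()` is never called). A minimal sketch of a count-based alternative follows. `FailFastRunner` and `runAll` are hypothetical names, not part of the answer above, and the lambdas assume Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FailFastRunner {

    /**
     * Runs all tasks. Returns null if every task succeeded, otherwise the
     * cause of the first failure observed (the other tasks are cancelled).
     */
    static <T> Throwable runAll(ExecutorService service, List<Callable<T>> tasks)
            throws InterruptedException {
        CompletionService<T> completionService = new ExecutorCompletionService<T>(service);
        List<Future<T>> futures = new ArrayList<Future<T>>();
        for (Callable<T> task : tasks) {
            futures.add(completionService.submit(task));
        }
        // Take exactly one completed future per submitted task,
        // so no result is skipped and take() never blocks forever.
        for (int taken = 0; taken < futures.size(); taken++) {
            try {
                completionService.take().get();
            } catch (ExecutionException e) {
                for (Future<T> future : futures) {
                    future.cancel(true); // no effect on tasks that are already done
                }
                return e.getCause();
            }
        }
        return null;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
        tasks.add(() -> { Thread.sleep(5000); return null; });
        tasks.add(() -> { throw new Exception("Wrong answer"); });
        System.out.println("First failure: " + runAll(service, tasks));
        service.shutdown();
    }
}
```

As in the answer's code, cancellation relies on the tasks responding to interruption; a task that ignores interrupts keeps running until it finishes.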

Saurav Sahu
artbristol
  • thanks. you need to add executor.shutdownNow(); executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); to shut it down completely – Dejell Mar 10 '15 at 16:44
  • what does this loop do? while (futuresIt.hasNext()) { if (futuresIt.next().isDone()) { futuresIt.remove(); } else { complete = false; } } – Dejell Mar 10 '15 at 20:14
  • @Dejel it's the standard way of removing items from a collection while iterating through it – artbristol Mar 11 '15 at 09:30
  • @Dejel the code deliberately does not use executor.shutdownNow() which I regard as an antipattern – artbristol Mar 11 '15 at 09:31
  • thanks - but why do you need to remove the futures if they are done? what would you use instead of executor.shutdownNow() ? – Dejell Mar 11 '15 at 10:36
  • You remove the futures so that the next time through the main loop, the list of futures to check only includes incomplete ones. You do not need shutdownNow, you just leave the thread pool running, for future invocations. shutdown/shutdownNow should only be used on application termination, not within normal code. – artbristol Mar 11 '15 at 11:05
  • Regarding removing the futures - isn't it enough to do completionService.take().get(); ? what if I don't have any future invocation in the code for this method? by me for every method that I need concurrency I create a new ExecutorService executor = Executors.newFixedThreadPool(18); - shall I use one for all of my web app? – Dejell Mar 11 '15 at 11:15
  • `completionService.take().get()` blocks, which is no good. Yes, you should use a single thread pool for all your web app. – artbristol Mar 11 '15 at 13:21
  • thanks. I will find examples how to use a single thread pool for web – Dejell Mar 11 '15 at 13:38
  • I think that there is a problem with the code above. If all tasks execute with no exception, completionService.take() will wait forever – Dejell Mar 11 '15 at 15:05
  • @Dejel thanks for the feedback! I've updated the code to avoid that problem, I hope – artbristol Mar 12 '15 at 09:39
  • in my code I used AtomicInteger instead to solve it. but thanks! – Dejell Mar 12 '15 at 20:17
  • I was using the code, and realized that something doesn't work as expected. as completionService.take().get(); will take only the next future, but what about all the others that are done? I think that there should be an internal loop for all those that are done – Dejell Jun 17 '15 at 08:38
  • @Dejel the `while (!complete) {` loop is meant to do what you're asking, but maybe there's a bug – artbristol Jun 17 '15 at 14:00
  • yes I guess that it's a bug in the code. as what if all tasks finished very quickly? so completionService.take().get(); will return only the first one. I debugged the code – Dejell Jun 17 '15 at 15:43
4

I strongly suggest you use a robust mechanism to count down the latch: wrap the whole task body in an all-encompassing try { ... } finally { latch.countDown(); }. Detect errors in threads using a separate mechanism.
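
A minimal sketch of that structure, assuming errors are collected in a shared concurrent queue (the queue is one possible "separate mechanism", not something this answer prescribes). `LatchedTasks` and `runAndCollectErrors` are hypothetical names, and the lambdas assume Java 8 or later.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LatchedTasks {

    /** Runs each job on the executor, waits for all of them, and returns the errors thrown. */
    static Queue<Throwable> runAndCollectErrors(ExecutorService executor, Runnable... jobs)
            throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(jobs.length);
        final Queue<Throwable> errors = new ConcurrentLinkedQueue<Throwable>();
        for (final Runnable job : jobs) {
            executor.execute(() -> {
                try {
                    job.run();
                } catch (Throwable t) {
                    errors.add(t);     // error reporting is separate from the latch
                } finally {
                    latch.countDown(); // always runs, so await() cannot hang on a failure
                }
            });
        }
        latch.await();
        return errors;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Queue<Throwable> errors = runAndCollectErrors(executor,
                () -> System.out.println("ok"),
                () -> { throw new IllegalStateException("boom"); });
        System.out.println("errors: " + errors);
        executor.shutdown();
    }
}
```

This waits for every task to finish and reports all failures afterwards; it does not by itself stop the other tasks early.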

Marko Topolnik
  • +1 - yes, I always start with any thread code with such a structure so that the threads cannot escape without notifying 'something'. – Martin James Aug 16 '12 at 09:22
1

I think you will need one more thread, call it a "Watcher", which checks whether an AtomicBoolean has been set to true. Once it is set, the watcher shuts down the main executor service. Keep in mind that the shutdown mechanism does not guarantee an immediate stop of all threads. Read this, for example: Graceful shutdown of threads and executor
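
A rough sketch of the watcher idea, assuming a simple polling loop and `shutdownNow()` as the way to stop the workers (both are illustrative choices). `WatcherDemo` and `runWithWatcher` are hypothetical names, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class WatcherDemo {

    /** Returns true if the watcher saw the failure flag and the pool then terminated. */
    static boolean runWithWatcher() throws InterruptedException {
        final ExecutorService workers = Executors.newFixedThreadPool(2);
        final AtomicBoolean failed = new AtomicBoolean(false);

        // A long-running task that stops when interrupted.
        workers.execute(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly
            }
        });
        // A task that fails and raises the shared flag.
        workers.execute(() -> {
            try {
                throw new IllegalStateException("simulated failure");
            } catch (RuntimeException e) {
                failed.set(true);
            }
        });
        workers.shutdown(); // accept no new tasks; submitted ones keep running

        final AtomicBoolean stoppedByWatcher = new AtomicBoolean(false);
        Thread watcher = new Thread(() -> {
            try {
                // Poll until a task fails or all tasks finish on their own.
                while (!failed.get() && !workers.isTerminated()) {
                    Thread.sleep(10);
                }
                if (failed.get()) {
                    workers.shutdownNow(); // interrupts the running tasks
                    stoppedByWatcher.set(true);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        watcher.start();
        watcher.join();
        return stoppedByWatcher.get() && workers.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stopped by watcher: " + runWithWatcher());
    }
}
```

`shutdownNow()` only interrupts the worker threads, so a task that never checks for interruption would still run to completion, which is the caveat mentioned above.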

Community
yegor256
1

I think you need to restructure your code. Take a look at ExecutorService#invokeAny:

Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do. Upon normal or exceptional return, tasks that have not completed are cancelled. The results of this method are undefined if the given collection is modified while this operation is in progress.

This seems to be the behavior you need. You also don't need a CountDownLatch, as the main thread will block in invokeAny.
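
A small sketch of how invokeAny behaves, assuming three illustrative tasks: one that fails, one that succeeds quickly, and one that is slow. `InvokeAnyDemo` and `firstSuccess` are hypothetical names, and the lambdas assume Java 8 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InvokeAnyDemo {

    static String firstSuccess() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> tasks = Arrays.<Callable<String>>asList(
                    () -> { throw new IllegalStateException("task 0 failed"); },
                    () -> { Thread.sleep(100); return "task 1 ok"; },
                    () -> { Thread.sleep(60_000); return "task 2 ok"; });
            // Blocks until one task succeeds, then cancels the unfinished ones.
            return executor.invokeAny(tasks);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstSuccess()); // prints "task 1 ok"
    }
}
```

Note that the failure of task 0 does not abort the others here: invokeAny throws an ExecutionException only if no task completes successfully, so it fits "first success wins" rather than "first failure stops everything".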

Cratylus
  • it seems that it will return the first one that succeeded. What if I want all that succeeded? – Dejell Mar 10 '15 at 12:48