3

I'm using Callable and ExecutorService for multithreading in my application.

If any thread throws an exception, I need to stop all the other threads, whether or not their work has completed, and rethrow that exception to the calling method. For this, I used the shutdownNow() method.

Is this the right way, or is there a more effective approach?

        
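// setKeepAliveTime is not declared on ExecutorService, so the keep-alive time
// is passed to the ThreadPoolExecutor constructor instead.
ExecutorService exSvc = new ThreadPoolExecutor(
        5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

List<Future<Integer>> futureList = new ArrayList<>();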
for (int i = 0; i < 50; i++) {
    futureList.add(exSvc.submit(
        new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int num = new Random().nextInt(1000);
                if (num == 500) {
                    throw new Exception("Error");
                }
                return num;
            }
        }));
}

for (int i = 0; i < 50; i++) {
    try {
        int value = futureList.get(i).get();
    } catch (Exception e) {
        exSvc.shutdownNow();
        // Keep the original exception as the cause so the caller can see what failed.
        throw new Exception("Error", e);
    }
}


DAK

2 Answers

2

As ciamej said, if you want to detect the exception as early as possible, you can use a CountDownLatch:

        CountDownLatch cdl = new CountDownLatch(1);
        AtomicInteger successCount = new AtomicInteger();
        AtomicBoolean fail = new AtomicBoolean();
        int taskSize = 50;
        for(int i=0; i<taskSize;i++){
            futureList.add(exSvc.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int num =  new Random().nextInt(1000);
                    if(num == 500){
                        fail.set(true);
                        cdl.countDown();
                        throw new Exception("Error");
                    }
                    if (successCount.incrementAndGet() == taskSize) {
                        cdl.countDown();
                    }
                    return num;
                }
            }));
        }
        try {
            cdl.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (fail.get()) {
            exSvc.shutdownNow();
            throw new Exception("Error");
        }
        //...
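
For completeness, here is a self-contained sketch of the same latch idea that also collects the results once the latch opens. The class name `FailFastLatchDemo`, the method `runAll`, and the choice to wrap the first failure in an `IllegalStateException` are assumptions made for this illustration, not part of the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper class for illustration only.
class FailFastLatchDemo {

    /**
     * Runs all tasks on a 5-thread pool and returns their results in submission order.
     * If any task throws, the pool is shut down and an IllegalStateException
     * carrying the first recorded failure as its cause is thrown.
     */
    static List<Integer> runAll(List<Callable<Integer>> tasks) {
        int taskSize = tasks.size();
        List<Integer> results = new ArrayList<>();
        if (taskSize == 0) {
            return results; // nothing to wait for; the latch would never open
        }
        ExecutorService pool = Executors.newFixedThreadPool(5);
        CountDownLatch done = new CountDownLatch(1);
        AtomicInteger successCount = new AtomicInteger();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        List<Future<Integer>> futures = new ArrayList<>();
        try {
            for (Callable<Integer> task : tasks) {
                futures.add(pool.submit(() -> {
                    try {
                        Integer result = task.call();
                        // The last successful task opens the latch.
                        if (successCount.incrementAndGet() == taskSize) {
                            done.countDown();
                        }
                        return result;
                    } catch (Exception e) {
                        // Record only the first failure, then open the latch at once.
                        failure.compareAndSet(null, e);
                        done.countDown();
                        throw e;
                    }
                }));
            }
            done.await();
            Throwable cause = failure.get();
            if (cause != null) {
                pool.shutdownNow(); // interrupts running tasks, drops queued ones
                throw new IllegalStateException("A task failed", cause);
            }
            for (Future<Integer> f : futures) {
                results.add(f.get()); // every task has succeeded, so this returns promptly
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Recording the failure in an `AtomicReference` instead of an `AtomicBoolean` lets the caller see the original exception rather than a generic one.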
zysaaa
  • Thanks for the new approach. When do I need to call future.get()? Before cdl.await() or after it? – DAK Jan 04 '22 at 12:48
  • After `cdl.await();`, if `fail.get()` is false, it means that all tasks executed successfully; then call `future#get` to get all the results. – zysaaa Jan 04 '22 at 12:56
  • This will work fine, but introduces some synchronization penalty, because all the tasks have to atomically increment the same counter now. If the tasks are rather long, the penalty is, relatively, negligible. On the other hand, if the tasks are very short, this may add some noticeable overhead. – ciamej Jan 04 '22 at 13:15
1

This should work in your example. However, be sure to properly handle thread interruptions in your callables.
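
As a rough illustration of what handling interruption can look like: `shutdownNow()` only interrupts the worker threads, so a task stops early only if it checks the interrupt flag or blocks in an interruptible call. The names `InterruptibleTaskDemo`, `countingTask`, and `stopsPromptly` are made up for this sketch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration only.
class InterruptibleTaskDemo {

    /** A long-running task that checks the interrupt flag between units of work. */
    static Callable<Integer> countingTask(int steps) {
        return () -> {
            int completed = 0;
            for (int i = 0; i < steps; i++) {
                // Cooperative check: stop as soon as the pool asks us to.
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException("cancelled after " + completed + " steps");
                }
                Thread.sleep(10); // also throws InterruptedException when interrupted
                completed++;
            }
            return completed;
        };
    }

    /** Starts one slow task, shuts the pool down, and reports whether it terminated in time. */
    static boolean stopsPromptly() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(countingTask(10_000)); // roughly 100 s of work if never interrupted
        pool.shutdownNow();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A task that swallows `InterruptedException` and keeps looping would keep its worker thread busy after `shutdownNow()`, which is the case this check is meant to avoid.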

Note that the other tasks will be cancelled on a best-effort basis, and only after you have noticed that one of the tasks has thrown an exception. For example, if task no. 5 has thrown an exception, you have to wait for tasks 0-4 to complete, and only then do you notice that task 5 has failed.

Alternatively, you could subclass ThreadPoolExecutor, like here, to detect the exception as early as possible.
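
A minimal sketch of such a subclass, assuming the goal is to shut the pool down on the first failure. It overrides `afterExecute` and unwraps the `Future` that `submit` wraps around each Callable, following the pattern shown in the `ThreadPoolExecutor.afterExecute` Javadoc. The class name `FailFastExecutor` and the `firstFailure()` accessor are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical fixed-size pool that shuts itself down on the first task failure.
class FailFastExecutor extends ThreadPoolExecutor {

    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    FailFastExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Tasks passed to submit() arrive here wrapped in a Future, which holds
        // the exception internally, so t is null and the Future must be inspected.
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException e) {
                return; // cancelled, not failed
            } catch (ExecutionException e) {
                t = e.getCause();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // Only the first failure triggers the shutdown; later ones are ignored.
        if (t != null && firstFailure.compareAndSet(null, t)) {
            shutdownNow();
        }
    }

    /** Returns the first recorded failure, or null if no task has failed so far. */
    Throwable firstFailure() {
        return firstFailure.get();
    }
}
```

The caller would create `new FailFastExecutor(5)` in place of `Executors.newFixedThreadPool(5)`, submit the Callables as before, wait with `awaitTermination`, and then check `firstFailure()`.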

ciamej
  • [As per this](https://stackoverflow.com/a/2248203/821497), when do I need to call the afterExecute method? Do I need to use that class as the ExecutorService itself, or only for getting the future? – DAK Jan 04 '22 at 10:52
  • @DAK No, in the alternative approach you do not create a newFixedThreadPool, but your own subclass of ThreadPoolExecutor. The `afterExecute` method will be called automatically by the executor itself. You just reimplement that bit. – ciamej Jan 04 '22 at 10:57
  • Thanks, I will try this. The afterExecute method accepts a Runnable object, but I am using a Callable. Is there anything I need to change? – DAK Jan 04 '22 at 11:08
  • @DAK TBH I haven't tried it and the documentation is a little bit fuzzy about it. On one hand you have submit(Runnable) and submit(Callable) but afterExecute as you said only accepts Runnable. I would guess that internally Callable is put inside some object that implements both Runnable and Future, and the code should work as is. – ciamej Jan 04 '22 at 11:26
  • @DAK If you try it, and it works, please write back, so that we get a confirmation that it is the right way to go. – ciamej Jan 04 '22 at 11:28
  • Sure, I will try it and update. – DAK Jan 04 '22 at 11:32