
What is the simplest way to wait for all tasks of an ExecutorService to finish? My tasks are primarily computational, so I just want to run a large number of jobs, one on each core. Right now my setup looks like this:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask implements Runnable. This appears to execute the tasks correctly, but the code crashes on wait() with an IllegalMonitorStateException. This is odd, because I played around with some toy examples and it appeared to work.

uniquePhrases contains several tens of thousands of elements. Should I be using another method? I am looking for something as simple as possible.

gstackoverflow
george smiley

16 Answers


The simplest approach is to use ExecutorService.invokeAll(), which does what you want in a one-liner. In your terms, you'll need to modify or wrap ComputeDTask to implement Callable<>, which can give you quite a bit more flexibility. Your app probably has a meaningful implementation of Callable.call(), but if not, here's a way to wrap it using Executors.callable().

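ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(uniquePhrases.size());

for (DataTable singleTable : uniquePhrases) {
    // Executors.callable() adapts a Runnable into a Callable whose result is null
    todo.add(Executors.callable(new ComputeDTask(singleTable)));
}

// Blocks until every task has completed; throws InterruptedException if interrupted while waiting
List<Future<Object>> answers = es.invokeAll(todo);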

As others have pointed out, you could use the timeout version of invokeAll() if appropriate. In this example, answers is going to contain a bunch of Futures which will return nulls (see the definition of Executors.callable()). Probably what you want to do is a slight refactoring so you can get a useful answer back, or a reference to the underlying ComputeDTask, but I can't tell from your example.

If it isn't clear, note that invokeAll() will not return until all the tasks are completed (i.e., all the Futures in your answers collection will report .isDone() if asked). This avoids all the manual shutdown, awaitTermination, etc., and allows you to reuse this ExecutorService neatly for multiple cycles, if desired.
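For illustration, here is a self-contained sketch of the same invokeAll() pattern, assuming Java 8 or later. DataTable and ComputeDTask are not shown in the question, so a hypothetical counter-incrementing Runnable stands in for them, and the names InvokeAllDemo and runAll are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class InvokeAllDemo {

    // Runs `taskCount` Runnables through invokeAll and returns how many of them actually ran.
    static int runAll(int taskCount) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        try {
            List<Callable<Object>> todo = new ArrayList<>(taskCount);
            for (int i = 0; i < taskCount; i++) {
                // Hypothetical stand-in for `new ComputeDTask(singleTable)`: any Runnable works
                Runnable work = completed::incrementAndGet;
                todo.add(Executors.callable(work));
            }
            // Blocks until every task has finished, normally or by throwing
            List<Future<Object>> answers = es.invokeAll(todo);
            for (Future<Object> f : answers) {
                // Every Future is already done; get() rethrows a task's exception, if any,
                // and otherwise returns null because Executors.callable(Runnable) yields null
                if (!f.isDone() || f.get() != null) {
                    throw new IllegalStateException("unexpected future state");
                }
            }
            return completed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            es.shutdown(); // the pool is no longer needed, so let its threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(10_000)); // prints 10000
    }
}
```

Because invokeAll() only returns once every Future is done, the count read afterwards covers all the tasks; the same pool could be handed another batch before shutdown() is called.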

There are a few related questions on SO:

None of these are strictly on-point for your question, but they do provide a bit of color about how folks think Executor/ExecutorService ought to be used.

Dave Jarvis
andersoj
  • This is perfect if you're adding all of your jobs in a batch and you hang onto the list of Callables, but it won't work if you're calling ExecutorService.submit() in a callback or event-loop situation. – Desty Sep 20 '13 at 10:26
  • I think it's worth mentioning that shutdown() still should be called when the ExecutorService is no longer needed, otherwise the threads will never terminate (except for the cases when corePoolSize=0 or allowCoreThreadTimeOut=true). – John29 Apr 26 '16 at 14:19
  • Amazing! Just what I was looking for. Thanks a lot for sharing the answer. Let me try this out. – MohamedSanaulla Apr 12 '17 at 16:53
  • @Desty In such a situation, what would be the best way to implement it? – TommyQu Jun 10 '21 at 02:44

If you want to wait for all tasks to complete, use the shutdown method instead of wait. Then follow it with awaitTermination.

Also, you can use Runtime.getRuntime().availableProcessors() to get the number of hardware threads, so you can size your thread pool properly.
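A minimal sketch of the shutdown/awaitTermination approach, assuming CPU-bound tasks and Java 8+. The summing lambda is a hypothetical stand-in for ComputeDTask, and ShutdownAndAwaitDemo/runAll are illustrative names. Passing Long.MAX_VALUE nanoseconds as the timeout is one common way to wait effectively without a limit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class ShutdownAndAwaitDemo {

    // Submits `jobs` CPU-bound tasks, waits for all of them, and returns the summed results.
    static long runAll(int jobs) {
        // One worker per hardware thread, a reasonable default for computational work
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(cores);
        AtomicLong total = new AtomicLong();
        for (int i = 1; i <= jobs; i++) {
            final long n = i;
            es.execute(() -> total.addAndGet(n * n)); // placeholder for real computation
        }
        es.shutdown(); // stop accepting new tasks; already-submitted ones still run
        try {
            // awaitTermination needs a timeout; a huge one effectively means "until done"
            if (!es.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new RuntimeException(e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(100)); // prints 338350
    }
}
```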

Ravindra babu
NG.
  • shutdown() stops the ExecutorService from accepting new tasks and closes down idle worker threads. It is not specified to wait for the shutdown to complete and the implementation in ThreadPoolExecutor does not wait. – Alain O'Dea Jul 17 '10 at 00:41
  • @Alain - thanks. I should have mentioned awaitTermination. Fixed. – NG. Jul 17 '10 at 01:20
  • What if, in order for a task to finish, it must schedule further tasks? For example, you could make a multithreaded tree traversal which hands off branches to worker threads. In that case, since the ExecutorService is shut down instantly, it fails to accept any recursively scheduled jobs. – Rag Apr 04 '12 at 02:58
  • That becomes an interesting problem. One solution is you could utilize a separate executor service for child tasks only, and not expose it to users. That might be inefficient though with the additional threads. It sounds like you might need some more advanced lifetime management of the executor to determine when it is safe to call shutdown. – NG. Apr 05 '12 at 13:44
  • Unfortunately, after using shutdown you can never schedule any more tasks, so you basically have to create a new ExecutorService, but it may be worth the trade-off if you have a stopping point where you know you won't be adding any new tasks. – rogerdpack Feb 07 '13 at 23:00
  • `awaitTermination` requires a timeout as a parameter. While it's possible to provide a finite time and place a loop around it to wait until all threads have finished, I was wondering if there was a more elegant solution. – Abs Mar 28 '13 at 12:02
  • You are right, but see this answer - http://stackoverflow.com/a/1250655/263895 - you could always give it an incredibly long timeout. – NG. Mar 28 '13 at 14:00
  • @NG., at first sight it seems that the `awaitTermination` API is incomplete because it does not provide a way to wait indefinitely. – Jaime Hablutzel Nov 26 '18 at 20:03

If waiting for all tasks in the ExecutorService to finish isn't precisely your goal, but rather waiting until a specific batch of tasks has completed, you can use a CompletionService — specifically, an ExecutorCompletionService.

The idea is to create an ExecutorCompletionService wrapping your Executor, submit some known number of tasks through the CompletionService, then draw that same number of results from the completion queue using either take() (which blocks) or poll() (which does not). Once you've drawn all the expected results corresponding to the tasks you submitted, you know they're all done.

Let me state this one more time, because it's not obvious from the interface: You must know how many things you put into the CompletionService in order to know how many things to try to draw out. This matters especially with the take() method: call it one time too many and it will block your calling thread until some other thread submits another job to the same CompletionService.

There are some examples showing how to use CompletionService in the book Java Concurrency in Practice.
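A minimal sketch of the batch approach with ExecutorCompletionService, assuming Java 8+. The doubling Callable is a placeholder for real work, and CompletionServiceDemo/sumBatch are hypothetical names. The important detail is that the loop calls take() exactly as many times as it called submit().

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionServiceDemo {

    // Submits one batch through a CompletionService and waits for exactly that batch.
    static long sumBatch(ExecutorService pool, int batchSize) {
        CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
        int submitted = 0;
        for (int i = 1; i <= batchSize; i++) {
            final int n = i;
            cs.submit(() -> n * 2); // placeholder for real work
            submitted++;
        }
        long sum = 0;
        try {
            // Draw exactly as many results as were submitted: one extra take() would block
            for (int i = 0; i < submitted; i++) {
                sum += cs.take().get(); // take() hands back futures in completion order
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
        return sum;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // The same pool serves several batches and is not shut down between them
        System.out.println(sumBatch(pool, 10)); // prints 110
        System.out.println(sumBatch(pool, 3));  // prints 12
        pool.shutdown();
    }
}
```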

Ravindra babu
seh
  • This is a good counterpoint to my answer -- I'd say the straightforward answer to the question is invokeAll(); but @seh has it right when submitting groups of jobs to the ES and waiting for them to complete... --JA – andersoj Jul 23 '10 at 04:12
  • @om-nom-nom, thank you for updating the links. I am glad to see that the answer is still useful. – seh Sep 02 '13 at 16:56
  • Good answer, I wasn't aware of `CompletionService` – Vic Apr 21 '15 at 11:03
  • This is the approach to use if you don't want to shut down an existing ExecutorService, but just want to submit a batch of tasks and know when they are all finished. – ToolmakerSteve Sep 06 '15 at 09:55

If you want to wait for the executor service to finish executing, call shutdown() and then awaitTermination(timeout, unit), e.g. awaitTermination(1, TimeUnit.MINUTES). The ExecutorService does not block on its own monitor, so you can't use wait() etc.

Ravindra babu
mdma
  • I think it's awaitTermination. – NG. Jul 17 '10 at 02:57
  • @SB - Thanks - I see my memory is fallible! I've updated the name and added a link to be sure. – mdma Jul 17 '10 at 11:24
  • To wait "forever," use it like `awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);` http://stackoverflow.com/a/1250655/32453 – rogerdpack Sep 16 '14 at 19:02
  • I think this is the easiest approach – Shervin Asgari Feb 05 '16 at 12:00
  • @MosheElisha, are you sure? https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#shutdown-- says _Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted._ – Jaime Hablutzel Nov 26 '18 at 20:06
  • @JaimeHablutzel You are correct. I just tested it also and I believe I was wrong. I will delete my comment so it won't confuse people. – MosheElisha Nov 28 '18 at 14:32

You could wait for the jobs to finish by checking at a fixed interval:

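int maxSecondsPerComputeDTask = 20;
// Without shutdown() the executor never terminates, so awaitTermination would never return true
es.shutdown();
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // preserve the interrupt status
    throw new RuntimeException(e);
}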

Or you could use ExecutorService.submit(Runnable) and collect the Future objects that it returns and call get() on each in turn to wait for them to finish.

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
    try {
        future.get(); // blocks until this particular task has finished
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e); // the task itself threw an exception
    }
}

InterruptedException is extremely important to handle properly. It is what lets you or the users of your library terminate a long process safely.

Alain O'Dea

Just use

CountDownLatch latch = new CountDownLatch(noThreads); // one count per task

In each thread, when its work is done:

latch.countDown();

and, as the barrier:

latch.await();
J. Ruhe

There are several approaches.

You can first call ExecutorService.shutdown and then ExecutorService.awaitTermination, which returns:

true if this executor terminated and false if the timeout elapsed before termination

So:

There is a function called awaitTermination, but a timeout has to be provided to it, so there is no guarantee that all the tasks will have finished when it returns. Is there a way to achieve this?

You just have to call awaitTermination in a loop.

Using awaitTermination:

A full example of this implementation:

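import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WaitForAllToEnd {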

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }

        int count = 0;

        // This is the relevant part
        // Choose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }

    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}

Using CountDownLatch:

Another option is to create a CountDownLatch with a count equal to the number of parallel tasks. Each task calls countDownLatch.countDown() when it is done, while the main thread calls countDownLatch.await().

A full example of this implementation:

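import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WaitForAllToEnd {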

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
                System.out.println("I am Thread : " + Thread.currentThread().getId());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } finally {
                countDownLatch.countDown(); // count down even if the work fails, so await() cannot hang
            }
        };
    }
}

Using CyclicBarrier:

Another approach is to use a CyclicBarrier:

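import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WaitForAllToEnd {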

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}

There are other approaches as well, but they would require changes to your initial requirements, namely:

How to wait for all tasks to be completed when they are submitted using ExecutorService.execute().

Arvind Kumar Avinash
dreamcrash

Root cause for IllegalMonitorStateException:

Thrown to indicate that a thread has attempted to wait on an object's monitor or to notify other threads waiting on an object's monitor without owning the specified monitor.

From your code, you have just called wait() on ExecutorService without owning the lock.
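A small sketch that reproduces the error from the question, assuming Java 8+ (MonitorDemo and waitWithoutLockThrows are illustrative names). Calling wait() on any object, the executor included, without holding that object's monitor throws IllegalMonitorStateException immediately. Note that wrapping the call in synchronized (es) only removes the exception: as far as I know, nothing in ThreadPoolExecutor calls notify() on the executor object when tasks finish, so such a wait() would simply block until interrupted or woken spuriously.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MonitorDemo {

    // Returns true if calling wait() without owning the object's monitor throws
    // IllegalMonitorStateException, as happens with es.wait() in the question.
    static boolean waitWithoutLockThrows() {
        ExecutorService es = Executors.newFixedThreadPool(1);
        try {
            es.wait(); // no synchronized (es) block, so this thread does not own es's monitor
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(waitWithoutLockThrows()); // prints true
    }
}
```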

The code below fixes the IllegalMonitorStateException:

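try {
    synchronized (es) {
        es.wait(); // Add some condition before you call wait()
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // preserve the interrupt status
}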

Follow one of the approaches below to wait for completion of all tasks that have been submitted to the ExecutorService.

  1. Iterate through all the Futures returned by submit() on the ExecutorService and wait for each one with the blocking call get()

  2. Using invokeAll on ExecutorService

  3. Using CountDownLatch

  4. Using ForkJoinPool or Executors.newWorkStealingPool() (since Java 8)

  5. Shut down the pool as recommended on the Oracle documentation page

    void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }

    If you want to wait gracefully for all tasks to complete when you are using option 5 instead of options 1 to 4, change

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {

    to

    a while (condition) loop that checks every minute.
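One way to write that change, sketched under the assumption that you want to keep waiting rather than give up, and assuming Java 8+. The method name shutdownAndWaitGracefully, the configurable check interval and the demo() driver are hypothetical, not part of the Oracle example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdown {

    // Variant of shutdownAndAwaitTermination that keeps waiting instead of giving up.
    // Returns true once the pool has terminated, false if this thread was interrupted.
    static boolean shutdownAndWaitGracefully(ExecutorService pool, long checkInterval, TimeUnit unit) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Re-check periodically until every submitted task has completed
            while (!pool.awaitTermination(checkInterval, unit)) {
                System.out.println("Still waiting for tasks to finish...");
            }
            return true;
        } catch (InterruptedException ie) {
            pool.shutdownNow(); // Cancel currently executing tasks
            Thread.currentThread().interrupt(); // Preserve interrupt status
            return false;
        }
    }

    // Runs four short tasks on two threads; returns how many finished, or -1 if not terminated.
    static int demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(200); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }
        // A 50 ms interval keeps the demo short; in practice 1 minute is more typical
        boolean terminated = shutdownAndWaitGracefully(pool, 50, TimeUnit.MILLISECONDS);
        return terminated ? done.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 4 after a few "Still waiting" lines
    }
}
```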

Ravindra babu

You can use the ExecutorService.invokeAll method. It executes all the tasks and waits until all of them have finished.

Here is the complete javadoc.

You can also use the overloaded version of this method to specify a timeout.
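A short sketch of the timeout overload, invokeAll(tasks, timeout, unit), assuming Java 8+; the class name and the two lambdas are made up for illustration. According to the ExecutorService documentation, tasks that have not completed when the timeout elapses are cancelled, which the example checks with isCancelled().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllTimeoutDemo {

    // Runs a fast task and a slow task with a short overall timeout and reports
    // the fast task's result and whether the slow task was cancelled.
    static String run() {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            Callable<String> fast = () -> "fast done";
            Callable<String> slow = () -> {
                Thread.sleep(10_000); // far longer than the timeout below
                return "slow done";
            };
            // Waits at most 500 ms; tasks still unfinished at that point are cancelled
            List<Future<String>> results =
                    service.invokeAll(Arrays.asList(fast, slow), 500, TimeUnit.MILLISECONDS);
            // The returned futures are in the same order as the input list
            String fastResult = results.get(0).get();
            boolean slowCancelled = results.get(1).isCancelled();
            return fastResult + ", slow cancelled: " + slowCancelled;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "fast done, slow cancelled: true"
    }
}
```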

Here is sample code with ExecutorService.invokeAll

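import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        // invokeAll blocks until both tasks have completed
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
        service.shutdown(); // otherwise the pool's non-daemon threads keep the JVM alive
    }

}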

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}
Nitin

I also have the situation that I have a set of documents to be crawled. I start with an initial "seed" document which should be processed, that document contains links to other documents which should also be processed, and so on.

In my main program, I just want to write something like the following, where Crawler controls a bunch of threads.

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion();

The same situation would happen if I wanted to navigate a tree: I would put in the root node, the processor for each node would add children to the queue as necessary, and a bunch of threads would process all the nodes in the tree until there were no more.

I couldn't find anything in the JDK for this, which I thought was a bit surprising. So I wrote a class ThreadPool which one can either use directly or subclass to add methods suitable for the domain, e.g. schedule(Document). Hope it helps!

ThreadPool Javadoc | Maven
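One way to get this behaviour on top of a plain ExecutorService is a pending-task counter. This is a sketch assuming Java 8+, not Adrian Smith's ThreadPool class (its API isn't shown here): RecursivePool, schedule, waitUntilCompletion and the tree demo are hypothetical names modeled on the pseudocode above. The counter is incremented before a task is handed to the executor and decremented when it finishes, so it can only reach zero once no task is running or queued.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical pool that can wait for work which keeps scheduling more work.
class RecursivePool {
    private final ExecutorService executor;
    private final AtomicInteger pending = new AtomicInteger(); // scheduled but not yet finished
    private final CountDownLatch allDone = new CountDownLatch(1);

    RecursivePool(int threads) {
        executor = Executors.newFixedThreadPool(threads);
    }

    // Safe to call from the main thread or from inside a running task.
    void schedule(Runnable task) {
        pending.incrementAndGet(); // count the task before it can possibly finish
        executor.execute(() -> {
            try {
                task.run();
            } finally {
                // Any children were scheduled (and counted) inside run(), so the count
                // only drops to zero when nothing is running or waiting in the queue.
                if (pending.decrementAndGet() == 0) {
                    allDone.countDown();
                }
            }
        });
    }

    // Blocks until every scheduled task, including recursively scheduled ones, is done.
    void waitUntilCompletion() throws InterruptedException {
        allDone.await();
        executor.shutdown();
    }
}

class RecursivePoolDemo {

    // Visits a complete binary tree of the given depth, one task per node.
    static int countNodes(int maxDepth) {
        RecursivePool pool = new RecursivePool(4);
        AtomicInteger visited = new AtomicInteger();
        pool.schedule(visit(pool, visited, 0, maxDepth)); // the "seed" task
        try {
            pool.waitUntilCompletion();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return visited.get();
    }

    private static Runnable visit(RecursivePool pool, AtomicInteger visited, int depth, int maxDepth) {
        return () -> {
            visited.incrementAndGet();
            if (depth < maxDepth) { // each inner node schedules its two children
                pool.schedule(visit(pool, visited, depth + 1, maxDepth));
                pool.schedule(visit(pool, visited, depth + 1, maxDepth));
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(countNodes(10)); // prints 2047, i.e. 2^11 - 1 nodes
    }
}
```

This sketch is one-shot: waitUntilCompletion() shuts the executor down afterwards, and if nothing was ever scheduled the wait would never return.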

Adrian Smith

Add all the tasks to a collection and submit it using invokeAll. Because invokeAll blocks, the calling thread won't proceed to the next line until all the tasks are complete.

Here is a good example: invokeAll via ExecutorService

zgormez

Submit your tasks to the Runner and then wait by calling its waitTillDone() method, like this:

Runner runner = Runner.runner(2);

for (DataTable singleTable : uniquePhrases) {

    runner.run(new ComputeDTask(singleTable));
}

// blocks until all tasks are finished (or failed)
runner.waitTillDone();

runner.shutdown();

To use it add this gradle/maven dependency: 'com.github.matejtymes:javafixes:1.0'

For more details look here: https://github.com/MatejTymes/JavaFixes or here: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

Matej Tymes

I would just wait for the executor to terminate, with a timeout that you consider long enough for the tasks to complete.

try {
    // do stuff here
    exe.execute(thread);
} finally {
    exe.shutdown();
}
boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
if (!result) {
    LOGGER.error("It took more than 4 hours for the executor to stop; this shouldn't be the normal behaviour.");
}
punkers

It sounds like you need a ForkJoinPool, using the global (common) pool to execute tasks.

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

The beauty is in pool.awaitQuiescence: the method blocks, may use the caller's thread to help execute pending tasks, and returns once the pool is really empty (no tasks running or queued) or the timeout elapses.
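A runnable sketch of that idea, assuming Java 8+. It uses a dedicated ForkJoinPool rather than commonPool() so unrelated work cannot delay quiescence, and a hypothetical binary-tree traversal in which every node task submits its children to the same pool (QuiescenceDemo and visit are illustrative names). awaitQuiescence(long, TimeUnit) returns false if the timeout elapses first.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QuiescenceDemo {

    // Visits a complete binary tree; each node task submits its children to the same pool.
    static int countNodes(int maxDepth) {
        ForkJoinPool pool = new ForkJoinPool(); // dedicated pool instead of commonPool()
        AtomicInteger visited = new AtomicInteger();
        pool.execute(visit(pool, visited, 0, maxDepth)); // the root task
        // Waits (and may help run tasks) until no tasks are running or queued, or the timeout elapses
        boolean quiescent = pool.awaitQuiescence(1, TimeUnit.MINUTES);
        pool.shutdown();
        if (!quiescent) {
            throw new IllegalStateException("timed out");
        }
        return visited.get();
    }

    private static Runnable visit(ForkJoinPool pool, AtomicInteger visited, int depth, int maxDepth) {
        return () -> {
            visited.incrementAndGet();
            if (depth < maxDepth) { // submit children to the same pool the task runs in
                pool.execute(visit(pool, visited, depth + 1, maxDepth));
                pool.execute(visit(pool, visited, depth + 1, maxDepth));
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(countNodes(10)); // prints 2047
    }
}
```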

Ravindra babu
adib

How about this?

Object lock = new Object();
CountDownLatch cdl = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
    executorService.execute(() -> {

        synchronized (lock) {
            cdl.countDown();
            try {
                lock.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
}
cdl.await();
synchronized (lock) {
    lock.notifyAll();
}

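If you do not add new tasks to the ExecutorService, this waits until all current tasks have completed. It relies on threadNum being equal to the number of threads in the pool: the latch only reaches zero when every worker thread is parked in one of these marker tasks, which means each worker has finished whatever it was running before.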

钱宇豪

A simple alternative is to use threads along with join. Refer to: Joining Threads
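A minimal sketch of the join-based alternative, assuming Java 8+ (JoinDemo and sumOfSquares are illustrative names; the squared-sum work is a placeholder). Each thread takes an interleaved slice of the input, and the main thread calls join() on every thread in turn, which returns once that thread has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class JoinDemo {

    // Starts one plain thread per slice of work and waits for all of them with join().
    static long sumOfSquares(int n, int threadCount) {
        AtomicLong total = new AtomicLong();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            final int start = t;
            Thread thread = new Thread(() -> {
                // Each thread handles every threadCount-th number, starting at start + 1
                for (long i = start + 1; i <= n; i += threadCount) {
                    total.addAndGet(i * i);
                }
            });
            threads.add(thread);
            thread.start();
        }
        try {
            for (Thread thread : threads) {
                thread.join(); // blocks until this thread has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(sumOfSquares(100, cores)); // prints 338350
    }
}
```

Unlike an ExecutorService, this manages the threads directly, so with tens of thousands of jobs you would split the work into one slice per core, as here, rather than start one thread per job.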

Community
Vicky Kapadia