35

My Question: How to execute a bunch of threaded objects on a ThreadPoolExecutor and wait for them all to finish before moving on?

I'm new to ThreadPoolExecutor. So this code is a test to learn how it works. Right now I don't even fill the BlockingQueue with the objects because I don't understand how to start the queue without calling execute() with another RunnableObject. Anyway, right now I just call awaitTermination() but I think I'm still missing something. Any tips would be great! Thanks.

public void testThreadPoolExecutor() throws InterruptedException {
  int limit = 20;
  BlockingQueue q = new ArrayBlockingQueue(limit);
  ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
  for (int i = 0; i < limit; i++) {
    ex.execute(new RunnableObject(i + 1));
  }
  ex.awaitTermination(2, TimeUnit.SECONDS);
  System.out.println("finished");
}

The RunnableObject class:

package playground;

public class RunnableObject implements Runnable {

  private final int id;

  public RunnableObject(int id) {
    this.id = id;
  }

  @Override
  public void run() {
    System.out.println("ID: " + id + " started");
    try {
      Thread.sleep(2354);
    } catch (InterruptedException ignore) {
    }
    System.out.println("ID: " + id + " ended");
  }
}
kentcdodds
  • 27,113
  • 32
  • 108
  • 187
  • 1
    I don't think it's a good idea to answer your question immediately after you post it and suggest that your own answer is not quite fitting. At best, that warrants and update to your original question explaining why that answer is not sufficient. – Kiril Jun 07 '12 at 14:59
  • 1
    @Link, you're right. I'll fix it. – kentcdodds Jun 07 '12 at 15:01
  • I just edited my answer to show one way to wait for all of the jobs to finish before shutting down the executor. – Gray Jun 07 '12 at 15:35
  • possible duplicate of [How to wait for all threads to finish, using ExecutorService?](http://stackoverflow.com/questions/1250643/how-to-wait-for-all-threads-to-finish-using-executorservice) – rogerdpack Apr 08 '15 at 19:29

6 Answers6

54

You should loop on awaitTermination

ExecutorService threads;
// ...
// Tell threads to finish off.
threads.shutdown();
// Wait for everything to finish.
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) {
  log.info("Awaiting completion of threads.");
}
OldCurmudgeon
  • 64,482
  • 16
  • 119
  • 213
  • 1
    That totally did it. I didn't notice that `awaitTermination` returns anything. Nobody else mentioned that I'd need to loop through it to block. Thanks! – kentcdodds Jun 07 '12 at 16:19
  • 11
    Why do it in a loop? Why not just increase the number of seconds to wait for it. Typically I do a `awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)` or something. – Gray Jun 07 '12 at 17:01
  • 1
    @Gray - I initially just called it once in a piece of code of mine and either really strange things started happening on exit sometimes or everything locked up. Eventually I tracked it down to this so now I loop and log a "waiting" message so I know something strange is going on. – OldCurmudgeon Jun 07 '12 at 19:07
  • 3
    I like the while loop because I have an executor with tasks processing files, and the amount of time they will take to complete is unknown (depends on the number of files) – Broken_Window Aug 23 '16 at 20:27
9

Your issue seems to be that you are not calling shutdown after you have submitted all of the jobs to your pool. Without shutdown() your awaitTermination will always return false.

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}
// you are missing this line!!
ex.shutdown();
ex.awaitTermination(2, TimeUnit.SECONDS);

You can also do something like the following to wait for all your jobs to finish:

List<Future<Object>> futures = new ArrayList<Future<Object>>();
for (int i = 0; i < limit; i++) {
  futures.add(ex.submit(new RunnableObject(i + 1), (Object)null));
}
for (Future<Object> future : futures) {
   // this joins with the submitted job
   future.get();
}
...
// still need to shutdown at the end
ex.shutdown();

Also, because you are sleeping for 2354 milliseconds but only waiting for the termination of all of the jobs for 2 SECONDS, awaitTermination will always return false. I would use Long.MAX_VALUE to wait for the jobs to finish.

Lastly, it sounds like you are worrying about created a new ThreadPoolExecutor and you instead want to reuse the first one. Don't be. The GC overhead is going to be extremely minimal compared to any code that you write to detect if the jobs are finished.


To quote from the javadocs, ThreadPoolExecutor.shutdown():

Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.

In the ThreadPoolExecutor.awaitTermination(...) method, it is waiting for the state of the executor to go to TERMINATED. But first the state must go to SHUTDOWN if shutdown() is called or STOP if shutdownNow() is called.

Gray
  • 115,027
  • 24
  • 293
  • 354
3

It's nothing to do with the executor itself. Just use the interface's java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<T>>). It will block until all the Callables are finished.

Executors are meant to be long-lived; beyond the lifetime of a group of tasks. shutdown is for when the application is finished and cleaning up.

artbristol
  • 32,010
  • 5
  • 70
  • 103
  • Could you elaborate on this answer. I'm liking the answer, but I'm having a hard time implementing it. Thanks – kentcdodds Jun 07 '12 at 15:20
2

Here's a variant on the accepted answer that handles retries if/when InterruptedException is thrown:

executor.shutdown();

boolean isWait = true;

while (isWait)
{
    try
    {             
        isWait = !executor.awaitTermination(10, TimeUnit.SECONDS);
        if (isWait)
        {
            log.info("Awaiting completion of bulk callback threads.");
        }
    } catch (InterruptedException e) {
        log.debug("Interruped while awaiting completion of callback threads - trying again...");
    }
}
Justin Killen
  • 728
  • 6
  • 19
1

Another approach is to use CompletionService, very useful if you have to attempt any task result:

//run 3 task at time
final int numParallelThreads = 3;

//I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads);
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);

int numTaskToStart = 15;

for(int i=0; i<numTaskToStart ; i++){
    //task class that implements Callable<String> (or something you need)
    MyTask mt = new MyTask();

    completionService.submit(mt);
}

executor.shutdown(); //it cannot be queued more task

try {
    for (int t = 0; t < numTaskToStart ; t++) {
        Future<String> f = completionService.take();
        String result = f.get();
        // ... something to do ...
    }
} catch (InterruptedException e) {
    //termination of all started tasks (it returns all not started tasks in queue)
    executor.shutdownNow();
} catch (ExecutionException e) {
    // ... something to catch ...
}
fl4l
  • 1,580
  • 4
  • 21
  • 27
-1

Try this,

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}

Lines to be added

ex.shutdown();
ex.awaitTermination(timeout, unit)
Kumar Vivek Mitra
  • 33,294
  • 6
  • 48
  • 75