71

I can't use shutdown() and awaitTermination() because it is possible new tasks will be added to the ThreadPoolExecutor while it is waiting.

So I'm looking for a way to wait until the ThreadPoolExecutor has emptied its queue and finished all of its tasks without stopping new tasks from being added before that point.

If it makes any difference, this is for Android.

Thanks

Update: Many weeks later after revisiting this, I discovered that a modified CountDownLatch worked better for me in this case. I'll keep the answer marked because it applies more to what I asked.

Ravindra babu
cottonBallPaws
  • If you're OK with new tasks being added, what happens if it never finishes? – Kylar Oct 14 '10 at 01:25
  • I think littleFluffyKitty only wants to wait for the "old" tasks to finish. – Thilo Oct 14 '10 at 01:27
  • 2
    I'm not so concerned with the possibility that it would never finish because if that is the case, then something else is already terribly broken. If all else fails I could implement a time out of some sort but I'm ok with just assuming it will finish. I want it to be able to take new tasks while it's waiting, or to say it another way, I want new tasks to be able to be added after the wait is called. – cottonBallPaws Oct 14 '10 at 03:44
  • I offered another, potential answer. I still don't understand what you mean by waiting for all the tasks to empty the queue but not wanting to inhibit new tasks to enter the queue... at some point, you have to draw the line, and TPE can't draw the line for you. Calling `shutdown()` is how you create the "edge" after which new tasks can't be submitted; calling `awaitTermination()` creates an edge ensuring you block until all previously-submitted tasks are complete. – andersoj Oct 14 '10 at 12:53
  • Do you simply want to wait for an asynchronous notification that the queue happens to be empty and shut it down right at that time? TPE (and more generally, thread semantics) probably won't let you do that, and if you could, it would be very hard to reason about from the task submitters' side. – andersoj Oct 14 '10 at 12:54
  • See also http://stackoverflow.com/questions/1250643/how-to-wait-for-all-threads-to-finish-using-executorservice – rogerdpack Nov 08 '12 at 22:48
  • I think this is a related question: http://stackoverflow.com/questions/3402895/java-threadpool-usage. You might want to check out the answers listed there... – sjlee Oct 14 '10 at 07:15

7 Answers

79

If you are interested in knowing when a certain task completes, or a certain batch of tasks, you may use ExecutorService.submit(Runnable). This method returns a Future object, which you can place into a Collection. Your main thread then iterates over that collection, calling Future.get() on each element. This blocks the main thread until the ExecutorService has processed every task whose Future is in the collection.

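Collection<Future<?>> futures = new LinkedList<Future<?>>();
futures.add(executorService.submit(myRunnable));
for (Future<?> future : futures) {
    // get() blocks until the task is done; it can throw
    // InterruptedException or ExecutionException, which the caller must handle
    future.get();
}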
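
For completeness, here is a minimal, self-contained sketch of the same idea. The class name `FutureBatchDemo` and the method `runBatch` are made up for illustration, and the tasks just bump a counter. Note that the executor is never shut down by the wait itself, so it stays open for new submissions; the wait only covers the futures that were collected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative, not from the answer.
final class FutureBatchDemo {

    // Submits taskCount trivial tasks, waits for exactly those tasks,
    // and returns how many of them have run once the wait is over.
    static int runBatch(ExecutorService executor, int taskCount)
            throws InterruptedException, ExecutionException {
        AtomicInteger completed = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            futures.add(executor.submit(() -> {
                completed.incrementAndGet();
            }));
        }
        for (Future<?> future : futures) {
            future.get(); // blocks until this particular task is done
        }
        return completed.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println("completed = " + runBatch(executor, 10));
            // The executor is still usable here: nothing was shut down by the wait.
            System.out.println("completed = " + runBatch(executor, 3));
        } finally {
            executor.shutdown();
        }
    }
}
```

Tasks submitted by other threads while the loop is blocked are simply not part of this wait unless their futures are added to the same collection in a thread-safe way.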
Tim Bender
  • 2
    +1 That seems to be the best way. Has to be done at the application level, however, where you submit the task, not at the executor service level. – Thilo Oct 14 '10 at 02:06
  • 12
    +1, or use invokeAll() on a batch of tasks, to await completion. See my answer here: http://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish/3269888 – andersoj Oct 14 '10 at 02:09
  • 1
    Right, if you are willing to make your tasks `Callable` instead of `Runnable`, the solution referenced by @andersoj is much simpler. – Tim Bender Oct 14 '10 at 02:12
  • 1
    Y, and note `Runnable`->`Callable` is trivial with the convenience method `Executors.callable()` – andersoj Oct 14 '10 at 02:16
  • Does the invokeAll() approach require knowing all of the tasks ahead of time? This is for an unknown number of tasks and with the possibility that tasks will be added to the queue even while it is waiting. Thanks for all of the great suggestions. – cottonBallPaws Oct 14 '10 at 03:47
  • Also, for the Future approach, will the for loop run into problems if something is added to both the executorService and the futures collection while the loop is looping? – cottonBallPaws Oct 14 '10 at 03:54
  • Yes, `invokeAll()` would require knowing and waiting to submit a block of tasks at a single point in time--otherwise I would have added it as a full-fledged answer to your question. @Thilo's `CompletionService` answer has a weaker, but similar, constraint. I believe @Thilo had a now-deleted answer which was correct for your stated question. – andersoj Oct 14 '10 at 12:39
  • @littleFluffyKitty, the `Future` approach in this answer is more or less equivalent to the `invokeAll()` solution. Since the `futures` collection is effectively immutable, it won't have trouble per se with new tasks added -- the snippet or `invokeAll()` will block until the tasks in question (here, whatever myRunnables have been added) complete. – andersoj Oct 14 '10 at 13:22
  • @andersoj and @littleFluffyKitty, the code I posted is roughly equivalent to invokeAll(), however, calls to `submit` could just as easily place the `Future` instances into a `BlockingQueue` and then the main thread could do a timed `poll` on the BlockingQueue, providing you the flexibility to wait until the executor has been empty for some time, providing all futures are placed in the same queue. – Tim Bender Oct 15 '10 at 19:03
  • @Tim Bender, agreed, your approach could be expanded to be more flexible... with attendant complexity (due to the partial synchrony exposed) levied on the caller. What does it mean for the queue to be momentarily empty? – andersoj Oct 15 '10 at 20:05
  • @Tim Bender, in your original answer, is it possible there will be memory leaks because of the linkedlist holding onto the futures? How can I be sure those references are not held? – cottonBallPaws Oct 16 '10 at 18:09
  • 1
    @littleFluffyKitty, In the original answer, the memory for the LinkedList will exist in the heap space so long as the variable is in scope and continues to point at that data structure in the heap. If you define the variable locally as part of some method, it will exist for the duration of the method. I don't really know how you intend to use this snippet. Of course, I am thinking like a Java developer, and Android is NOT Java. – Tim Bender Oct 18 '10 at 23:21
  • This will cause a memory leak and generate lots of garbage if there is a huge number of tasks. – ROROROOROROR Jul 09 '14 at 13:26
  • @ROROROOROROR, No. No. No. – Tim Bender Jul 09 '14 at 21:09
  • @TimBender but in my project (spring-mvc) I have a procedure with thousands of tasks, and with these `Future` objects a lot of objects are promoted to `Tenured Gen`. Well, maybe this is only the case for my project. – ROROROOROROR Jul 10 '14 at 07:51
  • @ROROROOROROR, Correct. But it is not a memory leak because the tasks and responses are scoped and will be collectable. It is not garbage because presumably there is something meaningful about the task output. If the task output is not meaningful, then another mechanism like a `CountDownLatch` could be used (as the OP states). If you submit thousands of tasks all at once, most will probably be tenured anyway because it will be a long time before the `ExecutorService` `BlockingQueue` is freed of them. – Tim Bender Jul 10 '14 at 17:58
8

My scenario is a web crawler that fetches information from a web site and then processes it. A ThreadPoolExecutor is used to speed up the process because many pages can be loaded at the same time. New tasks are created inside existing tasks because the crawler follows the hyperlinks in each page. The problem is the same: the main thread does not know when all the tasks are complete so that it can start processing the results. I use a simple way to determine this. It is not very elegant, but it works in my case:

while (executor.getTaskCount()!=executor.getCompletedTaskCount()){
    System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount());
    Thread.sleep(5000);
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
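
Note that the Javadoc describes getTaskCount() and getCompletedTaskCount() as approximations, so the loop above is best-effort. As an alternative to polling (not what this answer does), here is a minimal sketch that counts tasks in flight and lets the main thread wait until the count reaches zero. `PendingTaskTracker`, `awaitIdle` and the fake `crawl` workload are hypothetical names invented for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts tasks in flight so a caller can wait for zero.
final class PendingTaskTracker {
    private final ExecutorService executor;
    private int pending = 0; // guarded by "this"

    PendingTaskTracker(ExecutorService executor) {
        this.executor = executor;
    }

    // The task is counted before it is handed to the executor, so a task that
    // submits children keeps the count above zero until they finish as well.
    void submit(Runnable task) {
        synchronized (this) {
            pending++;
        }
        try {
            executor.execute(() -> {
                try {
                    task.run();
                } finally {
                    taskDone();
                }
            });
        } catch (RejectedExecutionException e) {
            taskDone(); // the task never ran: undo the count
            throw e;
        }
    }

    private synchronized void taskDone() {
        if (--pending == 0) {
            notifyAll();
        }
    }

    // Blocks until no counted task is queued or running.
    synchronized void awaitIdle() throws InterruptedException {
        while (pending > 0) {
            wait();
        }
    }

    // Fake crawl: every "page" links to two more pages until maxDepth is reached.
    static int crawl(int maxDepth) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            PendingTaskTracker tracker = new PendingTaskTracker(executor);
            AtomicInteger visited = new AtomicInteger();
            tracker.submit(visit(tracker, visited, 0, maxDepth));
            tracker.awaitIdle();
            return visited.get();
        } finally {
            executor.shutdown();
        }
    }

    private static Runnable visit(PendingTaskTracker tracker, AtomicInteger visited,
                                  int depth, int maxDepth) {
        return () -> {
            visited.incrementAndGet();
            if (depth < maxDepth) {
                tracker.submit(visit(tracker, visited, depth + 1, maxDepth));
                tracker.submit(visit(tracker, visited, depth + 1, maxDepth));
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("pages visited: " + crawl(3));
    }
}
```

This only works if every task goes through the tracker, and if child tasks are submitted from inside the parent's run() so the count cannot drop to zero in between.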
googol4u
6

Maybe you are looking for a CompletionService to manage batches of tasks; see also this answer.
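
A rough sketch of what that could look like, using the standard ExecutorCompletionService. The class `CompletionServiceDemo` and the squared-number workload are invented for the example, and the caller still has to know how many results to collect.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; the workload is a placeholder.
final class CompletionServiceDemo {

    // Submits taskCount tasks that each return the square of their index,
    // collects the results in completion order, and returns their sum.
    static long sumOfSquares(ExecutorService executor, int taskCount)
            throws InterruptedException, ExecutionException {
        CompletionService<Long> completion = new ExecutorCompletionService<>(executor);
        for (int i = 1; i <= taskCount; i++) {
            final long n = i;
            completion.submit(() -> n * n);
        }
        long sum = 0;
        for (int i = 0; i < taskCount; i++) {
            // take() blocks until some submitted task has finished
            sum += completion.take().get();
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println("sum = " + sumOfSquares(executor, 4));
        } finally {
            executor.shutdown();
        }
    }
}
```

The underlying executor is shared, so other code can keep submitting to it; only tasks submitted through this CompletionService show up in take().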

Thilo
3

(This is an attempt to reproduce Thilo's earlier, deleted answer with my own adjustments.)

I think you may need to clarify your question since there is an implicit infinite condition... at some point you have to decide to shut down your executor, and at that point it won't accept any more tasks. Your question seems to imply that you want to wait until you know that no further tasks will be submitted, which you can only know in your own application code.

The following answer will allow you to smoothly transition to a new TPE (for whatever reason), completing all the currently-submitted tasks, and not rejecting new tasks to the new TPE. It might answer your question. @Thilo's might also.

Assuming you have defined somewhere a visible TPE in use as such:

AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;

You can then write the TPE swap routine as such. It could also be written using a synchronized method, but I think this is simpler:

void rotateTPE() throws InterruptedException
{
   ThreadPoolExecutor newTPE = createNewTPE();
   // atomic swap with publicly-visible TPE
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in
   // the previously visible TPE (awaitTermination() requires a timeout)
   oldTPE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

Alternatively, if you really do want to kill the thread pool, your submitter side will need to cope with rejected tasks at some point, and you could use null for the new TPE:

void killTPE() throws InterruptedException
{
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in
   // the previously visible TPE (awaitTermination() requires a timeout)
   oldTPE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

This could cause upstream problems, since the caller would need to know what to do with a null.

You could also swap out with a dummy TPE that simply rejected every new execution, but that's equivalent to what happens if you call shutdown() on the TPE.
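
To make the swap idea concrete, here is a self-contained sketch under a few assumptions: `RotatingPool` is a made-up wrapper, the pool size of 2 is arbitrary, and the retry in `execute` is my own addition to cover a submitter that read the old pool just before it was shut down.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical wrapper around the swap routine; names and sizes are illustrative.
final class RotatingPool {
    private final AtomicReference<ThreadPoolExecutor> current =
            new AtomicReference<>(newPool());

    private static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }

    // Submitters always go through the reference, so they see the newest pool.
    void execute(Runnable task) {
        while (true) {
            ThreadPoolExecutor pool = current.get();
            try {
                pool.execute(task);
                return;
            } catch (RejectedExecutionException e) {
                // Lost a race with rotate(): retry on the replacement pool.
                if (current.get() == pool) {
                    throw e; // same pool, so the rejection is genuine
                }
            }
        }
    }

    // Swaps in a fresh pool, then waits for the old pool's tasks to finish.
    // Returns false if the timeout elapsed first.
    boolean rotate(long timeout, TimeUnit unit) throws InterruptedException {
        ThreadPoolExecutor old = current.getAndSet(newPool());
        old.shutdown();
        return old.awaitTermination(timeout, unit);
    }

    void shutdown() {
        current.get().shutdown();
    }

    // Runs five tasks, rotates, and reports how many had finished by then
    // (or -1 if the old pool did not drain within the timeout).
    static int demo() throws InterruptedException {
        RotatingPool pools = new RotatingPool();
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pools.execute(() -> ran.incrementAndGet());
        }
        boolean drained = pools.rotate(10, TimeUnit.SECONDS);
        int afterRotate = ran.get();
        pools.execute(() -> ran.incrementAndGet()); // accepted by the new pool
        pools.shutdown();
        return drained ? afterRotate : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("tasks finished at rotation: " + demo());
    }
}
```

The wait in rotate() covers everything submitted to the old pool, while anything submitted afterwards lands in the new pool and is not waited for.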

andersoj
1

If you don't want to use shutdown(), use one of the approaches below:

  1. Iterate over the Future objects returned by submit() on the ExecutorService and call the blocking get() on each one, as suggested by Tim Bender.

  2. Use one of:

    1. invokeAll() on the ExecutorService
    2. CountDownLatch
    3. ForkJoinPool, or Executors.newWorkStealingPool() (since Java 8)

invokeAll() on the executor service serves the same purpose as a CountDownLatch: both block until every task in a known batch has completed.
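
A small sketch of the invokeAll() and CountDownLatch options side by side, assuming a fixed batch whose size is known up front. `BatchWaitDemo` and its method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the tasks only bump a counter.
final class BatchWaitDemo {

    // invokeAll() submits the whole batch and returns once every task is done.
    static int withInvokeAll(ExecutorService executor, int taskCount)
            throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(completed::incrementAndGet);
        }
        executor.invokeAll(tasks);
        return completed.get();
    }

    // A CountDownLatch sized to the batch: each task counts down when it ends.
    static int withLatch(ExecutorService executor, int taskCount)
            throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                try {
                    completed.incrementAndGet();
                } finally {
                    latch.countDown(); // count down even if the task fails
                }
            });
        }
        latch.await(); // blocks until the count reaches zero
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println("invokeAll: " + withInvokeAll(executor, 8));
            System.out.println("latch:     " + withLatch(executor, 8));
        } finally {
            executor.shutdown();
        }
    }
}
```

Neither variant shuts the executor down, so it keeps accepting unrelated tasks while the batch is being awaited.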

Related SE question:

How to wait for a number of threads to complete?

Ravindra babu
0

You could call waitTillDone() on the Runner class:

Runner runner = Runner.runner(10);

runner.runIn(2, SECONDS, runnable);
runner.run(runnable); // each of these runnables could submit more tasks

runner.waitTillDone(); // blocks until all tasks are finished (or failed)

// and now reuse it

runner.runIn(500, MILLISECONDS, callable);

runner.waitTillDone();
runner.shutdown();

To use it, add this gradle/maven dependency to your project: 'com.github.matejtymes:javafixes:1.0'

For more details look here: https://github.com/MatejTymes/JavaFixes or here: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

Matej Tymes
0

Try polling the queue size and the active task count, as shown below:

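// Poll until no task is running and the queue is empty.
while (executor.getThreadPoolExecutor().getActiveCount() != 0
        || !executor.getThreadPoolExecutor().getQueue().isEmpty()) {
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
        break;                              // and stop waiting
    }
}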