34

I use an ExecutorService to execute a task. This task can recursively create other tasks which are submitted to the same ExecutorService and those child tasks can do that, too.

My problem now is that I want to wait until all the tasks are done (that is, all tasks have finished and none of them submitted new ones) before I continue.

I cannot call ExecutorService.shutdown() in the main thread because this prevents new tasks from being accepted by the ExecutorService.

And calling ExecutorService.awaitTermination() doesn't help if shutdown() hasn't been called: it just blocks until the timeout expires.

So I am kinda stuck here. It can't be that hard for the ExecutorService to see that all workers are idle, can it? The only inelegant solution I could come up with is to use a ThreadPoolExecutor directly and query its getPoolSize() every once in a while. Is there really no better way to do that?
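A minimal sketch of the awaitTermination() behaviour described above, assuming a plain fixed thread pool (the class name AwaitWithoutShutdownDemo is hypothetical): without shutdown() the pool can never terminate, so awaitTermination() simply runs out its timeout and returns false. After shutdown() it returns true, but from then on the pool rejects new tasks, including child tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitWithoutShutdownDemo {
    // Returns {terminated before shutdown(), terminated after shutdown()}.
    static boolean[] run() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("task ran"));

        // No shutdown() yet: the pool cannot terminate, so this waits out the whole timeout.
        boolean before = pool.awaitTermination(200, TimeUnit.MILLISECONDS);

        pool.shutdown(); // from here on, new tasks (including child tasks) are rejected
        boolean after = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] {before, after};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println("before shutdown: " + r[0] + ", after shutdown: " + r[1]);
        // last line printed: "before shutdown: false, after shutdown: true"
    }
}
```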

Ravindra babu
Christoph

11 Answers

19

This really is an ideal candidate for a Phaser. Java 7 is coming out with this new class. It's a flexible CountDownLatch/CyclicBarrier. You can get a stable version at the JSR 166 Interest Site.

It is more flexible than a CountDownLatch/CyclicBarrier because it not only supports an unknown number of parties (threads) but is also reusable (that's where the phase part comes in).

For each task you submit you register, and when that task is completed it arrives. This can be done recursively.

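Phaser phaser = new Phaser();
ExecutorService e = Executors.newCachedThreadPool(); // or any other ExecutorService

Runnable recursiveRunnable = new Runnable(){
   public void run(){
      try {
         //do work recursively if you have to

         if(shouldBeRecursive){      // your own condition
              phaser.register();     // register the child before submitting it
              e.submit(this);        // resubmit this same Runnable
         }
      } finally {
         phaser.arrive();            // arrive even if the work throws
      }
   }
};

public void doWork(){
   int phase = phaser.getPhase();

   phaser.register();
   e.submit(recursiveRunnable);

   phaser.awaitAdvance(phase);
}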

Edit: Thanks @depthofreality for pointing out the race condition in my previous example. I have updated it so that the calling thread registers before submitting and then only awaits the advance of the current phase, blocking until the recursive tasks complete.

The phase won't advance until the number of arrivals equals the number of registrations. Since register() is called before each recursive submission, the phase advances only once all invocations are complete.
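A runnable variant of the Phaser approach, offered as a sketch rather than as the answer's own code: the binary tree of tasks, the depth parameter and the names PhaserTreeDemo and submit are hypothetical. It differs from the code above in two ways. Tasks call arriveAndDeregister() instead of arrive(), because with arrive() every task stays registered, so a second doWork() would wait in the next phase for parties that never arrive again (and a Phaser allows at most 65535 registered parties). And onAdvance() is overridden to return false, because by default a Phaser terminates when its registered parties drop to zero, which would make it unusable for the next call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

public class PhaserTreeDemo {
    final ExecutorService pool = Executors.newFixedThreadPool(4);
    final AtomicInteger executed = new AtomicInteger();

    // Returning false keeps the phaser alive when the party count drops to zero,
    // so doWork() can be called again for a new tree of tasks.
    final Phaser phaser = new Phaser() {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            return false;
        }
    };

    // Register in the submitting thread, before the task can possibly arrive.
    void submit(int depth) {
        phaser.register();
        pool.execute(() -> {
            try {
                executed.incrementAndGet();
                if (depth > 0) {            // hypothetical work: a binary tree of tasks
                    submit(depth - 1);
                    submit(depth - 1);
                }
            } finally {
                phaser.arriveAndDeregister(); // leaves no parties behind for the next phase
            }
        });
    }

    // Blocks until every task in the tree has finished.
    void doWork(int depth) {
        int phase = phaser.getPhase();
        submit(depth);
        phaser.awaitAdvance(phase);
    }

    public static void main(String[] args) {
        PhaserTreeDemo demo = new PhaserTreeDemo();
        demo.doWork(3);                          // 1 + 2 + 4 + 8 = 15 tasks
        System.out.println(demo.executed.get()); // 15
        demo.doWork(3);                          // the same phaser is reused
        System.out.println(demo.executed.get()); // 30
        demo.pool.shutdown();
    }
}
```

Because each task registers its children before deregistering itself, the party count cannot reach zero until the whole tree is done.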

John Vint
  • Nice, Phasers seem to be what I need. I want to stick to the current standard Java library but as soon as it's out there I will use those. Thanks for the tip! – Christoph Feb 10 '11 at 15:23
  • I'm aware it was posted a long time ago. Still I'm wondering if there's race condition here. Can't doWork() complete before recursiveRunnable registers with phaser? – depthofreality Jan 14 '14 at 21:58
  • @depthofreality That's a great point. You are right there would certainly be a race here (must have overlooked it as I put this example together quickly). I'll update it now. – John Vint Jan 15 '14 at 12:34
  • @JohnVint thanks for clarification and fix. I think there's another issue now. Some parties may arrive without being registered. – depthofreality Jan 15 '14 at 21:56
  • @depthofreality I considered that, but that wouldn't be the case. The first party is registered in `doWork`; since the code below that doesn't call `arriveAndAwaitAdvance`, the `recursiveRunnable` needs to arrive (which it does). Also notice the runnable `register`s before submitting to the ExecutorService – John Vint Jan 16 '14 at 13:43
  • @depthofreality So prior to `awaitAdvance` the phaser is at phase 0 with 1 registered party. If `shouldBeRecursive` is false, the runnable arrives, tripping the barrier. If it isn't false, the phaser has 2 registered parties and 1 arrived party (until the child runnable also arrives, tripping the barrier). – John Vint Jan 16 '14 at 13:45
  • @JohnVint If `shouldBeRecursive` is false then the party (thread) arrives without being registered, right? The Oracle documentation for arrive() says: "It is a usage error for an unregistered party to invoke this method. However, this error may result in an IllegalStateException only upon some subsequent operation on this phaser, if ever." – depthofreality Jan 17 '14 at 12:17
  • @depthofreality If there was no `register` then yes, it would be a huge error. To answer your question, upon entering the runnable the first time, registered parties is 1, that is done in `doWork`. So base case of `shouldBeRecursive` == false the first entry, since phaser.register is done in `doWork` the register count is 1 and the arrive in this case would trip the barrier. – John Vint Jan 17 '14 at 12:49
  • I took the registering out of the recursive thread and placed it on the submitting thread. The arriving is done on the recursive thread. In the end you will always get a state where registered parties == arrived parties. – John Vint Jan 17 '14 at 12:50
  • @JohnVint, `e.submit()` and `run()` will potentially be executed by different threads. That's the reason I see the issue here. – depthofreality Jan 18 '14 at 20:43
  • @depthofreality It will be executed by other threads, but `phaser.register()` will be executed before the `e.submit()` and `run()` so that the phaser will always be in an incremented state before those two are invoked. Can you explain the race condition some more as you see it? – John Vint Jan 21 '14 at 01:48
  • @JohnVint Your code is OK. Sorry, it was my confusion. Phaser doesn't treat parties the same way as it treats threads. It is possible to register party on one thread and call arrive() on the other. After your fix of race condition I don't see any other issue here. Thanks! – depthofreality Jan 21 '14 at 07:55
17

If the number of tasks in the tree of recursive tasks is initially unknown, perhaps the easiest way would be to implement your own synchronization primitive, some kind of "inverse semaphore", and share it among your tasks. Before submitting each task you increment a value; when the task is completed, it decrements that value; and you wait until the value is 0.

Implementing it as a separate primitive explicitly called from tasks decouples this logic from the thread pool implementation and allows you to submit several independent trees of recursive tasks into the same pool.

Something like this:

public class InverseSemaphore {
    private int value = 0;
    private final Object lock = new Object();

    public void beforeSubmit() {
        synchronized(lock) {
            value++;
        }
    }

    public void taskCompleted() {
        synchronized(lock) {
            value--;
            if (value == 0) lock.notifyAll();
        }
    }

    public void awaitCompletion() throws InterruptedException {
        synchronized(lock) {
            while (value > 0) lock.wait();
        }
    }
}

Note that taskCompleted() should be called inside a finally block, to make it immune to possible exceptions.

Also note that beforeSubmit() should be called by the submitting thread before the task is submitted, not by the task itself, to avoid possible "false completion" when old tasks are completed and new ones not started yet.

EDIT: Important problem with usage pattern fixed.
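A sketch of the usage pattern described above, assuming a fixed thread pool and a hypothetical binary tree of tasks; the names InverseSemaphoreDemo, submit and node are illustrative only. The submit helper calls beforeSubmit() in the submitting thread and taskCompleted() in a finally block, so a parent always counts its children before it finishes itself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class InverseSemaphoreDemo {
    // Same logic as the InverseSemaphore class above, condensed.
    static class InverseSemaphore {
        private int value = 0;
        private final Object lock = new Object();

        void beforeSubmit() {
            synchronized (lock) { value++; }
        }

        void taskCompleted() {
            synchronized (lock) {
                value--;
                if (value == 0) lock.notifyAll();
            }
        }

        void awaitCompletion() throws InterruptedException {
            synchronized (lock) {
                while (value > 0) lock.wait();
            }
        }
    }

    final ExecutorService pool = Executors.newFixedThreadPool(4);
    final InverseSemaphore sem = new InverseSemaphore();
    final AtomicInteger executed = new AtomicInteger();

    // Count the task in the submitting thread; release it in a finally block when it ends.
    void submit(Runnable task) {
        sem.beforeSubmit();
        pool.execute(() -> {
            try {
                task.run();
            } finally {
                sem.taskCompleted();
            }
        });
    }

    // Hypothetical recursive task: spawns two children until depth reaches 0.
    void node(int depth) {
        executed.incrementAndGet();
        if (depth > 0) {
            submit(() -> node(depth - 1));
            submit(() -> node(depth - 1));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        InverseSemaphoreDemo demo = new InverseSemaphoreDemo();
        demo.submit(() -> demo.node(3));
        demo.sem.awaitCompletion();              // returns once all 15 tasks have finished
        System.out.println(demo.executed.get()); // 15
        demo.pool.shutdown();
    }
}
```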

axtavt
  • Was answering something similar - he could get by using an AtomicInteger. – NG. Feb 10 '11 at 14:38
  • @SB: With AtomicInteger you can't wait for completion without busy waiting. – axtavt Feb 10 '11 at 14:44
  • There is a typo, you are doing lock-- instead of value-- – Suraj Chandran Feb 10 '11 at 14:49
  • @axtavt You can still use AtomicInteger instead of int in your InverseSemaphore. You wouldn't need to synchronise around it then. – dogbane Feb 10 '11 at 14:51
  • @dogbane that doesn't help this answer though, because there would need to be synchronization for the waiting. – John Vint Feb 10 '11 at 14:52
  • Thanks for the answer! I posted an alternative to your basic AtomicInteger idea. Essentially I hook into the ThreadPoolExecutor to increment/decrement the counter and then use the normal ExecutorService.shutdown() and awaitTermination() mechanism. – Christoph Feb 10 '11 at 15:30
  • +1 for simple usage of basic object synchronization. If more people understood this fundamental concept, the world would be a much better place. – Travis May 03 '11 at 15:07
7

Wow, you guys are quick:)

Thank you for all the suggestions. Futures don't integrate easily with my model because I don't know beforehand how many runnables are scheduled. So if I keep a parent task alive just to wait for its recursive child tasks to finish, I have a lot of garbage lying around.

I solved my problem using the AtomicInteger suggestion. Essentially, I subclassed ThreadPoolExecutor, incrementing the counter on calls to execute() and decrementing it on calls to afterExecute(). When the counter reaches 0 I call shutdown(). This seems to work for my problems; I'm not sure whether it's a generally good way to do it. In particular, I assume that you only use execute() to add Runnables.

As a side note: I first tried checking, in afterExecute(), the number of Runnables in the queue and the number of active workers, and shutting down when both are 0; but that didn't work because not all Runnables showed up in the queue, and getActiveCount() didn't do what I expected either.

Anyhow, here's my solution: (if anybody finds serious problems with this, please let me know:)

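import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger executing = new AtomicInteger(0);

    public MyThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> queue) {
        super(corePoolSize, maxPoolSize, keepAliveTime, unit, queue);
    }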


    @Override
    public void execute(Runnable command) {
        //intercepting beforeExecute is too late!
        //execute() is called in the parent thread before it terminates
        executing.incrementAndGet();
        super.execute(command);
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        int count = executing.decrementAndGet();
        if(count == 0) {
            this.shutdown();
        }
    }

}
Christoph
  • While this would work for your specific requirements, it is not a general solution (given the potential race condition after you decrement and then test the value of count == 0.) The general solution would be to use AbstractQueuedSynchronizer to roll your own 'dynamic' countdown latch. – alphazero May 01 '11 at 05:52
  • You have the problem that the executor doesn't know when you have stop adding tasks. If at any point all your tasks finish before you have finished adding them, those tasks will be rejected as the pool has shutdown. – Peter Lawrey May 01 '11 at 10:36
  • @PeterLawrey Right, but there's a trivial solution: Increment the counter initially and decrement it when done with adding. Or use an "adder task" for adding all the tasks. – maaartinus Sep 20 '14 at 07:28
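A sketch of the race Peter Lawrey describes and the "adder task" workaround maaartinus suggests, using a condensed copy of the counting executor above (CountingExecutor, node and runTrees are hypothetical names). Because the top-level tasks are submitted from inside a single task, the counter already includes them before that task finishes, so it cannot reach 0 and trigger shutdown() early. Tasks submitted from inside other tasks are safe for the same reason: execute() increments the counter before the parent returns.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingExecutorDemo {
    // Condensed version of the counting executor above: shuts itself down at count 0.
    static class CountingExecutor extends ThreadPoolExecutor {
        private final AtomicInteger executing = new AtomicInteger(0);

        CountingExecutor(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        public void execute(Runnable command) {
            executing.incrementAndGet(); // counted in the submitting thread
            super.execute(command);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (executing.decrementAndGet() == 0) {
                shutdown();
            }
        }
    }

    final AtomicInteger executed = new AtomicInteger();

    // Hypothetical recursive task: spawns two children until depth reaches 0.
    void node(ExecutorService pool, int depth) {
        executed.incrementAndGet();
        if (depth > 0) {
            pool.execute(() -> node(pool, depth - 1));
            pool.execute(() -> node(pool, depth - 1));
        }
    }

    // Submits several top-level trees through a single "adder" task, then waits.
    int runTrees(int trees, int depth) throws InterruptedException {
        CountingExecutor pool = new CountingExecutor(4);
        pool.execute(() -> {
            for (int i = 0; i < trees; i++) {
                pool.execute(() -> node(pool, depth));
            }
        });
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return executed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new CountingExecutorDemo().runTrees(3, 2)); // 3 trees x 7 tasks = 21
    }
}
```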
3

You could create your own thread pool which extends ThreadPoolExecutor. You want to know when a task has been submitted and when it completes.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private int counter = 0; // tasks submitted but not yet finished

    public MyThreadPoolExecutor() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public synchronized void execute(Runnable command) {
        counter++;
        super.execute(command);
    }

    @Override
    protected synchronized void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        counter--;
        notifyAll();
    }

    // Blocks until every submitted task (including tasks submitted by tasks) has finished
    public synchronized void waitForExecuted() throws InterruptedException {
        while (counter > 0)
            wait();
    }
}
Ravindra babu
Peter Lawrey
  • I like this solution better than the one that has a score of 13. HOWEVER: the "while (counter == 0)" should be "while (counter > 0)", right??! – Tim Cooper Aug 06 '12 at 06:15
1

Use a Future for your tasks (instead of submitting Runnables); a callback updates its state when it's completed, so you can use Future.isDone to track the state of all your tasks.

Ravindra babu
Joel
  • How do you get a callback on a Future? Thought you had to call .get on it. – NG. Feb 10 '11 at 14:36
  • When he says callback he means the value you return from the call method – John Vint Feb 10 '11 at 14:38
  • You do, what I mean is that the 'done' flag is set for you (via a callback). I've rephrased the answer to make this more explicit. – Joel Feb 10 '11 at 14:38
  • ok so he still has to poll isDone? Just want to make sure. @John V. - the call method on a callable returns the result - waiting for it if needed. You get a Future back when you submit a Callable. Maybe our definition of callback differs. – NG. Feb 10 '11 at 14:46
  • No, you're right SB, I misread your question about what he meant. – John Vint Feb 10 '11 at 14:51
  • This would require me to have a "central place" where I collect all the futures and wait for all of them to complete. This would work but doesn't fit the architecture of my program very well. – Christoph Feb 10 '11 at 15:27
  • Yes, you would need a central collection of Futures so that you could poll their isDone status. – Joel Feb 15 '11 at 14:07
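A sketch of the "central place" approach, assuming a ConcurrentLinkedQueue that collects every Future. It swaps the isDone() polling suggested above for blocking Future.get() calls, and FutureQueueDemo, submit, node and awaitAll are hypothetical names. Each child's Future is queued by its parent before the parent finishes, so by the time the queue is drained no task can still be pending.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class FutureQueueDemo {
    final ExecutorService pool = Executors.newFixedThreadPool(4);
    final Queue<Future<?>> futures = new ConcurrentLinkedQueue<>(); // the "central place"
    final AtomicInteger executed = new AtomicInteger();

    void submit(Runnable task) {
        futures.add(pool.submit(task));
    }

    // Hypothetical recursive task: spawns two children until depth reaches 0.
    void node(int depth) {
        executed.incrementAndGet();
        if (depth > 0) {
            submit(() -> node(depth - 1));
            submit(() -> node(depth - 1));
        }
    }

    // Wait for each Future in turn. A task's children are queued before the task finishes,
    // so once the queue is empty every submitted task has completed.
    void awaitAll() throws InterruptedException, ExecutionException {
        Future<?> f;
        while ((f = futures.poll()) != null) {
            f.get();
        }
    }

    public static void main(String[] args) throws Exception {
        FutureQueueDemo demo = new FutureQueueDemo();
        demo.submit(() -> demo.node(3));
        demo.awaitAll();
        System.out.println(demo.executed.get()); // 15
        demo.pool.shutdown();
    }
}
```

A side effect of using get() is that an exception thrown by any task surfaces in awaitAll() as an ExecutionException.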
0

I must say that the solutions described above for recursively spawned tasks, and waiting for their sub-tasks to finish, don't satisfy me. Here is my solution, inspired by the original Oracle documentation for CountDownLatch and the example Human resources CountDownLatch.

The first thread, an instance of HRManagerCompact, waits on a latch for its two child threads, each of which waits on its own latch for its next 2 child threads, and so on.

Of course, the latch can be set to a value other than 2 (in the CountDownLatch constructor), and the runnable objects can be created in a loop, e.g. stored in an ArrayList, but the two must correspond: the number of countDown() calls must equal the count passed to the CountDownLatch constructor.

Be careful: the number of threads and latches grows exponentially with the limit in the condition 'level.get() < 2'. Each level doubles the number of threads (1, 2, 4, 8, 16...), and every thread that calls doIt() waits on its own latch. For four levels (level.get() < 4), 15 threads (1 + 2 + 4 + 8) will be waiting, each on its own latch, while the peak of 16 threads at the last level is running.

package processes.countdownlatch.hr;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Recursively latching running classes to wait for the peak threads
 *
 * @author hariprasad
 */
public class HRManagerCompact extends Thread {
  final int N = 2; // number of daughter's tasks for latch
  CountDownLatch countDownLatch;
  CountDownLatch originCountDownLatch;
  AtomicInteger level = new AtomicInteger(0);
  AtomicLong order = new AtomicLong(0); // id latched thread waiting for

  HRManagerCompact techLead1 = null;
  HRManagerCompact techLead2 = null;
  HRManagerCompact techLead3 = null;

// constructor
public HRManagerCompact(CountDownLatch countDownLatch, String name,
    AtomicInteger level, AtomicLong order){
  super(name);
  this.originCountDownLatch=countDownLatch;
  this.level = level;
  this.order = order;
 }

 private void doIt() {
    countDownLatch = new CountDownLatch(N);
    AtomicInteger leveli = new AtomicInteger(level.get() + 1);
    AtomicLong orderi = new AtomicLong(Thread.currentThread().getId());
    techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi);
    techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi);
    //techLead3 = new HRManagerCompact(countDownLatch, "third", leveli);

    techLead1.start();
    techLead2.start();
    //techLead3.start();

    try {
     synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread
       System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi);
       countDownLatch.await(); // wait actual thread
     }
     System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
  }

 @Override
 public void run() {
  try {
   System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId());
   Thread.sleep(10*level.intValue());
   if (level.get() < 2) doIt();
   Thread.yield();
  }
  catch (Exception e) {
   e.printStackTrace();
  }
  System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId());
  originCountDownLatch.countDown(); // count down
 }

 public static void main(String args[]){
  AtomicInteger levelzero = new AtomicInteger(0);
  HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue()));
  hr.doIt();
 }
}

Possible annotated output (thread scheduling varies from run to run):

first: working... 1, 1, 10 // thread 1, first daughter's task (10)
second: working... 1, 1, 11 // thread 1, second daughter's task (11)
first: working... 2, 10, 12 // thread 10, first daughter's task (12)
first: working... 2, 11, 14 // thread 11, first daughter's task (14)
second: working... 2, 11, 15 // thread 11, second daughter's task (15)
second: working... 2, 10, 13 // thread 10, second daughter's task (13)
--- first: recruted 2, 10, 12 // finished 12
--- first: recruted 2, 11, 14 // finished 14
--- second: recruted 2, 10, 13  // finished 13 (now can be opened latch 10)
--- second: recruted 2, 11, 15  // finished 15 (now can be opened latch 11)
*** HR Manager waiting for recruitment to complete... 0, 0, 1
*** HR Manager waiting for recruitment to complete... 1, 1, 10
*** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened
--- first: recruted 1, 1, 10 // finished 10
*** HR Manager waiting for recruitment to complete... 1, 1, 11
*** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened
--- second: recruted 1, 1, 11  // finished 11 (now can be opened latch 1)
*** Distribute Offer Letter, it means finished. 0, 0, 1  // latch on 1 opened
hariprasad
0

The only inelegant solution I could come up with is to use a ThreadPoolExecutor directly and query its getPoolSize() every once in a while. Is there really no better way to do that?

You have to use the shutdown(), awaitTermination() and shutdownNow() methods in the proper sequence.

shutdown(): Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

awaitTermination(): Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

shutdownNow(): Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

The recommended way, from the Oracle documentation page of ExecutorService:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

You can replace the if condition with a while loop in case tasks take a long time to complete:

Change

if (!pool.awaitTermination(60, TimeUnit.SECONDS))

To

 while (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     // still running; awaitTermination() itself blocks for up to 60 s per call
 }

For other alternatives (apart from join(), which applies to standalone threads), see:

wait until all threads finish their work in java

Ravindra babu
0

Use a CountDownLatch. Pass the CountDownLatch object to each of your tasks and code your tasks like this:

public void doTask() {
    try {
        // do your task
    } finally {
        latch.countDown(); // count down even if the task throws
    }
}

The thread that needs to wait should execute the following code:

public void doWait() throws InterruptedException {
    latch.await();
}

But of course, this assumes you already know the number of child tasks, so that you can initialize the latch's count.
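When the shape of the task tree is known up front, the count can be computed before starting. A sketch assuming a full binary tree of depth d, which has 2^(d+1) - 1 tasks; the names KnownTreeLatchDemo, node and runTree are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class KnownTreeLatchDemo {
    final ExecutorService pool = Executors.newFixedThreadPool(4);
    final AtomicInteger executed = new AtomicInteger();

    // Hypothetical recursive task: a full binary tree whose size is known up front.
    void node(int depth, CountDownLatch latch) {
        try {
            executed.incrementAndGet();
            if (depth > 0) {
                pool.execute(() -> node(depth - 1, latch));
                pool.execute(() -> node(depth - 1, latch));
            }
        } finally {
            latch.countDown(); // exactly one count per task, even if the work throws
        }
    }

    void runTree(int depth) throws InterruptedException {
        int taskCount = (1 << (depth + 1)) - 1; // 2^(depth+1) - 1 nodes in the tree
        CountDownLatch latch = new CountDownLatch(taskCount);
        pool.execute(() -> node(depth, latch));
        latch.await();
    }

    public static void main(String[] args) throws InterruptedException {
        KnownTreeLatchDemo demo = new KnownTreeLatchDemo();
        demo.runTree(3);                         // latch count 15
        System.out.println(demo.executed.get()); // 15
        demo.pool.shutdown();
    }
}
```

If the computed count and the actual number of tasks ever disagree, await() either returns early or never returns, which is exactly the limitation noted above.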

Ravindra babu
Suraj Chandran
0

You could use a runner that keeps track of running threads:

Runner runner = Runner.runner(numberOfThreads);

runner.runIn(2, SECONDS, callable);
runner.run(callable);


// blocks until all tasks are finished (or failed)
runner.waitTillDone();


// and reuse it
runner.runRunnableIn(500, MILLISECONDS, runnable);


runner.waitTillDone();


// and then just kill it
runner.shutdownAndAwaitTermination();

To use it, just add the dependency:

compile 'com.github.matejtymes:javafixes:1.3.0'

Matej Tymes
0

(mea culpa: it's a 'bit' past my bedtime ;) but here's a first attempt at a dynamic latch):

package oss.alphazero.sto4958330;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class DynamicCountDownLatch {
    @SuppressWarnings("serial")
    private static final class Sync extends AbstractQueuedSynchronizer {
        private final CountDownLatch toplatch;
        public Sync() {
            setState(0);
            this.toplatch = new CountDownLatch(1);
        }

        @Override
        protected int tryAcquireShared(int acquires){
            try {
                toplatch.await();
            } 
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted", e);
            }
            return getState() == 0 ? 1 : -1;
        }
        public boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) 
                    return nextc == 0;
            }
        }
        public boolean tryExtendState(int acquires) {
            for (;;) {
                int s = getState();
                int exts = s+1;
                if (compareAndSetState(s, exts)) {
                    toplatch.countDown();
                    return exts > 0;
                }
            }
        }
    }
    private final Sync sync;
    public DynamicCountDownLatch(){
        this.sync = new Sync();
    }
    public void await() 
        throws InterruptedException   
    {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit   unit) 
        throws InterruptedException   
    {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void countDown() {
        sync.releaseShared(1);
    }
    public void join() {
        sync.tryExtendState(1);
    }
}

This latch introduces a new method, join(), to the existing (cloned) CountDownLatch API; tasks use it to signal their entry into the larger task group.

The latch is passed around from parent task to child task. Each task would, per Suraj's pattern, first join() the latch, do its task(), and then countDown().

To address situations where the main thread launches the task group and then immediately calls await() (before any of the task threads have had a chance to even join()), the toplatch is used in the inner Sync class. This latch gets counted down on each join(); only the first countdown is significant, since all subsequent ones are no-ops.

The initial implementation above does introduce a semantic wrinkle of sorts: tryAcquireShared(int) is not supposed to throw an InterruptedException, but we still need to deal with an interrupt during the wait on the toplatch (which is why it is rethrown wrapped in a RuntimeException).

Is this an improvement over the OP's own solution using atomic counters? I would say probably not IFF he is insistent upon using Executors, but it is, I believe, an equally valid alternative approach using AQS, and it is usable with plain threads as well.

Crit away fellow hackers.

alphazero
0

If you want to use JSR166y classes (e.g. Phaser or Fork/Join), either of which might work for you, you can always download the Java 6 backport of them from http://gee.cs.oswego.edu/dl/concurrency-interest/ and use that as a basis rather than writing a completely homebrew solution. Then when Java 7 comes out you can just drop the dependency on the backport and change a few package names.

(Full disclosure: We've been using the LinkedTransferQueue in prod for a while now. No issues)
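Since Java 7, Fork/Join is part of java.util.concurrent (ForkJoinPool, RecursiveAction), so no backport is needed. A sketch assuming a hypothetical binary tree of tasks (ForkJoinTreeDemo, Node and runTree are illustrative names): invokeAll() forks the children and waits for them, so ForkJoinPool.invoke() on the root returns only after the whole tree has completed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

public class ForkJoinTreeDemo {
    // Hypothetical recursive task: spawns two children until depth reaches 0.
    static class Node extends RecursiveAction {
        private final int depth;
        private final AtomicInteger executed;

        Node(int depth, AtomicInteger executed) {
            this.depth = depth;
            this.executed = executed;
        }

        @Override
        protected void compute() {
            executed.incrementAndGet();
            if (depth > 0) {
                // Forks both children and waits until both have completed.
                invokeAll(new Node(depth - 1, executed), new Node(depth - 1, executed));
            }
        }
    }

    static int runTree(int depth) {
        AtomicInteger executed = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            pool.invoke(new Node(depth, executed)); // returns once the whole tree is done
        } finally {
            pool.shutdown();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTree(3)); // 15
    }
}
```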

kittylyst