
My question is strongly related to this one here. As was posted there, I would like the main thread to wait until the work queue is empty and all tasks have finished. The problem in my situation, however, is that each task may recursively cause new tasks to be submitted for processing. This makes it a little awkward to collect all of those tasks' futures.

Our current solution uses a busy-wait loop to await termination:

do { // Wait until we are done with the processing
    try {
        Thread.sleep(200);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
} while (!executor.getQueue().isEmpty()
         || numTasks.longValue() > executor.getCompletedTaskCount());

numTasks is a counter that is incremented whenever a new task is created. This works, but I think it's not very nice due to the busy waiting. I was wondering whether there is a good way to make the main thread wait synchronously until it is explicitly woken up.

Eric
  • If you have recursively submitted tasks then a `ForkJoinPool` can be very helpful *if you can use Java 7*. – thkala Jan 26 '13 at 09:56
  • Does the last task know that it is the last one? – assylias Jan 26 '13 at 09:58
  • I looked at ForkJoinPool; however, I am not sure it's appropriate here. The problem is that all the tasks are independent; they don't need to await each other's completion. The main thread, however, should await completion. – Eric Jan 26 '13 at 09:58
  • assylias - No, unfortunately it does not. – Eric Jan 26 '13 at 09:59
  • @Eric: *"they don't need"* is different than *"they should not"*. Do you *need* the "parent" tasks to return immediately? – thkala Jan 26 '13 at 10:06

9 Answers


Thanks a lot for all your suggestions!

In the end I opted for something that I believe to be reasonably simple. I found out that CountDownLatch is almost what I need: it blocks until the counter reaches 0. The only problem is that it can only count down, not up, and thus does not work in my dynamic setting, where tasks can submit new tasks. I therefore implemented a new class, CountLatch, that offers the additional functionality (see below). I then use this class as follows:

  • The main thread calls latch.awaitZero(), blocking until the latch reaches 0.
  • Any thread calls latch.increment() before calling executor.execute(..).
  • Any task calls latch.decrement() just before completing.

When the last task terminates, the counter will reach 0 and thus release the main thread.
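The protocol above does not depend on the AQS internals. As a hedged illustration, here is a minimal sketch that follows the same three rules with a plain monitor (`synchronized`, `wait()`, `notifyAll()`) in place of `CountLatch`. The class `UpDownCounter`, the `spawn` helper and the binary-tree workload are hypothetical, chosen only so that every task submits new tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchProtocolDemo {

    /** Hypothetical minimal up/down counter; only awaitZero() ever blocks. */
    static final class UpDownCounter {
        private int count; // guarded by this

        synchronized void increment() {
            count++;
        }

        synchronized void decrement() {
            if (--count == 0) {
                notifyAll(); // wake every thread blocked in awaitZero()
            }
        }

        synchronized void awaitZero() throws InterruptedException {
            while (count > 0) {
                wait(); // loop guards against spurious wake-ups
            }
        }
    }

    static final UpDownCounter latch = new UpDownCounter();
    static final AtomicInteger executed = new AtomicInteger();

    /** Submits a task that spawns two children until depth reaches 0. */
    static void spawn(ExecutorService pool, int depth) {
        latch.increment(); // count the task before handing it to the executor
        pool.execute(() -> {
            try {
                executed.incrementAndGet();
                if (depth > 0) {
                    spawn(pool, depth - 1);
                    spawn(pool, depth - 1);
                }
            } finally {
                latch.decrement(); // last action of the task, even if it throws
            }
        });
    }

    public static int runDemo(int depth) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        executed.set(0);
        spawn(pool, depth);
        latch.awaitZero(); // main thread blocks without polling
        pool.shutdown();
        return executed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(10)); // prints 2047 (2^11 - 1 tasks)
    }
}
```

Because every parent increments the counter for its children before decrementing for itself, the count cannot reach zero while work is still outstanding.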

Further suggestions and feedback are most welcome!

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountLatch {

    @SuppressWarnings("serial")
    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        // Shared acquire succeeds only when the count is zero; otherwise the caller blocks
        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected int acquireNonBlocking(int acquires) {
            // Increment count; never blocks
            for (;;) {
                int c = getState();
                int nextc = c + 1;
                if (compareAndSetState(c, nextc))
                    return 1;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    public CountLatch(int count) {
        this.sync = new Sync(count);
    }

    public void awaitZero() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void increment() {
        sync.acquireNonBlocking(1);
    }

    public void decrement() {
        sync.releaseShared(1);
    }

    @Override
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }

}

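Note that the increment()/decrement() calls can be encapsulated in a customized Executor subclass, as suggested, for instance, by Sami Korhonen, or via the beforeExecute and afterExecute hooks, as suggested by impl. The increment, however, has to happen on submission, so the class below overrides execute instead of beforeExecute. See here: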

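import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CountingThreadPoolExecutor extends ThreadPoolExecutor {

    protected final CountLatch numRunningTasks = new CountLatch(0);

    public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable command) {
        // Count the task on submission, before it can possibly run and finish
        numRunningTasks.increment();
        try {
            super.execute(command);
        } catch (RejectedExecutionException e) {
            // A rejected task never reaches afterExecute(), so undo the increment
            numRunningTasks.decrement();
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        numRunningTasks.decrement();
        super.afterExecute(r, t);
    }

    /**
     * Awaits the completion of all spawned tasks.
     */
    public void awaitCompletion() throws InterruptedException {
        numRunningTasks.awaitZero();
    }

    /**
     * Awaits the completion of all spawned tasks, waiting at most the given time.
     * @return true if all tasks completed, false if the timeout elapsed first
     */
    public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        return numRunningTasks.awaitZero(timeout, unit);
    }

}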
Eric
  • Also, take a look at [`Semaphore`](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html). – Eng.Fouad Jan 27 '13 at 03:44
  • I did. The problem with `Semaphore` is that it always blocks when acquiring. The `CountLatch` does not block when either counting down or up. It just blocks on calls to `awaitZero`. I guess that's the crucial difference. – Eric Jan 27 '13 at 09:45
  • I just edited the solution. As mentioned before, it's important to increment the counter when the task is *submitted*, not when it starts to run. Hence, we need to override `execute`, not `beforeExecute`. Thanks again for all your great suggestions. The solution is running smoothly now! – Eric Jan 28 '13 at 08:59

Java 7 provides a synchronizer that fits this use case: Phaser. It is a reusable hybrid of CountDownLatch and CyclicBarrier whose number of registered parties can both increase and decrease (similar to an incrementable CountDownLatch).

The basic pattern for using the phaser in this scenario is to register tasks with the phaser when they are created and to have them arrive when they complete. When the number of arrived parties matches the number of registered parties, the phaser "advances" to the next phase, notifying any threads waiting for that advance when it takes place.

Here's an example I've created of waiting for recursive task completion. It naively finds the first few numbers of the Fibonacci sequence for demonstration purposes:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;

/**
 * An example of using a Phaser to wait for the completion of recursive tasks.
 * @author Voxelot
 */
public class PhaserExample {
    /** Workstealing threadpool with reduced queue contention. */
    private static ForkJoinPool executors;

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws InterruptedException {
        executors = new ForkJoinPool();
        List<Long> sequence = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            sequence.add(fib(i));
        }
        System.out.println(sequence);
    }

    /**
     * Computes the nth Fibonacci number in the Fibonacci sequence.
     * @param n The index of the Fibonacci number to compute
     * @return The computed Fibonacci number
     */
    private static Long fib(int n) throws InterruptedException {
        AtomicLong result = new AtomicLong();
        //Flexible synchronization barrier
        Phaser phaser = new Phaser();
        //Base task
        Task initialTask = new Task(n, result, phaser);
        //Register fib(n) calling thread
        phaser.register();
        //Submit base task
        executors.submit(initialTask);
        //Make the calling thread arrive at the synchronization
        //barrier and wait for all future tasks to arrive.
        phaser.arriveAndAwaitAdvance();
        //Get the result of the parallel computation.
        return result.get();
    }

    private static class Task implements Runnable {
        /** The Fibonacci sequence index of this task. */
        private final int index;
        /** The shared result of the computation. */
        private final AtomicLong result;
        /** The synchronizer. */
        private final Phaser phaser;

        public Task(int n, AtomicLong result, Phaser phaser) {
            index = n;
            this.result = result;
            this.phaser = phaser;
            //Inform synchronizer of additional work to complete.
            phaser.register();
        }

        @Override
        public void run() {
            if (index == 1) {
                result.incrementAndGet();
            } else if (index > 1) {
                //recurrence relation: Fn = Fn-1 + Fn-2
                Task task1 = new Task(index - 1, result, phaser);
                Task task2 = new Task(index - 2, result, phaser);
                executors.submit(task1);
                executors.submit(task2);
            }
            //Notify synchronizer of task completion.
            phaser.arrive();
        }
    }
}
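A hedged variant of the same pattern on an ordinary fixed thread pool: each task registers before it is submitted and calls `arriveAndDeregister()` instead of `arrive()` when it finishes. `Phaser` supports at most 65535 registered parties, and with plain `arrive()` every task created during a phase stays registered, so deregistering keeps the registered count down to the waiting thread plus the tasks still outstanding. The `spawn` helper and the tree workload are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

public class PhaserDeregisterDemo {
    static final AtomicInteger executed = new AtomicInteger();

    /** Hypothetical workload: each task spawns two children until depth reaches 0. */
    static void spawn(ExecutorService pool, Phaser phaser, int depth) {
        phaser.register(); // account for the new task before it can run
        pool.execute(() -> {
            try {
                executed.incrementAndGet();
                if (depth > 0) {
                    spawn(pool, phaser, depth - 1); // children register before the parent leaves
                    spawn(pool, phaser, depth - 1);
                }
            } finally {
                phaser.arriveAndDeregister(); // finished: drop out of the party count
            }
        });
    }

    public static int runDemo(int depth) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        executed.set(0);
        Phaser phaser = new Phaser(1); // the one initial party is the waiting main thread
        spawn(pool, phaser, depth);
        // Phase 0 advances once every task has deregistered; main stays registered,
        // so the phaser does not terminate.
        phaser.arriveAndAwaitAdvance();
        pool.shutdown();
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(12));
    }
}
```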
BrandonK.
  • **Disclaimer:** While my example leverages a fork-join pool, you may want to give this a read before using one in a real-world scenario: http://coopsoft.com/ar/CalamityArticle.html – BrandonK. Aug 25 '15 at 21:56

This was actually a rather interesting problem to solve. I must warn that I have not fully tested the code.

The idea is simply to track task execution:

  • if a task is successfully queued, the counter is incremented by one
  • if a task is cancelled before it has been executed, the counter is decremented by one
  • if a task has been executed, the counter is decremented by one

When shutdown() is called while tasks are still pending, the delegate does not call shutdown() on the underlying ExecutorService. Instead, it keeps accepting new tasks until the pending-task count reaches zero, at which point shutdown() is called on the underlying ExecutorService.

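import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ResilientExecutorServiceDelegate implements ExecutorService {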
    private final ExecutorService executorService;
    private final AtomicInteger pendingTasks;
    private final Lock readLock;
    private final Lock writeLock;
    private boolean isShutdown;

    public ResilientExecutorServiceDelegate(ExecutorService executorService) {
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        this.pendingTasks = new AtomicInteger();
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();
        this.executorService = executorService;
        this.isShutdown = false;
    }

    private <T> T addTask(Callable<T> task) {
        T result;
        boolean success = false;
        // Increment pending tasks counter
        incrementPendingTaskCount();
        try {
            // Call service
            result = task.call();
            success = true;
        } catch (RuntimeException exception) {
            throw exception;
        } catch (Exception exception) {
            throw new RejectedExecutionException(exception);
        } finally {
            if (!success) {
                // Decrement pending tasks counter
                decrementPendingTaskCount();
            }
        }
        return result;
    }

    private void incrementPendingTaskCount() {
        pendingTasks.incrementAndGet();
    }

    private void decrementPendingTaskCount() {
        readLock.lock();
        try {
            if (pendingTasks.decrementAndGet() == 0 && isShutdown) {
                try {
                    // Deferred shutdown: the last pending task shuts down the real executor
                    executorService.shutdown();
                } catch (Throwable throwable) {
                    // Deliberately ignored
                }
            }
        } finally {
            readLock.unlock();
        }
    }

    @Override
    public void execute(final Runnable task) {
        // Add task
        addTask(new Callable<Object>() {
            @Override
            public Object call() {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            task.run();
                        } finally {
                            decrementPendingTaskCount();
                        }
                    }
                });
                return null;
            }
        });
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        // Call service
        return executorService.awaitTermination(timeout, unit);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAll(tasks);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> List<Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAll(tasks, timeout, unit);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAny(tasks);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
            long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        incrementPendingTaskCount();
        try {
            return executorService.invokeAny(tasks, timeout, unit);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }

    @Override
    public boolean isTerminated() {
        return executorService.isTerminated();
    }

    @Override
    public void shutdown() {
        // Lock write lock
        writeLock.lock();
        // Set as shutdown
        isShutdown = true;
        try {
            if (pendingTasks.get() == 0) {
                // Real shutdown
                executorService.shutdown();
            }
        } finally {
            // Unlock write lock
            writeLock.unlock();
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        // Lock write lock
        writeLock.lock();
        // Set as shutdown
        isShutdown = true;
        // Unlock write lock
        writeLock.unlock();

        return executorService.shutdownNow();
    }

    @Override
    public <T> Future<T> submit(final Callable<T> task) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<T>>() {
            @Override
            public Future<T> call() {
                return new FutureDelegate<T>(
                        executorService.submit(new Callable<T>() {
                            @Override
                            public T call() throws Exception {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    return task.call();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }), futureExecutionStatus);
            }
        });
    }

    @Override
    public Future<?> submit(final Runnable task) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<?>>() {
            @Override
            @SuppressWarnings("unchecked")
            public Future<?> call() {
                return new FutureDelegate<Object>(
                        (Future<Object>) executorService.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    task.run();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }), futureExecutionStatus);
            }
        });
    }

    @Override
    public <T> Future<T> submit(final Runnable task, final T result) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<T>>() {
            @Override
            public Future<T> call() {
                return new FutureDelegate<T>(executorService.submit(
                        new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    task.run();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }, result), futureExecutionStatus);
            }
        });
    }

    private class FutureExecutionStatus {
        private volatile boolean executed;

        public FutureExecutionStatus() {
            executed = false;
        }

        public void setExecuted() {
            executed = true;
        }

        public boolean isExecuted() {
            return executed;
        }
    }

    private class FutureDelegate<T> implements Future<T> {
        private Future<T> future;
        private FutureExecutionStatus executionStatus;

        public FutureDelegate(Future<T> future,
                FutureExecutionStatus executionStatus) {
            this.future = future;
            this.executionStatus = executionStatus;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = future.cancel(mayInterruptIfRunning);
            if (cancelled) {
                // Lock read lock
                readLock.lock();
                // If task was not executed
                if (!executionStatus.isExecuted()) {
                    decrementPendingTaskCount();
                }
                // Unlock read lock
                readLock.unlock();
            }
            return cancelled;
        }

        @Override
        public T get() throws InterruptedException, ExecutionException {
            return future.get();
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            return future.get(timeout, unit);
        }

        @Override
        public boolean isCancelled() {
            return future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return future.isDone();
        }
    }
}
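The delegate above covers the whole `ExecutorService` interface. As a rough sketch of just the core idea, and of how a caller would use it, here is a hypothetical `DeferredShutdownExecutor` that implements only `execute()` and `shutdown()` and guards its state with a single monitor instead of the read/write lock pair. The main thread calls `shutdown()` right away and then blocks in `awaitTermination()`, which returns once the last pending task has triggered the real shutdown. The names and the tree workload are illustrative only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DeferredShutdownDemo {

    /** Hypothetical stripped-down delegate: only execute() and shutdown(). */
    static final class DeferredShutdownExecutor {
        private final ExecutorService delegate;
        private int pending;               // guarded by this
        private boolean shutdownRequested; // guarded by this

        DeferredShutdownExecutor(ExecutorService delegate) {
            this.delegate = delegate;
        }

        void execute(Runnable task) {
            synchronized (this) {
                pending++; // counted before the task is queued
            }
            try {
                delegate.execute(() -> {
                    try {
                        task.run();
                    } finally {
                        taskFinished();
                    }
                });
            } catch (RuntimeException e) {
                taskFinished(); // undo the count if the delegate rejected the task
                throw e;
            }
        }

        private synchronized void taskFinished() {
            // The last pending task performs the real shutdown once one was requested
            if (--pending == 0 && shutdownRequested) {
                delegate.shutdown();
            }
        }

        synchronized void shutdown() {
            shutdownRequested = true;
            if (pending == 0) {
                delegate.shutdown(); // nothing pending: shut down immediately
            }
        }

        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return delegate.awaitTermination(timeout, unit);
        }
    }

    static final AtomicInteger executed = new AtomicInteger();

    static void spawn(DeferredShutdownExecutor ex, int depth) {
        ex.execute(() -> {
            executed.incrementAndGet();
            if (depth > 0) { // hypothetical workload: each task spawns two children
                spawn(ex, depth - 1);
                spawn(ex, depth - 1);
            }
        });
    }

    public static int runDemo(int depth) throws InterruptedException {
        executed.set(0);
        DeferredShutdownExecutor ex = new DeferredShutdownExecutor(Executors.newFixedThreadPool(4));
        spawn(ex, depth);
        ex.shutdown(); // returns at once; the real shutdown waits until pending == 0
        if (!ex.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish within 30 seconds");
        }
        return executed.get();
    }
}
```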
Sami Korhonen
  • Wow, thanks a lot for the effort! This looks really comprehensive. It's nice to see that this can be modularized so well. – Eric Jan 26 '13 at 18:39

Why don't you use a counter? For example:

private AtomicInteger counter = new AtomicInteger(0);

and increment the counter by one just before submitting the task to the queue:

counter.incrementAndGet();

and decrement it by one at the end of the task:

counter.decrementAndGet();

and the check would be something like:

// ...
while (counter.get() > 0);
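To avoid the spin loop, the check can move into the task itself, as suggested in the comments: the task whose decrement brings the counter to zero raises an event. A minimal sketch, assuming Java 8 or later and using a `CompletableFuture` as that event (a swapped-in choice, not part of the original answer; the `spawn` helper and tree workload are hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CounterEventDemo {
    static final AtomicInteger counter = new AtomicInteger(0);
    static final AtomicInteger executed = new AtomicInteger(0);

    static void spawn(ExecutorService pool, CompletableFuture<Void> allDone, int depth) {
        counter.incrementAndGet(); // just before submitting, as in the answer
        pool.execute(() -> {
            try {
                executed.incrementAndGet();
                if (depth > 0) {
                    spawn(pool, allDone, depth - 1);
                    spawn(pool, allDone, depth - 1);
                }
            } finally {
                // Decrement at the end of the task; whoever reaches zero raises the "event"
                if (counter.decrementAndGet() == 0) {
                    allDone.complete(null);
                }
            }
        });
    }

    public static int runDemo(int depth) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        counter.set(0);
        executed.set(0);
        CompletableFuture<Void> allDone = new CompletableFuture<>();
        spawn(pool, allDone, depth);
        allDone.join(); // blocks the caller until complete(null) is called; no polling
        pool.shutdown();
        return executed.get();
    }
}
```

`join()` would wait forever if nothing were ever submitted, so the first task has to be submitted before waiting.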
Eng.Fouad
  • That makes a lot of sense. – assylias Jan 26 '13 at 10:15
  • Ah, right. Well, I am facing the same problem as the OP. My current (partial) solution involves atomic counters and `notifyAll()` being called for a locked object when the counter reaches zero, but I cannot really be certain that it works until I have mathematically verified that all possible sequences are handled correctly :-/ – thkala Jan 26 '13 at 10:21
  • @Eric: no, calling `wait()` suspends the thread. Whether that is more efficient than a busy loop or not depends on the application - `notifyAll()` does have its cost... – thkala Jan 26 '13 at 10:25
  • Instead of polling, the check can be made every time a task ends and an event can be raised when number hits 0. – S.D. Jan 26 '13 at 10:26
  • @thkala: Sorry but maybe I misunderstand. I see no wait() in the proposed solution. From what I can see none of the calls shown blocks. – Eric Jan 26 '13 at 10:30
  • The code, as written, is a spin lock and will probably bring that core to its knees. In other words, don't do this. – nilskp Nov 09 '21 at 17:04

One of the suggested options in the answers you link to is to use a CompletionService.

You could replace the busy waiting in your main thread with:

while (true) {
    Future<?> f = completionService.take(); //blocks until task completes
    if (executor.getQueue().isEmpty()
         && numTasks.longValue() == executor.getCompletedTaskCount()) break;
}

Note that getCompletedTaskCount only returns an approximate number so you might need to find a better exit condition.
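One way to get an exact exit condition is to route every task, including recursively spawned ones, through the same `CompletionService` and count submissions yourself before each `submit`. The sketch below assumes such a hypothetical `submitted` counter in place of `getCompletedTaskCount()`: because a parent counts its children before it completes, the main thread can stop as soon as it has taken as many futures as have been submitted.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletionServiceDemo {
    static final AtomicInteger submitted = new AtomicInteger();

    /** Hypothetical workload: each task spawns two children until depth reaches 0. */
    static void spawn(CompletionService<Void> cs, int depth) {
        submitted.incrementAndGet(); // count before submitting
        cs.submit(() -> {
            if (depth > 0) {
                spawn(cs, depth - 1); // children are counted before this task completes
                spawn(cs, depth - 1);
            }
            return null;
        });
    }

    public static int runDemo(int depth) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
        submitted.set(0);
        spawn(cs, depth);
        int taken = 0;
        while (taken < submitted.get()) {
            cs.take().get(); // blocks for the next finished task; get() rethrows task failures
            taken++;
        }
        pool.shutdown();
        return taken;
    }
}
```

The main thread still takes one future per task, which is the per-task loop Eric points out in the comments below.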

assylias
  • Thanks but this would only solve part of the problem, wouldn't it? One would still wrap around the loop for every single task... – Eric Jan 26 '13 at 10:04
  • @Eric Not sure I understand what you mean. You submit all your tasks from the main thread then you run that loop and wait until it `break`s - that should only happen once there is no more task running in the executor. – assylias Jan 26 '13 at 10:11
  • Right. I agree that it should work. It's just not as nice as I had hoped. There's still the need to loop over all futures and to keep track of an explicit completion count. – Eric Jan 26 '13 at 10:17
  • @assylias: if you have to submit all tasks from a single thread, then you cannot have recursive task submission any more... – thkala Jan 26 '13 at 10:17
  • @thkala The initial tasks will be submitted from the main thread I suppose - whether those tasks submit new tasks or not does not change the logic and the idea still applies. – assylias Jan 26 '13 at 10:29

Java 7 has incorporated support for recursive tasks via its ForkJoinPool executor. It is quite simple to use and scales quite well, as long as the tasks themselves are not too trivial. Essentially it provides a controlled interface that allows tasks to wait for the completion of any sub-tasks without blocking the underlying thread indefinitely.
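A minimal sketch of what this looks like, with a hypothetical binary-tree workload: each `RecursiveAction` forks its children with `invokeAll`, and `pool.invoke(root)` returns only after the whole tree has completed, so no separate counter is needed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

public class ForkJoinDemo {
    static final AtomicInteger executed = new AtomicInteger();

    /** Hypothetical task: spawns two children until depth reaches 0. */
    static final class Node extends RecursiveAction {
        private final int depth;

        Node(int depth) {
            this.depth = depth;
        }

        @Override
        protected void compute() {
            executed.incrementAndGet();
            if (depth > 0) {
                // The parent waits here for its children; while joining, the worker
                // thread may run other queued subtasks instead of just blocking.
                invokeAll(new Node(depth - 1), new Node(depth - 1));
            }
        }
    }

    public static int runDemo(int depth) {
        executed.set(0);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(new Node(depth)); // returns once the root, and thus every descendant, is done
        pool.shutdown();
        return executed.get();
    }
}
```

The trade-off is that each parent's `compute()` stays suspended until its children finish, which is the memory concern raised in the comment below.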

thkala
  • The problem is that with ForkJoinPool every completed task would have to wait for newly created tasks. While this would probably work, it would lead to memory issues, as completed tasks could not be discarded before *all* have completed (they are all waiting). In our system it can easily happen that we create millions of tasks. – Eric Jan 26 '13 at 10:19

If you know the number of threads to wait for, and can add one line of code to each thread to count down a CountDownLatch ( http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html ), it can resolve your problem.
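For the case this answer describes, where the total number of tasks is known before any of them starts, a sketch could look like the following (pool size and task body are placeholders). A `CountDownLatch` cannot be raised after construction, which is why it does not directly cover tasks that spawn an unknown number of further tasks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class KnownCountLatchDemo {
    public static int runDemo(int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(taskCount); // total must be known up front
        AtomicInteger executed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    executed.incrementAndGet(); // placeholder for the real work
                } finally {
                    done.countDown(); // the "one line" each task needs
                }
            });
        }
        done.await(); // blocks until the count reaches zero
        pool.shutdown();
        return executed.get();
    }
}
```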

Dewfy

Since the last task doesn't know that it's the last, I actually don't think it's possible to have this work 100% correctly without recording both when tasks launch and when they complete.

If memory serves me right, the getQueue() method returns a queue containing only tasks that are still waiting to be executed, not ones that are currently running. Furthermore, getCompletedTaskCount() is approximate.

The solution I'm pondering goes something like this, using an atomic counter like in Eng.Fouad's answer and a Condition for signaling the main thread to wake up (pardon the shortcuts for simplicity):

public class MyThreadPoolExecutorState {

    public final Lock lock = new ReentrantLock();
    public final Condition workDone = lock.newCondition();
    public boolean workIsDone = false;

}

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final MyThreadPoolExecutorState state;
    private final AtomicInteger counter = new AtomicInteger(0);

    public MyThreadPoolExecutor(MyThreadPoolExecutorState state, ...) {
        super(...);
        this.state = state;
    }

    protected void beforeExecute(Thread t, Runnable r) {
        this.counter.incrementAndGet();
    }

    protected void afterExecute(Runnable r, Throwable t) {
        if(this.counter.decrementAndGet() == 0) {
            this.state.lock.lock();
            try {
                this.state.workIsDone = true;
                this.state.workDone.signal();
            }
            finally {
                this.state.lock.unlock();
            }
        }
    }

}

public class MyApp {

    public static void main(...) {

        MyThreadPoolExecutorState state = new MyThreadPoolExecutorState();
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(state, ...);

        // Fire ze missiles!
        executor.submit(...);

        state.lock.lock();
        try {
            while(state.workIsDone == false) {
                state.workDone.await();
            }
        }
        finally {
            state.lock.unlock();
        }

    }

}

It could be a little more elegant (maybe just provide a getState() in your thread pool executor or something?), but I think it should get the job done. It's also untested, so implement at your own peril...

It is worth noting that this solution will definitely fail if there are no tasks to be executed -- it'll await the signal indefinitely. So don't even bother starting the executor if you have no tasks to run.


Edit: On second thought, incrementing the atomic counter should happen upon submission, not immediately before task execution (because queuing could cause the counter to fall to 0 prematurely). It probably makes sense to override the submit(...) methods instead, and possibly also remove(...) and shutdown() (if you use them). The general idea remains the same, though. (But the more I think about it, the less pretty it is.)
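Following the edit, a sketch that counts in `execute()` rather than `beforeExecute()` (the `submit(...)` methods inherited from `AbstractExecutorService` wrap the task and call `execute()`, so they are covered too), decrements in `afterExecute()`, and signals a `Condition` at zero. Checking the counter under the lock before waiting also means the wait returns immediately when nothing was submitted. Class and method names (`SignallingThreadPoolExecutor`, `awaitIdle`) are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SignallingExecutorDemo {

    static final class SignallingThreadPoolExecutor extends ThreadPoolExecutor {
        private final AtomicInteger pending = new AtomicInteger();
        private final Lock lock = new ReentrantLock();
        private final Condition idle = lock.newCondition();

        SignallingThreadPoolExecutor(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        public void execute(Runnable command) {
            pending.incrementAndGet(); // count on submission, not in beforeExecute()
            try {
                super.execute(command);
            } catch (RejectedExecutionException e) {
                taskDone(); // a rejected task never reaches afterExecute()
                throw e;
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            taskDone();
        }

        private void taskDone() {
            if (pending.decrementAndGet() == 0) {
                lock.lock();
                try {
                    idle.signalAll();
                } finally {
                    lock.unlock();
                }
            }
        }

        /** Blocks until no submitted task is queued or running. */
        void awaitIdle() throws InterruptedException {
            lock.lock();
            try {
                while (pending.get() > 0) {
                    idle.await();
                }
            } finally {
                lock.unlock();
            }
        }
    }

    static final AtomicInteger executed = new AtomicInteger();

    /** Hypothetical workload: each task spawns two children until depth reaches 0. */
    static void spawn(ThreadPoolExecutor ex, int depth) {
        ex.execute(() -> {
            executed.incrementAndGet();
            if (depth > 0) {
                spawn(ex, depth - 1);
                spawn(ex, depth - 1);
            }
        });
    }

    public static int runDemo(int depth) throws InterruptedException {
        executed.set(0);
        SignallingThreadPoolExecutor ex = new SignallingThreadPoolExecutor(4);
        spawn(ex, depth);
        ex.awaitIdle();
        ex.shutdown();
        return executed.get();
    }
}
```

If the counter drops to zero between the check and `await()`, the decrementing thread cannot signal until the waiter has released the lock inside `await()`, so the wake-up is not lost.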

I'd also check out the internals of the class to see if you can glean any knowledge from it: http://hg.openjdk.java.net/build-infra/jdk7/jdk/file/0f8da27a3ea3/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java. The tryTerminate() method looks interesting.

impl
  • It actually looks almost as if overriding `terminated()` may just do the trick. One could `wait()` on a lock in the main thread, which is then released using `notify` within `terminated()`. – Eric Jan 26 '13 at 18:42
  • I think using `terminated()` in combination with a semaphore should actually work... – Eric Jan 26 '13 at 19:21
  • Never mind; it turns out that, contrary to the JavaDoc, `terminated()` is only called if `shutdown()` or `shutdownNow()` is called, not on normal completion. Too bad... – Eric Jan 26 '13 at 19:55
  • You should still be able to make it work with `afterExecute()` and a counter; that part of my example wouldn't change. – impl Jan 26 '13 at 20:24

You could use an atomic counter to count submissions (as has been said, increment it before actually submitting). Combine this with a semaphore and release it in the afterExecute hook that a ThreadPoolExecutor provides. Instead of busy-waiting, call semaphore.acquire(counter.get()) after the first round of jobs has been submitted. The number of permits acquired this way may be too small, though, since the counter may increase later on. You would therefore have to loop the acquire calls, passing the increase since the last call as the argument, until the counter no longer increases.
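A sketch of that loop, under the assumptions that the counter is incremented before every submission and that each finished task releases one permit from `afterExecute()`; the names and the tree workload are hypothetical. Each pass acquires permits only for tasks counted since the previous pass, and the loop ends once the counter no longer exceeds what has already been acquired.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreLoopDemo {
    static final AtomicInteger submitted = new AtomicInteger();
    static final AtomicInteger executed = new AtomicInteger();

    /** Hypothetical workload: each task spawns two children until depth reaches 0. */
    static void spawn(ThreadPoolExecutor pool, int depth) {
        submitted.incrementAndGet(); // count before submitting
        pool.execute(() -> {
            executed.incrementAndGet();
            if (depth > 0) {
                spawn(pool, depth - 1);
                spawn(pool, depth - 1);
            }
        });
    }

    public static int runDemo(int depth) throws InterruptedException {
        submitted.set(0);
        executed.set(0);
        Semaphore finished = new Semaphore(0); // one permit per completed task
        ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                finished.release();
            }
        };
        spawn(pool, depth);
        int acquired = 0;
        int target = submitted.get();
        while (target > acquired) {
            finished.acquire(target - acquired); // wait for the tasks counted so far
            acquired = target;
            target = submitted.get(); // finished tasks may have submitted more work
        }
        pool.shutdown();
        return executed.get();
    }
}
```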

Ralf H