I have encountered a problem twice now whereby a producer thread produces N work items, submits them to an ExecutorService and then needs to wait until all N items have been processed.

Caveats

  • N is not known in advance. If it were, I would simply create a CountDownLatch and then have the producer thread await() until all work was complete.
  • Using a CompletionService is inappropriate because although my producer thread needs to block (i.e. by calling take()) there's no way of signalling that all work is complete, to cause the producer thread to stop waiting.
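For comparison, the known-N case from the first caveat could look like the minimal sketch below. `KnownNLatch`, `runAll` and the counting stand-in work are illustrative names that do not appear in the question. The point is only that the latch has to be sized before the first task is submitted, which is exactly what an unknown N rules out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class KnownNLatch {
    // Runs n tasks and blocks until all of them have finished; returns how many ran.
    static int runAll(int n, ExecutorService executor) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(n); // requires n up front
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            executor.submit(() -> {
                try {
                    ran.incrementAndGet(); // stand-in for real work (hypothetical)
                } finally {
                    done.countDown(); // count down even if the work throws
                }
            });
        }
        done.await(); // the producer blocks here until the count reaches zero
        return ran.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        System.out.println(runAll(10, executor)); // prints 10
        executor.shutdown();
    }
}
```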

My current favoured solution is to use an integer counter, incrementing it whenever an item of work is submitted and decrementing it when a work item is processed. Following the submission of all N tasks, my producer thread will need to wait on a lock, checking whether counter == 0 whenever it is notified. The consumer thread(s) will need to notify the producer if they have decremented the counter and the new value is 0.
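A minimal sketch of that counter-plus-lock design, assuming a hypothetical `PendingCounter` helper (it is not a java.util.concurrent class). The producer only calls `awaitZero()` after the last submission, so the counter touching 0 mid-production (the scenario raised in the comments) does no harm. A plain `int` is enough here because every access happens while holding `lock`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts outstanding tasks and lets the producer wait for zero.
public class PendingCounter {
    private final Object lock = new Object();
    private int pending = 0; // guarded by lock

    // Increments on submission; the wrapper decrements on completion.
    public void submit(ExecutorService executor, Runnable work) {
        synchronized (lock) {
            pending++;
        }
        executor.execute(() -> {
            try {
                work.run();
            } finally {
                synchronized (lock) {
                    if (--pending == 0) {
                        lock.notifyAll(); // wake the producer if it is waiting
                    }
                }
            }
        });
    }

    // Call only after all tasks have been submitted.
    public void awaitZero() throws InterruptedException {
        synchronized (lock) {
            while (pending != 0) {
                lock.wait(); // re-check after every wake-up (spurious wake-ups are possible)
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        PendingCounter counter = new PendingCounter();
        AtomicInteger done = new AtomicInteger();
        int produced = 0;
        while (produced < 50) { // N is only known once this loop ends
            counter.submit(executor, done::incrementAndGet);
            produced++;
        }
        counter.awaitZero();
        System.out.println(done.get() + " of " + produced); // prints "50 of 50"
        executor.shutdown();
    }
}
```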

Is there a better approach to this problem or is there a suitable construct in java.util.concurrent I should be using rather than "rolling my own"?

Thanks in advance.

Ravindra babu
Adamski
  • At what point in time does the producer know how many work items there are? When the last item has been produced? – Ronald Wildenberg Oct 28 '09 at 10:11
  • Your current solution may suffer from a race condition: produce item 1 -> counter++ -> process item 1 -> counter-- -> produce item 2. Since the counter has been decremented before the producer has produced the next item, the producer thinks he's ready. – Ronald Wildenberg Oct 28 '09 at 10:15
  • @rwwilden: You're correct in that this scenario could occur. However, my producer would only inspect / wait on the counter after submitting *all* work items and so it doesn't represent a race condition in this particular case. – Adamski Oct 28 '09 at 10:44

6 Answers

java.util.concurrent.Phaser looks like it would work well for you. It is planned to be released in Java 7 (it has since shipped there), but the most stable version can be found at the jsr166 interest group's website.

The Phaser is a glorified CyclicBarrier. You can register any number of parties and, when you're ready, await their arrival at a specific phase.

A quick example of how it would work:

final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            try {
                // ... do stuff ...
            } finally {
                phaser.arriveAndDeregister(); // done: stop counting this task, even on failure
            }
        }
    };
}
public void doWork(){
    phaser.register(); // register self so the phase cannot advance before we arrive
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution
        executor.submit(getRunnable());
    }
    phaser.arriveAndAwaitAdvance(); // block until every registered task has arrived
}
Ravindra babu
John Vint
  • Cool - Thanks; This looks like exactly what I'm after ... Can't believe they called it Phaser though. – Adamski Oct 28 '09 at 13:06
  • If the point of `getRunnable()` is to represent a task that only signals the phaser and then ends, you want to invoke `phaser.arriveAndDeregister()` and **not** `phaser.arrive()` as otherwise when the parent task invokes `phaser.arriveAndAwaitAdvance()` a second time it will deadlock, as the task will be waiting for finished tasks that are still registered in the phaser. – Tiago Cogumbreiro Jun 06 '16 at 23:00
  • Excellent. Never heard of it before. – Ravindra babu May 18 '17 at 16:37
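To illustrate the comment about `arriveAndDeregister()`, here is a self-contained sketch (the `PhaserRounds` class and `runRound` method are hypothetical names) in which the producer reuses one Phaser for two rounds. Because finished tasks deregister, only the producer remains registered between rounds; if the tasks called `arrive()` instead, their stale registrations would keep the second `arriveAndAwaitAdvance()` from ever returning.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

public class PhaserRounds {
    // Submits `count` tasks, then waits for all of them; safe to call repeatedly on one Phaser.
    static void runRound(Phaser phaser, ExecutorService executor, int count, AtomicInteger done) {
        for (int i = 0; i < count; i++) {
            phaser.register(); // register the task before submitting it
            executor.submit(() -> {
                try {
                    done.incrementAndGet(); // stand-in for real work
                } finally {
                    phaser.arriveAndDeregister(); // finished tasks leave the phaser
                }
            });
        }
        phaser.arriveAndAwaitAdvance(); // the producer waits for this phase to complete
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        Phaser phaser = new Phaser(1); // 1 = the producer itself
        AtomicInteger done = new AtomicInteger();
        runRound(phaser, executor, 5, done);
        runRound(phaser, executor, 7, done); // would hang if the tasks used arrive() instead
        System.out.println(done.get()); // prints 12
        executor.shutdown();
    }
}
```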

You could of course use a CountDownLatch protected by an AtomicReference so that your tasks get wrapped thus:

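import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class MyTask implements Runnable {
    private final Runnable r;
    private final AtomicReference<CountDownLatch> l;
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; this.l = l; }

    public void run() {
        r.run();
        try {
            // spin until the producer sets the latch, i.e. the total number of tasks is known
            while (l.get() == null) Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag; the latch is NOT counted down
            return;
        }
        l.get().countDown();
    }
}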

Notice that the tasks run their work and then spin until the latch is set (i.e. the total number of tasks is known). As soon as the latch is set, they count it down and exit. These get submitted as follows:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));

After the point of creation/submission of your work, when you know how many tasks you have created:

l.set(new CountDownLatch(nTasks));
l.get().await();
oxbow_lakes
  • Why does wrapping the CountDownLatch with AtomicReference help you in this case? The reference doesn't need to be protected, since it is passed to the MyTask in its constructor, and never changes after that. – Avi Oct 28 '09 at 10:13
  • Also, be sure to handle arbitrary throwables (RuntimeExceptions and Errors) in the run() method, at least by putting countDown() in a finally{} block. – Avi Oct 28 '09 at 10:14
  • And of course, the question specifically was about the case when you don't know the number of tasks at the time of task creation. – Avi Oct 28 '09 at 10:15
  • @Avi - you must know `nTasks` at some point (as you also assume in your response). The atomic reference helps because the `CDL` has to be safely published across multiple threads. My method *does work*, as does yours – oxbow_lakes Oct 28 '09 at 10:19
  • @oxbow_lakes: You are right - I didn't fully understand your code when I commented. I think the spinning could be avoided, by using wait() and notifyAll(). – Avi Oct 28 '09 at 10:33
  • I know it has been years, but should `MyTask` _implement_ `Runnable`? –  Jun 05 '18 at 14:25
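Following Avi's suggestion to replace the spinning with `wait()`/`notifyAll()`, a minimal sketch might keep the latch in a small synchronized holder instead of an `AtomicReference`. `LatchHolder` and `wrap` are hypothetical names. As in the original answer, a finished task still occupies a pool thread until the producer publishes the latch, so this removes the sleep loop but not the waiting itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical holder: tasks block in get() instead of sleeping in a loop.
public class LatchHolder {
    private CountDownLatch latch; // guarded by "this"

    public synchronized void set(CountDownLatch l) {
        latch = l;
        notifyAll(); // wake every task waiting in get()
    }

    public synchronized CountDownLatch get() throws InterruptedException {
        while (latch == null) {
            wait();
        }
        return latch;
    }

    // Wraps a task so that it counts down once the total is known.
    public Runnable wrap(Runnable r) {
        return () -> {
            r.run();
            try {
                get().countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // latch is not counted down in this case
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        LatchHolder holder = new LatchHolder();
        int nTasks = 0;
        for (int i = 0; i < 20; i++) {
            executor.submit(holder.wrap(() -> { /* work */ }));
            nTasks++;
        }
        CountDownLatch latch = new CountDownLatch(nTasks);
        holder.set(latch); // the total is now known
        latch.await();
        System.out.println("all " + nTasks + " tasks done");
        executor.shutdown();
    }
}
```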

I've used an ExecutorCompletionService for something like this:

ExecutorCompletionService executor = ...;
int count = 0;
while (...) {
    executor.submit(new Processor());
    count++;
}

//Now, pull the futures out of the queue:
for (int i = 0; i < count; i++) {
    executor.take().get();
}

This involves keeping a queue of tasks that have been submitted, so if your list is arbitrarily long, your method might be better.

If you stick with your counter, though, make sure to use an AtomicInteger for the coordination, so that you can safely increment it in one thread and decrement it in the worker threads.
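A self-contained version of the completion-service approach, assuming Java 8+ and a hypothetical squaring task (`CompletionCount` and `sumSquares` are illustrative names) so there is something to take from the queue. The producer counts submissions while iterating an open-ended source and then calls `take()` exactly that many times.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionCount {
    // Submits one task per item until the source runs dry, then takes exactly that many results.
    static int sumSquares(Iterable<Integer> source, ExecutorService pool)
            throws InterruptedException, ExecutionException {
        ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(pool);
        int count = 0;
        for (Integer x : source) {
            ecs.submit(() -> x * x); // Callable<Integer>
            count++;
        }
        int sum = 0;
        for (int i = 0; i < count; i++) {
            sum += ecs.take().get(); // take() blocks until some task has finished
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        System.out.println(sumSquares(Arrays.asList(1, 2, 3, 4), pool)); // prints 30
        pool.shutdown();
    }
}
```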

Avi
  • How can you `awaitTermination`? What is shutting down your service? Also, you cannot re-use the completion service in your answer anyway, because you may be waiting on tasks that were logically part of *another set*. And (as mentioned in comments to my answer), you still need to know `nTasks` at some point – oxbow_lakes Oct 28 '09 at 10:21
  • @oxbow_lakes: True. That is why I only have it as an option (which I've edited out, since it doesn't help in this case). I'm assuming that this completion service will not be used for another set of tasks at the same time (overlapping) with this one. It can be used again later, from the same thread. – Avi Oct 28 '09 at 10:44
  • The advantage of this way of keeping track of the number of tasks rather than using an AtomicBoolean, is that you don't have to handle the wait() and notify() manually - the queue will take care of that. All you have to track is the number of items in the queue. – Avi Oct 28 '09 at 10:46
  • So in essence you introduced a second queue with task-ends that gets consumed by the producer. The consumer has all information to send the correct signal once the producer makes it known it sent all tasks. Imho the total count of tasks is an artifact that does not have to be modeled into the solution to this problem. – rsp Oct 28 '09 at 10:59

I assume your producer does not need to know when the queue is empty but needs to know when the last task has been completed.

I would add a waitforWorkDone(producer) method to the consumer. The producer can add its N tasks and then call the wait method. The wait method blocks the calling thread while the work queue is not empty or any task is still executing.

Each consumer thread calls notifyAll() on the waitfor lock when its task has finished, the queue is empty and no other task is being executed.
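A rough sketch of that design, assuming a hand-rolled pool rather than an ExecutorService. `WorkConsumer` is a hypothetical name and `waitForWorkDone()` is a camel-cased take on the method proposed above. The key detail is that a worker dequeues a task and increments the executing count under the same lock, so a waiter can never see an empty queue while a task is in flight but not yet counted.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical hand-rolled consumer: workers pull from a queue; producers can wait for "all done".
public class WorkConsumer {
    private final Queue<Runnable> queue = new ArrayDeque<>(); // guarded by "this"
    private int executing = 0;                                 // guarded by "this"

    public WorkConsumer(int nThreads) {
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(this::workLoop);
            t.setDaemon(true); // idle workers do not keep the JVM alive
            t.start();
        }
    }

    public synchronized void submit(Runnable task) {
        queue.add(task);
        notifyAll(); // wake an idle worker
    }

    // Blocks while the queue is non-empty or any task is still executing.
    public synchronized void waitForWorkDone() throws InterruptedException {
        while (!queue.isEmpty() || executing > 0) {
            wait();
        }
    }

    private void workLoop() {
        while (true) {
            Runnable task;
            synchronized (this) {
                while (queue.isEmpty()) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        return; // stop this worker
                    }
                }
                task = queue.poll();
                executing++; // dequeued and counted under one lock: no "false idle" window
            }
            try {
                task.run();
            } catch (RuntimeException e) {
                // swallowed for brevity so the worker survives a failing task
            } finally {
                synchronized (this) {
                    executing--;
                    if (executing == 0 && queue.isEmpty()) {
                        notifyAll(); // last task finished: wake waitForWorkDone()
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WorkConsumer consumer = new WorkConsumer(4);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 100; i++) {
            consumer.submit(done::incrementAndGet);
        }
        consumer.waitForWorkDone();
        System.out.println(done.get()); // prints 100
    }
}
```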

rsp

What you described is quite similar to using a standard Semaphore but used 'backwards'.

  • Your semaphore starts with 0 permits
  • Each work unit releases one permit when it completes
  • You block by waiting to acquire N permits

Plus, you have the flexibility to acquire only M < N permits, which is useful if you want to check the intermediate state. For example, I'm testing an asynchronous bounded message queue, so I expect the queue to be full for some M < N: I can acquire M permits and check that the queue is indeed full, then acquire the remaining N - M permits after consuming messages from the queue.
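A minimal sketch of the "backwards" Semaphore, assuming the tasks come from an `Iterable` of unknown length (`BackwardsSemaphore` and `runAll` are hypothetical names). Releasing in a `finally` block keeps a failing task from leaving the producer blocked forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BackwardsSemaphore {
    // Submits tasks from an open-ended source, then acquires one permit per submitted task.
    static int runAll(Iterable<Runnable> tasks, ExecutorService executor) throws InterruptedException {
        Semaphore finished = new Semaphore(0); // starts with zero permits
        int n = 0;
        for (Runnable task : tasks) {
            executor.execute(() -> {
                try {
                    task.run();
                } finally {
                    finished.release(); // each completed task releases one permit
                }
            });
            n++;
        }
        finished.acquire(n); // blocks until all n permits have been released
        return n;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AtomicInteger done = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 8; i++) tasks.add(done::incrementAndGet);
        int n = runAll(tasks, executor);
        System.out.println(n + " submitted, " + done.get() + " done"); // prints "8 submitted, 8 done"
        executor.shutdown();
    }
}
```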

Mumrah

Here is a standalone Java 8+ method that does exactly this for a Stream, using a single-pass Phaser. Iterator/Iterable variants are essentially the same.

import java.util.concurrent.Executor;
import java.util.concurrent.Phaser;
import java.util.function.Consumer;
import java.util.stream.Stream;

public static <X> int submitAndWait(Stream<X> items, Consumer<X> handleItem, Executor executor) {
    Phaser phaser = new Phaser(1); // 1 = register ourselves
    items.forEach(item -> {
        phaser.register(); // new task
        executor.execute(() -> {
            try {
                handleItem.accept(item);
            } finally {
                phaser.arrive(); // completed task (even if handleItem threw)
            }
        });
    });
    phaser.arriveAndAwaitAdvance(); // block until all tasks are complete
    return phaser.getRegisteredParties() - 1; // number of items
}
...
int recognised = submitAndWait(facesInPicture, this::detectFace, executor);

FYI: this is fine for one-off calls but, if it is called concurrently, a solution involving ForkJoinPool will avoid blocking the parent thread.

drekbour