
If we use an ExecutorCompletionService, we can submit a series of tasks as Callables and retrieve the results by treating the CompletionService as a queue.

But ExecutorService also has invokeAll, which accepts a Collection of tasks and returns a List of Futures for retrieving the results.

As far as I can tell, there is no benefit in using one over the other (except that invokeAll spares us the for loop we would need to submit the tasks to the CompletionService), and essentially they are the same idea with a slight difference.

So why are there 2 different ways to submit a series of tasks? Am I correct that performance wise they are equivalent? Is there a case that one is more suitable than the other? I can't think of one.

Basil Bourque
Cratylus

4 Answers


Using ExecutorCompletionService.poll/take, you receive the Futures as they finish, in completion order (more or less: tasks that finish at nearly the same time may be reported in either order). Using ExecutorService.invokeAll, you do not have this ability; you either block until all tasks have completed, or you specify a timeout after which the incomplete tasks are cancelled.


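// A task that sleeps for `period` milliseconds and then returns its name.
static class SleepingCallable implements Callable<String> {

  final String name;
  final long period;

  SleepingCallable(final String name, final long period) {
    this.name = name;
    this.period = period;
  }

  public String call() {
    try {
      Thread.sleep(period);
    } catch (InterruptedException ex) {
      // Restore the interrupt flag instead of silently swallowing it.
      Thread.currentThread().interrupt();
    }
    return name;
  }
}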

Now, below I will demonstrate how invokeAll works:

final ExecutorService pool = Executors.newFixedThreadPool(2);
final List<? extends Callable<String>> callables = Arrays.asList(
    new SleepingCallable("quick", 500),
    new SleepingCallable("slow", 5000));
try {
  for (final Future<String> future : pool.invokeAll(callables)) {
    System.out.println(future.get());
  }
} catch (ExecutionException | InterruptedException ex) { }
pool.shutdown();

This produces the following output:

C:\dev\scrap>java CompletionExample
... after 5 s ...
quick
slow

Using CompletionService, we see a different output:

final ExecutorService pool = Executors.newFixedThreadPool(2);
final CompletionService<String> service = new ExecutorCompletionService<String>(pool);
final List<? extends Callable<String>> callables = Arrays.asList(
    new SleepingCallable("slow", 5000),
    new SleepingCallable("quick", 500));
for (final Callable<String> callable : callables) {
  service.submit(callable);
}
pool.shutdown();
try {
  Future<String> future;
  do {
    future = pool.isTerminated() ? service.poll() : service.take();
    if (future != null) {
      System.out.println(future.get());
    }
  } while (future != null);
} catch (ExecutionException | InterruptedException ex) { }

This produces the following output:

C:\dev\scrap>java CompletionExample
... after 500 ms ...
quick
... after 5 s ...
slow

Note the times are relative to program start, not the previous message.
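
As a variation on the loop above, when the number of submitted tasks is known you can call take() exactly that many times, so the loop does not have to consult the pool's termination state at all. This is only a minimal sketch: the class name CountingDrain and the helper sleeper are made up for illustration and are not part of the original example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class name, used for illustration only.
class CountingDrain {

  // Hypothetical helper: a task that sleeps for `millis`, then returns its name.
  static Callable<String> sleeper(String name, long millis) {
    return () -> {
      Thread.sleep(millis);
      return name;
    };
  }

  // Submits every task, then collects the results in the order the tasks finish.
  static List<String> inCompletionOrder(List<Callable<String>> tasks)
      throws InterruptedException, ExecutionException {
    // One thread per task so that all tasks run concurrently in this sketch.
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
    try {
      CompletionService<String> service = new ExecutorCompletionService<>(pool);
      for (Callable<String> task : tasks) {
        service.submit(task);
      }
      List<String> results = new ArrayList<>();
      // One take() per submitted task; each call blocks until another task is done.
      for (int i = 0; i < tasks.size(); i++) {
        results.add(service.take().get());
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(
        inCompletionOrder(List.of(sleeper("slow", 2000), sleeper("quick", 200))));
  }
}
```

Counting the tasks trades a little bookkeeping for a simpler loop: there is no poll() fallback and no dependence on when the pool reports itself as terminated.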


Test out a working example on Replit.

obataku
  • So you are saying that if I start to iterate over the `List` returned from `invokeAll`, I could block on the first result until it finishes, while with `ExecutorCompletionService` I would block until any one result is available? Have I got your point? – Cratylus Aug 08 '12 at 20:32
  • +1 Yeah that's right @user384706. Underneath the `ExecutorCompletionService` is a `BlockingQueue<Future<V>>` so you can wait for the _first_ job to complete instead of all of them. – Gray Aug 08 '12 at 20:33
  • @user384706 well, using the non-timeout form returns the `Future`s after all have completed, blocking indefinitely. – obataku Aug 08 '12 at 20:36
  • @Gray:But in `invokeAll` I don't wait for all to complete either – Cratylus Aug 08 '12 at 20:39
  • @user384706 when you call `invokeAll` it waits for all of them to complete before finishing. When `ExecutorCompletionService.poll/take` returns, you know that the resulting `get()` is not going to block. – Gray Aug 08 '12 at 20:43
  • @user384706 then perhaps you don't want `invokeAll`, unless you're OK with [cancelling unfinished tasks using a timeout](http://goo.gl/9gHG1). – obataku Aug 08 '12 at 20:43
  • @Gray actually I was incorrect... as it turns out, `ExecutorService.invokeAll` waits for completion. – obataku Aug 08 '12 at 20:45
  • @Gray:Ok, but it will block on `take` instead – Cratylus Aug 08 '12 at 21:02
  • I've added an answer with more details @user384706. Yes it blocks on `take()`. – Gray Aug 08 '12 at 21:03
  • Btw @veer. `take()` never returns `null`. – Gray Aug 08 '12 at 21:10
  • @Gray yes I know, but for some odd reason I felt compelled to put the assignment into the loop condition :p – obataku Aug 08 '12 at 21:13
  • Heh. I _never_ put the assignment in the loop condition. A pet peeve I guess. Good answer. :-) – Gray Aug 08 '12 at 21:16
  • @veer: I don't understand why in your first example (`invokeAll`) it prints `slow` first. I believe I have read that the `Future`s are returned in the same order as the `Iterator` of the `Collection` passed to `invokeAll` would return them. So I would expect to see `quick` first, since `quick` is first in the `List` passed to `invokeAll`. – Cratylus Aug 09 '12 at 16:37
  • @user384706 good catch, I changed the order I submit them in without updating the output. It was to demonstrate that it waits for both to complete regardless of the order you put them in. I have since updated the output. Feel free to compile and run the full code from the paste. – obataku Aug 09 '12 at 19:21
  • @user384706 since you appear newer here, please don't forget to mark the answer [accepted](http://meta.stackexchange.com/questions/5234/how-does-accepting-an-answer-work) which helped most in solving the problem. – obataku Aug 15 '12 at 05:26
  • @veer:Good point. Thanks for ALL the great answers here by Gray and esaj as well – Cratylus Aug 15 '12 at 09:21
  • What do you mean by "in completion order _(more or less)_" ? – Mr_and_Mrs_D Jan 23 '14 at 23:18
  • @Mr_and_Mrs_D the order in which tasks complete, i.e. if B finishes before A then the order would be B, A regardless of the order in which they're submitted – obataku Jan 24 '14 at 22:02
  • @oldrinb: thanks - the _more or less_ part confused me - so they are guaranteed to return in the order they finish. You may want to edit this to make it more clear and let me know to delete the comments :) – Mr_and_Mrs_D Jan 25 '14 at 09:29
  • But the executor can be terminated before we take all futures. For example, two callables complete almost at the same time, the first will be taken, second - no, as the executor will be terminated at that point. I don't understand your `pool.shutdown()` + `!pool.isTerminated()` – Ivan Apr 27 '23 at 10:22
  • @Ivan [`shutdown`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#shutdown--) refuses new tasks & initiates an _orderly_ shutdown but the pool is not [`terminated`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#isTerminated--) until previously submitted tasks are completed – obataku May 05 '23 at 20:22
  • @obataku Yes, but tasks can be completed and the pool terminated before we call get on all futures in your example, so some results can be lost. Simulate longer processing after `future.get()`, in your example `Thread.sleep(4550)` and you never see "slow" – Ivan May 08 '23 at 08:50
  • @Ivan I see your point, thanks for noting; should be addressed now – obataku May 09 '23 at 06:12
  • @obataku current example doesn't compile around `while (future != null)` – Ivan May 09 '23 at 08:34
  • @Ivan corrected and updated with live replit link; I previously had introduced a `terminated` variable for the loop condition but refactored it away in the slackoverflow editor incorrectly – obataku May 09 '23 at 15:16
  • @Mr_and_Mrs_D reported completion order for concurrent tasks is somewhat inexact and subject to a race for tasks that complete at nearly the same time – obataku May 09 '23 at 15:22
  • Excellent explanation! – Alex Jul 27 '23 at 14:56

So why are there 2 different ways to submit a series of tasks? Am I correct that performance wise they are equivalent? Is there a case that one is more suitable than the other? I can't think of one.

By using an ExecutorCompletionService, you can get notified immediately when each of your jobs completes. In comparison, ExecutorService.invokeAll(...) waits for all of your jobs to complete before returning the collection of Futures. This means that (for example) if all but one of the jobs complete in 10 minutes and the last one takes 30 minutes, you will get no results for 30 minutes.

// this waits until _all_ of the jobs complete
List<Future<Object>> futures = threadPool.invokeAll(...);
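
The timed overload of invokeAll blocks in the same way, but it gives up when the timeout expires and cancels whatever has not finished, so you still see nothing until the call returns. The sketch below illustrates this under stated assumptions: the class name TimedInvokeAll, the helper sleeper, and the "cancelled" marker string are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used for illustration only.
class TimedInvokeAll {

  // Hypothetical helper: a task that sleeps for `millis`, then returns its name.
  static Callable<String> sleeper(String name, long millis) {
    return () -> {
      Thread.sleep(millis);
      return name;
    };
  }

  // Runs all tasks with a shared deadline and reports, in submission order,
  // either each task's result or the marker "cancelled".
  static List<String> runWithTimeout(List<Callable<String>> tasks, long timeoutMillis)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
    try {
      // Blocks until every task is done or the timeout expires, whichever is first.
      List<Future<String>> futures =
          pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
      List<String> report = new ArrayList<>();
      for (Future<String> future : futures) {
        // Tasks that missed the deadline come back cancelled; get() would throw.
        report.add(future.isCancelled() ? "cancelled" : future.get());
      }
      return report;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(
        runWithTimeout(List.of(sleeper("quick", 100), sleeper("slow", 5000)), 1000));
  }
}
```

Checking isCancelled() before get() matters here, because calling get() on a cancelled Future throws CancellationException.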

Instead, when you use an ExecutorCompletionService, you can get each job's result as soon as that job completes, which allows you to (for example) send the results on for processing in another thread pool, log them immediately, etc.

ExecutorService threadPool = Executors.newFixedThreadPool(2);
ExecutorCompletionService<Result> compService
      = new ExecutorCompletionService<Result>(threadPool);
for (MyJob job : jobs) {
    compService.submit(job);
}
// shutdown the pool but the jobs submitted continue to run
threadPool.shutdown();
while (true) {
    Future<Result> future;
    // if pool has terminated (all jobs finished after shutdown) then poll() else take()
    if (threadPool.isTerminated()) {
        future = compService.poll();
        if (future == null) {
            break;
        }
    } else {
        // the take() blocks until any of the jobs complete
        // this joins with the jobs in the order they _finish_
        future = compService.take();
    }
    // this get() won't block
    Result result = future.get();
    // you can then put the result in some other thread pool or something
    // to immediately start processing it
    someOtherThreadPool.submit(new SomeNewJob(result));
}
Gray
  • `while(!threadPool.isTerminated())` is not it a busy formal waiting? – Coder Apr 05 '16 at 17:36
  • It is but `take()` blocks so it isn't spinning. Did I answer your question @Sergio? – Gray Apr 05 '16 at 22:19
  • Yes thanks! I was digging on how to limit the blocking queue that there is inside `Executors.newFixedThreadPool`. In particular I am using the `ListenableFuture` – Coder Apr 05 '16 at 23:38
  • @Gray I did not understand your explanation of `while(!threadPool.isTerminated())`. Why is it needed? What purpose does it serve? – tinkuge Jan 14 '21 at 13:57
  • `isTerminated()` is true if the pool has been shut down and all of the jobs have completed. Is that what you are asking @tinkuge? – Gray Jan 14 '21 at 17:15
  • @Gray The edit makes it a lot clearer, thank you! – tinkuge Jan 15 '21 at 02:53

I haven't ever actually used ExecutorCompletionService, but I think the case where it is more useful than a "normal" ExecutorService is when you want to receive the Futures of completed tasks in completion order. With invokeAll, you get the list only after every task has completed (or been cancelled by the timeout), ordered as the tasks were passed in.

esaj

Comparing only the order of results:

When we use a CompletionService, the result of each submitted job is pushed to the queue as soon as that job finishes (completion order). The order of the returned results is therefore no longer the same as the order in which the jobs were submitted. So if you care about the order in which tasks complete, use CompletionService.

invokeAll, on the other hand, returns a list of Futures representing the tasks, in the same sequential order as produced by the iterator for the given task list, each of which has completed.
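
The ordering guarantee of invokeAll can be checked with a small sketch: even when the second task finishes long before the first, the returned list follows the order of the input list. The class name SubmissionOrder and the helper sleeper are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name, used for illustration only.
class SubmissionOrder {

  // Hypothetical helper: a task that sleeps for `millis`, then returns its name.
  static Callable<String> sleeper(String name, long millis) {
    return () -> {
      Thread.sleep(millis);
      return name;
    };
  }

  // Returns the task results in the order invokeAll hands back the Futures.
  static List<String> viaInvokeAll(List<Callable<String>> tasks)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
    try {
      List<String> results = new ArrayList<>();
      // invokeAll returns only after every task has completed.
      for (Future<String> future : pool.invokeAll(tasks)) {
        results.add(future.get()); // already done, so this does not block
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    // "slow" is listed first and finishes last, yet it is still reported first.
    System.out.println(viaInvokeAll(List.of(sleeper("slow", 1000), sleeper("quick", 100))));
  }
}
```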

Dungeon Hunter