I want to use a CompletionService to process the results from a series of threads as they are completed. I have the service in a loop to take the Future objects it provides as they become available, but I don't know the best way to determine when all the threads have completed (and thus to exit the loop):

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class Bar {

    final static int MAX_THREADS = 4;
    final static int TOTAL_THREADS = 20;

    public static void main(String[] args) throws Exception{

        final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS);
        final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);

        for (int i=0; i<TOTAL_THREADS; i++){
            service.submit(new MyCallable(i));
        }

        int finished = 0;
        Future<Integer> future = null;
        do{
            future = service.take();
            int result = future.get();
            System.out.println("  took: " + result);
            finished++;             

        }while(finished < TOTAL_THREADS);

        System.out.println("Shutting down");
        threadPool.shutdown();
    }


    public static class MyCallable implements Callable<Integer>{

        final int id;

        public MyCallable(int id){
            this.id = id;
            System.out.println("Submitting: " + id);
        }

        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            System.out.println("finished: " + id);
            return id;
        }
    }
}

I've tried checking the state of the ThreadPoolExecutor, but I know the getCompletedTaskCount and getTaskCount methods are only approximations and shouldn't be relied upon. Is there a better way to ensure that I've retrieved all the Futures from the CompletionService than counting them myself?


Edit: Both the link that Nobeh provided, and this link suggest that counting the number of tasks submitted, then calling take() that many times, is the way to go. I'm just surprised there isn't a way to ask the CompletionService or its Executor what's left to be returned.

Ed Beaty

4 Answers

See http://www.javaspecialists.eu/archive/Issue214.html for a decent suggestion on how to extend the ExecutorCompletionService to do what you're looking for. I've pasted the relevant code below for your convenience. The author also suggests making the service implement Iterable, which I think would be a good idea.

FWIW, I agree with you that this really should be part of the standard implementation, but alas, it's not.

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class CountingCompletionService<V> extends ExecutorCompletionService<V> {
  private final AtomicLong submittedTasks = new AtomicLong();
  private final AtomicLong completedTasks = new AtomicLong();

  public CountingCompletionService(Executor executor) {
    super(executor);
  }

  public CountingCompletionService(
      Executor executor, BlockingQueue<Future<V>> queue) {
    super(executor, queue);
  }

  public Future<V> submit(Callable<V> task) {
    Future<V> future = super.submit(task);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> submit(Runnable task, V result) {
    Future<V> future = super.submit(task, result);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> take() throws InterruptedException {
    Future<V> future = super.take();
    completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll() {
    Future<V> future = super.poll();
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll(long timeout, TimeUnit unit)
      throws InterruptedException {
    Future<V> future = super.poll(timeout, unit);
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public long getNumberOfCompletedTasks() {
    return completedTasks.get();
  }

  public long getNumberOfSubmittedTasks() {
    return submittedTasks.get();
  }

  public boolean hasUncompletedTasks() {
    return completedTasks.get() < submittedTasks.get();
  }
}
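
As a rough usage sketch (not part of the linked article), the counting service can drive the drain loop directly instead of a hand-maintained counter. The nested class here is a trimmed copy that only counts `submit(Callable)` and `take()`; the names `CountingDemo` and `sumResults` are made up for illustration, and the sketch assumes every task is submitted from the same thread before draining starts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class CountingDemo {

    // Trimmed copy of the counting service: only submit(Callable) and take() are counted.
    static class CountingCompletionService<V> extends ExecutorCompletionService<V> {
        private final AtomicLong submitted = new AtomicLong();
        private final AtomicLong completed = new AtomicLong();

        CountingCompletionService(Executor executor) {
            super(executor);
        }

        @Override
        public Future<V> submit(Callable<V> task) {
            Future<V> future = super.submit(task);
            submitted.incrementAndGet();
            return future;
        }

        @Override
        public Future<V> take() throws InterruptedException {
            Future<V> future = super.take();
            completed.incrementAndGet();
            return future;
        }

        boolean hasUncompletedTasks() {
            return completed.get() < submitted.get();
        }
    }

    // Hypothetical helper: submits `tasks` callables that return their index and sums the results.
    static int sumResults(int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CountingCompletionService<Integer> service = new CountingCompletionService<>(pool);
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                service.submit(() -> id);
            }
            int sum = 0;
            // The loop ends once every submitted task has been taken.
            while (service.hasUncompletedTasks()) {
                sum += service.take().get();
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumResults(20)); // prints 190 (0 + 1 + ... + 19)
    }
}
```

If tasks can still be submitted while another thread is draining, `hasUncompletedTasks()` may briefly report false between two submissions, so this pattern is safest when submission finishes first.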
Mark

The code below is inspired by @Mark's answer, but I find it more convenient to use:

package com.example;

import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletionIterator<T> implements Iterator<T>, AutoCloseable {

    private AtomicInteger count = new AtomicInteger(0);
    
    private CompletionService<T> completer;
    
    private ExecutorService executor = Executors.newWorkStealingPool(100);
    
    public CompletionIterator() {
        this.completer = new ExecutorCompletionService<>(executor);
    }

    public void submit(Callable<T> task) {
        completer.submit(task);
        count.incrementAndGet();
    }

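    @Override
    public boolean hasNext() {
        // Pure check: calling hasNext() repeatedly must not consume a pending result.
        return count.get() > 0;
    }

    @Override
    public T next() {
        // Account for the result we are about to take, then block until it is ready.
        count.decrementAndGet();
        try {
            return completer.take().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }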

    @Override
    public void close() {
        try {
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            executor = null;
            completer = null;
            count = null;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    
}

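This is how it can be used (task1 and task2 are assumed to be Callable<Integer> instances). The class implements Iterator rather than Iterable, so it is driven with hasNext()/next() instead of a for-each loop, and next() returns the result itself rather than a Future:

try (CompletionIterator<Integer> service = new CompletionIterator<>()) {
  service.submit(task1);
  service.submit(task2);
  // all tasks must be submitted before iterating, to avoid a race condition
  while (service.hasNext()) {
    System.out.printf("Job %d is done%n", service.next());
  }
}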
Alex R

Would answering these questions give you the answer?

  • Do your asynchronous tasks create other tasks submitted to CompletionService?
  • Is service the only object that is supposed to handle the tasks created in your application?

According to the reference documentation, CompletionService follows a producer/consumer approach and relies on an internal Executor. So, as long as you produce the tasks in one place and consume them in another, each call to CompletionService.take() hands out the next completed result, blocking until one is available.

I believe this question also helps you.
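
To make the behaviour of take() concrete, here is a minimal sketch; the class and method names are invented for illustration. After all submitted results have been consumed, the service gives no "finished" signal: a further take() would simply block, while poll with a timeout returns null. That is why the consumer has to know how many tasks were submitted.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TakeBlocksDemo {

    // Hypothetical helper: returns true if poll() times out with null once all results are consumed.
    static boolean drainedServiceYieldsNull(int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                service.submit(() -> id);
            }
            // Consume exactly as many results as were submitted.
            for (int i = 0; i < tasks; i++) {
                service.take().get();
            }
            // Another take() here would block indefinitely; poll with a timeout gives up instead.
            Future<Integer> extra = service.poll(200, TimeUnit.MILLISECONDS);
            return extra == null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(drainedServiceYieldsNull(5)); // prints true
    }
}
```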

nobeh
  • Thanks, nobeh. It looks like they also just loop over the thread count, in their "for(int tasksHandled=0;tasksHandled – Ed Beaty Apr 03 '12 at 05:14
  • The example in the API uses the same approach of executing take() n times in a row. http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorCompletionService.html – Ed Beaty Apr 04 '12 at 19:20
  • Is it thread safe to take results from the CompletionService in a different thread than the one that submitted the tasks to the Executor? – raffian Sep 23 '13 at 04:45
  • Thread safety, I suppose, is a direct gain when using concurrency API in `CompletionService` or `ExecutorService` because they simply _abstract_ and _encapsulate_ how threads are used internally. – nobeh Sep 23 '13 at 12:54
  • It is important to emphasize that you need to keep track of the number of jobs submitted to your CompletionService. The javadoc for `CompletionService.take()` implies it will just block until it receives another `Future`. This means unless you terminate the loop by using the count of threads submitted, you will block indefinitely. – Matt Lachman Feb 06 '14 at 14:25

My take, based on Alex R's variant. It assumes the iterator is only ever used from a single thread, so it uses a plain int counter instead of atomics:

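import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;

public class CompletionIterator<T> implements Iterable<T> {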

    private int _count = 0;
    private final CompletionService<T> _completer;

    public CompletionIterator(ExecutorService executor) {
        this._completer = new ExecutorCompletionService<>(executor);
    }

    public void submit(Callable<T> task) {
        _completer.submit(task);
        _count++;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {

            @Override
            public boolean hasNext() {
                return _count > 0;
            }

            @Override
            public T next() {
                try {
                    T ret = _completer.take().get();
                    _count--;
                    return ret;
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }

        };
    }

}
fedd