6

I am looking for a way to execute batches of tasks in Java. The idea is to have an ExecutorService based on a thread pool that will allow me to spread a set of Callable tasks among different threads from a main thread. This class should provide a waitForCompletion method that will put the main thread to sleep until all tasks are executed. Then the main thread should be awakened, perform some operations, and resubmit a set of tasks.

This process will be repeated numerous times, so I would like to avoid using ExecutorService.shutdown, as this would require creating multiple instances of ExecutorService.

Currently I have implemented it in the following way using an AtomicInteger and a Lock/Condition:

public class BatchThreadPoolExecutor extends ThreadPoolExecutor {
  private final AtomicInteger mActiveCount;
  private final Lock          mLock;
  private final Condition     mCondition;

  public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch){
    ...
    for(C task : batch){
      submit(task);
      mActiveCount.incrementAndGet();
    }
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    mLock.lock();
    try {
      if (mActiveCount.decrementAndGet() == 0) {
        mCondition.signalAll();
      }
    } finally {
      // Always release the lock, even if signalling throws
      mLock.unlock();
    }
  }

  public void awaitBatchCompletion() throws InterruptedException {
    ...
    // Lock and wait until there is no active task
    mLock.lock();
    try {
      while (mActiveCount.get() > 0) {
        mCondition.await();
      }
    } finally {
      // Released on normal exit and when await() is interrupted
      mLock.unlock();
    }
  } 
}

Please note that I will not necessarily submit all the tasks from the batch at once, therefore CountDownLatch does not seem to be an option.

Is this a valid way to do it? Is there a more efficient/elegant way to implement that?

Thanks

Victor P.
  • Can you explain a bit more why the default executors can't handle your use case? Why do you need to extend `ThreadPoolExecutor`? – Gray Apr 24 '12 at 12:49
  • Well, the API does not specify a method to wait on the completion of all submitted tasks unless you call `shutdown` first. In my case I do not want to shut down the executor as I will need it almost immediately after, and that would lead to useless thread creations. Does it answer your question? – Victor P. Apr 24 '12 at 13:00
  • See this question: http://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish/3269888 – andersoj Apr 24 '12 at 13:35

3 Answers

8

I think the ExecutorService itself can meet your requirements.

Call invokeAll([...]) with your tasks and iterate over the returned Futures. invokeAll blocks until all tasks have completed, so every Future is already done by the time you iterate over it.
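As a minimal sketch of that idea, the loop below reuses one pool for several batches and relies on invokeAll to do the waiting. The class name, the `runBatch` and `makeBatch` helpers, the pool size, and the toy tasks are all assumptions made for illustration; they are not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllBatches {

    // Runs one batch on the shared pool and returns the sum of the results.
    // invokeAll blocks until every task in the batch has completed.
    static int runBatch(ExecutorService pool, List<Callable<Integer>> batch)
            throws InterruptedException, ExecutionException {
        int sum = 0;
        for (Future<Integer> future : pool.invokeAll(batch)) {
            sum += future.get(); // the task is already done, so get() does not wait
        }
        return sum;
    }

    // Hypothetical task set: task i of a round simply returns i * round.
    static List<Callable<Integer>> makeBatch(int round, int size) {
        List<Callable<Integer>> batch = new ArrayList<>();
        for (int i = 1; i <= size; i++) {
            final int value = i * round;
            batch.add(() -> value);
        }
        return batch;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed pool size
        try {
            // The same pool serves every batch; no shutdown between rounds.
            for (int round = 1; round <= 3; round++) {
                int sum = runBatch(pool, makeBatch(round, 5));
                System.out.println("round " + round + " sum " + sum);
            }
        } finally {
            pool.shutdown(); // only once, after the last batch
        }
    }
}
```

If a task throws, `future.get()` rethrows it wrapped in an ExecutionException, so a real implementation would probably want to decide per task whether to abort the batch or carry on.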

Christian Kuetbach
  • This was my first implementation, the problem is that the `main` thread might be _interrupted_ while submitting its tasks (meaning that something will `break` the execution of the loop), therefore I cannot rely on `invokeAll`. I could wait on the `Future.get` externally but I thought it was better in terms of design to have the executor responsible for that. I may be wrong though ;) – Victor P. Apr 24 '12 at 13:08
  • I think this solution is the cleanest. `main` thread might equally well be interrupted while sleeping - are you actually planning to interrupt it, or has this just arisen because `InterruptedException` is checked? – artbristol Apr 24 '12 at 13:30
  • I did not see that `invokeAll` internally waits for all tasks to complete, this may be the cleanest solution, I will refactor the code related to the _interruption_ of the main thread (I am not talking about a call to `Thread.interrupt`, just a conditional `break` in the loop that is creating tasks) – Victor P. Apr 24 '12 at 13:43
3

As the other answers point out, there doesn't seem to be any part of your use case that requires a custom ExecutorService.

It seems to me that all you need to do is submit a batch, wait for them all to finish while ignoring interrupts on the main thread, then submit another batch perhaps based on the results of the first batch. I believe this is just a matter of:

    ExecutorService service = ...;

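    Collection<Future<?>> futures = new HashSet<Future<?>>();
    for (Callable<?> callable : tasks) {
        Future<?> future = service.submit(callable);
        futures.add(future);
    }

    for (Future<?> future : futures) {
        try {
            // Blocks until this task has finished.
            future.get();
        } catch (InterruptedException e) {
            // Figure out if the interruption means we should stop.
        } catch (ExecutionException e) {
            // The task itself threw; e.getCause() holds the original exception.
        }
    }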

    // Use the results of futures to figure out a new batch of tasks.
    // Repeat the process with the same ExecutorService.
sharakan
  • I'll follow your advice and fall back to what was my previous implementation. @ckuetbach you have the credit for this solution – Victor P. Apr 24 '12 at 13:37
0

I agree with @ckuetbach that the default Java Executors should provide you with all of the functionality you need to execute a "batch" of jobs.

If I were you I would just submit a bunch of jobs, call shutdown(), wait for them to finish with ExecutorService.awaitTermination(), and then just start up a new ExecutorService. Reusing the executor to save on "thread creations" is premature optimization unless you are doing this hundreds of times a second or something.

If you really are stuck on using the same ExecutorService for each of the batches then you can allocate a ThreadPoolExecutor yourself and poll ThreadPoolExecutor.getActiveCount() in a loop. Something like:
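A rough sketch of the executor-per-batch approach could look like the following. The class name, the `runBatch` helper, the thread count, and the timeout are assumptions picked for the example, and the tasks are placeholders that just bump a counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorPerBatch {

    // Runs one batch on a fresh pool.
    // Returns true if every task finished before the timeout elapsed.
    static boolean runBatch(List<Runnable> batch, int threads, long timeoutSeconds)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (Runnable task : batch) {
            pool.execute(task);
        }
        // shutdown() rejects new tasks but lets the submitted ones run;
        // awaitTermination() then blocks until they are done or the timeout hits.
        pool.shutdown();
        return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        for (int round = 0; round < 3; round++) {
            // Placeholder batch: two tasks that each increment the counter.
            List<Runnable> batch = new ArrayList<>();
            batch.add(counter::incrementAndGet);
            batch.add(counter::incrementAndGet);
            boolean finished = runBatch(batch, 2, 10);
            System.out.println("round " + round + " finished: " + finished);
        }
        System.out.println("tasks run: " + counter.get());
    }
}
```

Each call pays for creating and tearing down its own threads, which is the cost being traded for not having to track completion by hand.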

BlockingQueue<Runnable> jobQueue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS,
    0L, TimeUnit.MILLISECONDS, jobQueue);
// submit your batch of jobs ...
// need to wait a bit for the jobs to start
Thread.sleep(100);
// keep waiting while any job is still running or still queued
// (getActiveCount() is only an approximate count)
while (executor.getActiveCount() > 0 || jobQueue.size() > 0) {
    // to slow the spin
    Thread.sleep(1000);
}
// continue on to submit the next batch
Gray
  • Creating a new Executor each time would lead to approximately 240 new threads in 30s, I just feel bad about it :) Regarding `executor.getActiveCount()`, the API says it's only an approximate count, and `Thread.sleep` is not a good option for me as I want to be as fast as possible: I am implementing a combinatorial optimization algorithm and the two performance metrics are solution quality and speed, each ms counts! – Victor P. Apr 24 '12 at 13:17
  • Don't be. 240 new threads in 30 seconds is NOTHING. Try a for loop sometime that creates and destroys threads. See how many you can do in 30 seconds. – Gray Apr 24 '12 at 13:20
  • Given your speed requirements, I'd drop the custom code and just create a new executor each time. You won't regret it. – Gray Apr 24 '12 at 13:21
  • Ok I'll try your suggestion, I thought it was bad practice to create threads and terminate them almost instantly. Thanks – Victor P. Apr 24 '12 at 13:24