I have a series of different "tasks" to be done using the same thread pool. Each "task" is really a batch of submissions to the pool (sorry for the ambiguity). I want to measure the time it takes to perform each batch, but for that I need to wait for every submission in the batch to finish.

When there's just one task I would normally do this:

ExecutorService e = Executors.newCachedThreadPool();
for (int i=0; i<100; ++i)
    e.submit(target);
e.shutdown();
while (!e.isTerminated());

But since several batches of tasks will be submitted to the pool, I can't shut it down. All the methods that have something to do with waiting for the tasks to finish mention "after shutdown request". Well, what if I don't want to shut it down, but wait for all the threads to finish and then submit more tasks?

This is what I want to do:

ExecutorService e = Executors.newCachedThreadPool();
for (int i=0; i<100; ++i)
    e.submit(target);
// wait for all targets to finish
for (int i=0; i<100; ++i)
    e.submit(target); // submit different tasks
// wait... and so on

I thought of shutting the pool down and then "waking it up" again using prestartAllCoreThreads, but then I realized this was not an ExecutorService method but a ThreadPoolExecutor method. Could this be a solution? Shutting it down, waiting, and then activating the pool again? Seems a bit ugly to me.

I also thought that the most natural thing to do was to use a CyclicBarrier, but it seems too specific a way of doing this, while I think the most logical thing would be to be able to use any ExecutorService for what I'm trying to do.

Is there any way I could stick to ExecutorServices and wait for all the tasks to finish?

dabadaba
  • Did you try using a `CountDownLatch` inside your tasks and waiting for it in the main thread? – omu_negru Jun 23 '14 at 09:50
  • No, I didn't, I have never worked with that class. – dabadaba Jun 23 '14 at 09:51
  • You initialize it with the number of tasks to run. You pass it as a constructor argument to the runnables, and after the work is done you call the countDown method on it. Inside the main thread you call await on the latch and the thread will block until the latch is down to 0 – omu_negru Jun 23 '14 at 09:53
  • @omu_negru seemed like a great idea, but I've read the documentation and it says a `CountDownLatch` cannot be reset. Then, what's the point of using this class in the particular situation I described? I could always declare new `CountDownLatch` objects, but that is kinda ugly... – dabadaba Jun 23 '14 at 09:58
  • Have a look at `CyclicBarrier` then. It provides the same functionality as the latch, but it can also be reset – omu_negru Jun 23 '14 at 10:00
  • @omu_negru that's what I thought of, as I mentioned in my original post. But I see it as way too specific a solution for a situation that seems to be very general. I mean, is it that weird a situation where one needs to wait for all the tasks submitted to the pool and then submit new ones? But yes, it seems like it is. I guess I will have to use `CyclicBarrier` or do some dirty tricks. – dabadaba Jun 23 '14 at 10:02
  • Actually it's not such clunky behavior at all. The advantage of the `CyclicBarrier` is that you can attach a `Runnable` to it that can then process all the data gathered from the finished tasks. You only have to create it once and just pass it as a reference to the constructors, so the modifications to the code are minimal – omu_negru Jun 23 '14 at 10:08
  • @omu_negru if you propose it as an answer I will accept it – dabadaba Jun 23 '14 at 10:15
  • Why not use [ExecutorService::invokeAll](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#invokeAll-java.util.Collection-)? – nosid Jun 23 '14 at 10:59
  • @nosid I am not familiar with that method, you could give an answer with your proposal. Also, java 7, please. – dabadaba Jun 23 '14 at 11:04
  • @omu_negru, CountdownLatch can't be reset, but it's trivially easy to create a new CountdownLatch for each batch of tasks that you want to measure. – Solomon Slow Jun 23 '14 at 13:37
  • @jameslarge could be easy but does not make much sense, I think `CyclicBarrier` fits the situation I presented in a more proper way. – dabadaba Jun 23 '14 at 13:53
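The `invokeAll` suggestion from the comments was never written up, so here is a minimal sketch of how it might look on Java 7. `ExecutorService.invokeAll` blocks until every task in the collection has completed, without shutting the pool down. The class name `InvokeAllBatches`, the helper `timeBatch`, and the placeholder work inside `call()` are assumptions made for illustration, not code from the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllBatches {

    // Hypothetical helper: runs one batch of `count` tasks on the given pool
    // and returns the elapsed wall-clock time in nanoseconds.
    static long timeBatch(ExecutorService pool, int count) throws Exception {
        List<Callable<Integer>> batch = new ArrayList<Callable<Integer>>();
        for (int i = 0; i < count; i++) {
            final int id = i;
            batch.add(new Callable<Integer>() {
                @Override
                public Integer call() {
                    return id * id; // placeholder work
                }
            });
        }
        long start = System.nanoTime();
        // invokeAll returns only after every task in the batch has completed.
        List<Future<Integer>> results = pool.invokeAll(batch);
        long elapsed = System.nanoTime() - start;
        for (Future<Integer> f : results) {
            f.get(); // already complete; rethrows a task failure, if any
        }
        return elapsed;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // The same pool is reused for both batches.
            System.out.println("batch 1: " + timeBatch(pool, 100) + " ns");
            System.out.println("batch 2: " + timeBatch(pool, 100) + " ns");
        } finally {
            pool.shutdown();
        }
    }
}
```

This needs no shared synchronizer in the tasks themselves, at the cost of building the whole batch as a collection of `Callable` objects up front.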

3 Answers


You can await the termination of that ExecutorService.

    ExecutorService executor = Executors.newCachedThreadPool();
    // do your stuff

    try {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        // handle
    }

Or use a CountDownLatch:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  // each task needs a reference to the shared latch
  taskExecutor.execute(new MyTask(latch));
}

try {
  latch.await(); // blocks until every task has called countDown()
} catch (InterruptedException e) {
  // handle
}

And within your task, in a finally block so that it runs even if the work throws:

latch.countDown();
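
Since a `CountDownLatch` cannot be reset, one way to fit this to the question is to create a fresh latch for each batch while keeping the same pool. The following is a minimal sketch under that assumption. `LatchBatches`, `timeBatch`, and the counter standing in for real work are hypothetical names, not part of the answer above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class LatchBatches {

    // Hypothetical helper: submits `count` tasks, waits for all of them,
    // and returns the elapsed time in nanoseconds.
    static long timeBatch(ExecutorService pool, int count, final AtomicInteger done)
            throws InterruptedException {
        // A new latch per batch; the pool itself is reused.
        final CountDownLatch latch = new CountDownLatch(count);
        long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        done.incrementAndGet(); // placeholder work
                    } finally {
                        latch.countDown(); // always count down, even on failure
                    }
                }
            });
        }
        latch.await(); // blocks until the count reaches zero
        return System.nanoTime() - start;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger done = new AtomicInteger();
        try {
            System.out.println("batch 1: " + timeBatch(pool, 100, done) + " ns");
            System.out.println("batch 2: " + timeBatch(pool, 100, done) + " ns");
        } finally {
            pool.shutdown();
        }
    }
}
```

Because tasks never block on the latch, this works with a pool that has fewer threads than tasks, such as the fixed pool of 4 used here.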
Zarathustra

Use a CyclicBarrier for this, like so:

// the optionalRunnable can collect the data gathered by the tasks
CyclicBarrier b = new CyclicBarrier(numberOfTasks, optionalRunnable);

Task yourTask = new Task(...., b);
// inside the run method, call b.await() after the work is done
executor.submit(yourTask);

Optionally, you can also call await in the main thread and create the barrier with numTasks + 1 parties. That way you can be sure you only resubmit tasks to the executor after it has finished processing the current batch.
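
A minimal sketch of the numTasks + 1 variant, with the barrier created once and reused across batches. One caveat worth stating: every task blocks in `await()` until all parties have arrived, so the pool must be able to run all tasks of a batch at the same time. A cached pool can, but a small fixed pool would hang. `BarrierBatches`, `runBatch`, and the counter used as placeholder work are hypothetical names for illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierBatches {

    // Hypothetical helper: `count` must equal barrier.getParties() - 1,
    // because the calling thread is the extra party.
    static void runBatch(ExecutorService pool, final CyclicBarrier barrier,
                         int count, final AtomicInteger done)
            throws InterruptedException, BrokenBarrierException {
        for (int i = 0; i < count; i++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    done.incrementAndGet(); // placeholder work
                    try {
                        barrier.await(); // wait for the other parties
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // another party was interrupted or timed out
                    }
                }
            });
        }
        // Called once, after all tasks of the batch have been submitted.
        barrier.await();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        CyclicBarrier barrier = new CyclicBarrier(101); // 100 tasks + main thread
        AtomicInteger done = new AtomicInteger();
        try {
            long start = System.nanoTime();
            runBatch(pool, barrier, 100, done);
            System.out.println("batch 1: " + (System.nanoTime() - start) + " ns");

            start = System.nanoTime();
            runBatch(pool, barrier, 100, done); // the barrier resets itself after tripping
            System.out.println("batch 2: " + (System.nanoTime() - start) + " ns");
        } finally {
            pool.shutdown();
        }
    }
}
```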

omu_negru
  • I'm calling `await` after submitting each task, but the main thread blocks the first time it is called. The argument for the `CyclicBarrier` is 101. – dabadaba Jun 23 '14 at 10:49
  • In the main thread, you should call await only after submitting all the tasks. If you call it after the first one, your program will block indefinitely. So in your case, call b.await() in the main thread only after you exit the for loop – omu_negru Jun 23 '14 at 10:50
  • but if `await` is called just once there's still 100 threads to be added to the barrier for it to "open" and keep going. – dabadaba Jun 23 '14 at 10:53
  • You call await once in the main thread, then once inside each task after it finishes the work, so 1+100 = 101. After all tasks have called await, the barrier trips and the main thread can proceed – omu_negru Jun 23 '14 at 10:56
  • so the `CyclicBarrier` should be shared by all the `Runnable` objects and passed as an argument in the constructor – dabadaba Jun 23 '14 at 10:58
  • Yes, just as the code says. You call await at the end of the run() or call() method. Also, both CB and CL objects are thread safe – omu_negru Jun 23 '14 at 10:59
  • pointing out that they're thread safe was critical, since I was wrapping the call to `await` in a `synchronized` block which was ironically producing a block – dabadaba Jun 23 '14 at 11:07

You could create a TaskListener interface which you pass into each task. Each task notifies the TaskListener when it starts and stops. Then you can create a TimingTaskListener implementation which maintains a ConcurrentMap of the durations, which can be queried later.

public interface TaskListener {
   void onStart(String taskId);
   void onEnd(String taskId);
}

public class Task implements Runnable {
   private final TaskListener listener;
   private final String taskId;

   public Task(String taskId, TaskListener listener) {
      this.taskId = taskId;
      this.listener = listener;
   }

   public void run() {
      listener.onStart(taskId);
      try {
         doStuff();
      } finally {
         // always report the end, even if doStuff() throws
         listener.onEnd(taskId);
      }
   }
}

// TODO: Implement TimingTaskListener to save durations to a ConcurrentMap
TimingTaskListener timingListener = new TimingTaskListener(); 
Runnable task1 = new Task("task1", timingListener);
Runnable task2 = new Task("task2", timingListener);

Future<?> f1 = e.submit(task1);
Future<?> f2 = e.submit(task2);

// futures block until the task is finished.
// You could also use a CountDownLatch to achieve the same
f1.get();
f2.get();

long time1 = timingListener.getDuration("task1");
long time2 = timingListener.getDuration("task2");
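
The answer leaves TimingTaskListener as a TODO. One possible sketch follows, assuming durations are measured with `System.nanoTime()` and that `getDuration` returns nanoseconds, or -1 for a task that has not finished. Both choices are assumptions, since the answer does not specify them. The `TaskListener` interface is repeated only so the sketch compiles on its own.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Repeated from the answer so that this sketch is self-contained.
interface TaskListener {
    void onStart(String taskId);
    void onEnd(String taskId);
}

class TimingTaskListener implements TaskListener {
    // Start timestamps of tasks that are still running.
    private final ConcurrentMap<String, Long> startNanos =
            new ConcurrentHashMap<String, Long>();
    // Durations of finished tasks, in nanoseconds.
    private final ConcurrentMap<String, Long> durationNanos =
            new ConcurrentHashMap<String, Long>();

    @Override
    public void onStart(String taskId) {
        startNanos.put(taskId, System.nanoTime());
    }

    @Override
    public void onEnd(String taskId) {
        Long start = startNanos.remove(taskId);
        if (start != null) {
            durationNanos.put(taskId, System.nanoTime() - start);
        }
    }

    // Assumed contract: nanoseconds, or -1 if the task has not finished
    // (or was never started).
    public long getDuration(String taskId) {
        Long duration = durationNanos.get(taskId);
        return duration == null ? -1L : duration;
    }
}
```

Note that this measures each task from the moment it starts running, so time spent waiting in the executor's queue is not included.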
lance-java