2

I adopted a the concurrency strategy from this post. However mine looks like this:

ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CREATE_KNOWLEDGE_THREADS);
List<Callable<Collection<Triple>>> todo = new ArrayList<Callable<Collection<Triple>>>(this.patternMappingList.size());
for (PatternMapping mapping : this.patternMappingList ) {
    todo.add(new CreateKnowledgeCallable(mapping, i++));
}
try {

    List<Future<Collection<Triple>>> answers = executorService.invokeAll(todo);
    for (Future<Collection<Triple>> future : answers) {

        Collection<Triple> triples = future.get();
        this.writeNTriplesFile(triples);
    }    
}
catch (InterruptedException e) { ... }
catch (ExecutionException e) { ... }

executorService.shutdown();
executorService.shutdownNow();

But the ExecutorService never shuts down. I tried to debug how many of the CreateKnowledgeCallable are finished, but this number seems to vary (after no new threads/callables are executed but the service keeps running). I am sure a logged and printed every possible exception but I can't see one happening. It also seems that after a while nothing happens anymore except that NUMBER_OF_CREATE_KNOWLEDGE_THREADS cpus are spinning at 100% forever. What am I doing wrong? If you need to more specific infos I would be happy to provide them for you!

Kind regards, Daniel

Community
  • 1
  • 1
Daniel Gerber
  • 3,226
  • 3
  • 25
  • 32

4 Answers4

3

When you perform a shutdownNow() it interrupts all the threads in the pool. However, if your code ignores interrupts, they won't stop. You need to make your tasks honour interrupts with tests like

while(!Thread.currentThread.isInterrupted()) {

}

or

Thread.sleep(0);
Peter Lawrey
  • 525,659
  • 79
  • 751
  • 1,130
  • As I understood, shutdown() (which is called before shutdownNow()) lets all threads finish which are submitted to the executor service. I don't add threads after all of them should have finished. The funny thing though is, that this does not times out always. I just downsized the number of threads from 20 to 10 (32cores on the server) and the script runs to `this.writeNTriplesFile(triples);` but only for 105 of the 274 overall threads. – Daniel Gerber Dec 05 '11 at 11:23
  • 1
    By calling shutdown, it prevents new tasks and shutsdown when all tasks are finished. shutdownNow() returns all the unexecuted tasks. My guess is that a task is throwing an exception, you are not waiting for unexecuted tasks to finish so shutdownNow prevents the rest of the tasks from running. i.e. your try catch is outside the loop waiting for future results, instead of inside the loop. – Peter Lawrey Dec 05 '11 at 11:30
  • But shouldn't the program then quit, which does not? Also there are threads running at 100% cpu power which don't do any work whatsoever (lucene searches, solr index query). – Daniel Gerber Dec 05 '11 at 11:45
  • Also I would rather wait then use `shutdownNow` this was more kind of a debug/experimental line of code. – Daniel Gerber Dec 05 '11 at 11:47
  • sorry, but how do I paste code here? you mean `future.get()` should be surrounded with try/catch? One more question, as I understood `invokeAll()` wait's for all threads to finish. How could then the `future.get()` be interrupted since all threads are finished? – Daniel Gerber Dec 05 '11 at 12:07
  • Sorry, invokeAll does wait for all tasks to be done, unless the service is shutdown in another thread. – Peter Lawrey Dec 05 '11 at 12:15
  • You were right, I did forget to close the Lucene Index Searcher and then too many connections were opened, which led to an java.io.IOException that got swallowed somewhere. How could this happened? Could anybody tell me how to close this question? – Daniel Gerber Dec 05 '11 at 19:25
  • +1. This helped a lot !! Many thanks, @Peter. I made an `ExecutorService` and submitted a bunch of `SwingWorkers` to it. While they're executed, I called `ExecutorService#shutdownNow()` and the workers kept working until they finished. Constantly checking for interruptions fixed it. Thanks again :) . – Radu Murzea May 09 '13 at 14:55
  • @RaduMurzea You should be submitting Runnable or Callable. SwingWorkers are designed to work the Swing libraries. – Peter Lawrey May 10 '13 at 06:35
  • @Peter Aren't `SwingWorker`s Runnables ? `SwingWorker implements RunnableFuture`, where `RunnableFuture` is an interface that extends `Runnable` – Radu Murzea May 10 '13 at 06:51
  • @RaduMurzea While this is true, it could be confusing because you might assume this is will make a difference. i.e. Someone might think there is a reason you used a SwingWorker instead of Runnable when this doesn't do what it might appear to do. – Peter Lawrey May 10 '13 at 06:53
0
executorService.invokeAll

should return only when all tasks are finished. As well as future.get() Are you sure, that call to executorService.invokeAll(todo); ever returns and not blocks forever waiting for tasks to complete?

Nikem
  • 5,716
  • 3
  • 32
  • 59
0

are you sure that you submitted tasks actually finish? If you check the API for shutdownNow() and shutdown() you'll see that they do not guarantee termination.

Have you tried using a call to awaitTermination(long timeout, TimeUnit unit) with a reasonable amount of time as timeout parameter? (edit: "reasonable amount of time" depends of course on the mean process time of your tasks as well as the number of tasks executing at the time you call for termination)

Edit2: I hope the following example from my own code might help you out (note that it probably isn't the optimal, or most gracious, way to solve this problem)

try {

        this.started = true;
        pool.execute(new QueryingAction(pcqs));
        for(;;){
            MyObj p = bq.poll(timeout, TimeUnit.MINUTES); // poll from a blocking queue

            if(p != null){
                if (p.getId().equals("0"))
                    break;

                pool.submit(new AnalysisAction(ds, p, analyzedObjs));
            }else 
                drc.log("Timed out while waiting...");

        }

      } catch (Exception ex) {
          ex.printStackTrace();

      }finally{
          drc.log("--DEBUG: Termination criteria found, shutdown initiated..");
          pool.shutdown();

          int mins = 2;
          int nCores = poolSize -1 ;
          long  totalTasks = pool.getTaskCount(), 
                compTasks = pool.getCompletedTaskCount(),
                tasksRemaining = totalTasks - compTasks,
                timeout = mins * tasksRemaining / nCores;

          drc.log(  "--DEBUG: Shutdown commenced, thread pool will terminate once all objects are processed, " +
                    "or will timeout in : " + timeout + " minutes... \n" + compTasks + " of " +  (totalTasks -1) + 
                    " objects have been analyzed so far, " + "mean process time is: " +
                    drc.getMeanProcTimeAsString() + " milliseconds.");

          pool.awaitTermination(timeout, TimeUnit.MINUTES);
      }
posdef
  • 6,498
  • 11
  • 46
  • 94
  • what do you mean by "are you sure" is there a proper way to check for them to be finished in an interval? and no, i did not try àwaitTerminiation()` since I need all the answers. – Daniel Gerber Dec 05 '11 at 11:26
  • i tried to give an example that might portray what I mean, please see the edited answer above. – posdef Dec 05 '11 at 11:42
  • Thx @posdef I think I get your point. But I don't want to wait for a specified amount of time. I know that the code finishes eventually. I switched from a "do-everthing-by-myself-solution" to this executor service but now it does not work anymore. :( – Daniel Gerber Dec 05 '11 at 11:54
  • I used to have similar problems with my first shot at thread pools, but it eventually got sorted out with the above method. I am presuming that you implement a producet-consumer strategy, are you sure your termination criteria is fullfilled? sorry for the trivial questions but magical errors happen when one deals with concurrency so anything is possible really :) – posdef Dec 05 '11 at 12:48
0

Everyone with this sort of problems should try to implement the same algorithm without concurrency. With the help of this method, I found that a component has thrown a runtime exception which was swallowed.

Daniel Gerber
  • 3,226
  • 3
  • 25
  • 32