0

I am trying to submit multiple tasks and obtain the results as and when it is available. However, after the end of the loop, I have to enforce that all the tasks complete within specified amount of time. If not, throw an error. Initially, all I had was executorService's invokeAll, shutdown and awaitTermination calls that were used to ensure that all tasks complete (inspite of errors or not). I migrated the code to use CompletionService to display the results. Where can I enforce the awaitTermination clause in the CompletionService calls?

 CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
            logger.info("Submitting all tasks");
            for (Callable<String> task : tasks)
                completionService.submit(task);
            executor.shutdown();
            logger.info("Tasks submitted. Now checking the status.");
            while (!executor.isTerminated())
            {
                final Future<String> future = completionService.take();
                String itemValue;
                try
                {
                    itemValue = future.get();
                    if (!itemValue.equals("Bulk"))
                        logger.info("Backup completed for " + itemValue);
                }
                catch (InterruptedException | ExecutionException e)
                {
                    String message = e.getCause().getMessage();
                    String objName = "Bulk";
                    if (message.contains("(") && message.contains(")"))
                        objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
                    logger.error("Failed retrieving the task status for " + objName, e);
                }
            }
executor.awaitTermination(24, TimeUnit.HOURS);

In other words, how can I utilize timeout for CompletionService?

EDIT:

The initial code I had was displayed below. The problem is that I am iterating through the future list and then printing them as completed. However, my requirement is to display the ones that were completed at a FCFS basis.

List<Future<String>> results = executor.invokeAll(tasks);
        executor.shutdown();
        executor.awaitTermination(24, TimeUnit.HOURS);

        while (results.size() > 0)
        {
            for (Iterator<Future<String>> iterator = results.iterator(); iterator.hasNext();)
            {
                Future<String> item = iterator.next();
                if (item.isDone())
                {
                    String itemValue;
                    try
                    {
                        itemValue = item.get();
                        if (!itemValue.equals("Bulk"))
                            logger.info("Backup completed for " + itemValue);
                    }
                    catch (InterruptedException | ExecutionException e)
                    {
                        String message = e.getCause().getMessage();
                        String objName = "Bulk";
                        if (message.contains("(") && message.contains(")"))
                            objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
                        logger.error("Failed retrieving the task status for " + objName, e);
                    }
                    finally
                    {
                        iterator.remove();
                    }
                }
            }
        }
dmachop
  • 824
  • 1
  • 21
  • 39
  • But why? Looks like `CompletionService` is not appropriate for a problem you are trying to solve. What's wrong with `invokeAll`? – kan Oct 27 '15 at 20:40
  • Also, it's not clear why do you need `awaitTermination`. Just use `invokeAll`, as described here: http://stackoverflow.com/a/3269888/438742 – kan Oct 27 '15 at 20:44
  • To display which processes completed first on the log, I require CompletionService. If I use invokeAll, it blocks until all tasks are complete and I have to display the status in the order at which the tasks are submitted. – dmachop Oct 27 '15 at 20:51
  • I would think adding `executor.shutdownNow()` after the call to awaitTermination would do what you want. Any tasks not yet completed will be interrupted, so the CompletionService will see them all as immediately completed. Provided, of course, that your tasks properly handle interrupts. – VGR Oct 27 '15 at 20:54
  • @VGR I don't have a problem including `shutdownNow()` but I need to enforce a timeout constraint even when monitoring the tasks. Is there a way that I can tap the `awaitTermination`'s functionality or similar methods to include? Else, the only choice I have is to write a few modifications in the loop to check the time elapsed and throw exception inside the loop. Is this the only approach possible? – dmachop Oct 27 '15 at 20:58
  • I'm not sure I understand. Do you need individual timeouts for each task? Your (first block of) code currently waits 24 hours, so if shutdownNow is called after that, all the tasks will be terminated after that period. – VGR Oct 27 '15 at 21:05
  • @VGR That's not what I mean. I need timeout for the tasks in entirety and printing the results in FCFS fashion. My guess is that the awaitTermination is irrelevant due to the while loop above which checks for termination. – dmachop Oct 27 '15 at 21:32
  • Ah, I see. Xipo has the right idea, though I don't agree with his implementation. I would just do the awaitTermination and shutdownNow in a simple background Thread, without using any Executors. Start the Thread immediately after calling executor.shutdown(). – VGR Oct 27 '15 at 21:43

2 Answers2

1

I'd suggest you wait for the executor to terminate on another thread

That way you can achieve serving results FCFS and also enforce the timeout.

It can be easily achieved with something that will look like the following

CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);

// place all the work in a function (an Anonymous Runnable in this case)
// completionService.submit(() ->{work});
// as soon as the work is submitted it is handled by another Thread

completionService.submit(() ->{
    logger.info("Submitting all tasks");
    for (Callable<String> task : tasks)
    completionService.submit(task);
    logger.info("Tasks submitted. Now checking the status.");
    int counter = tasks.size();
    for(int i = counter; counter >=1; counter--)  // Replaced the while loop
    {
        final Future<String> future = completionService.take();
        String itemValue;
        try
        {
            itemValue = future.get();
            if (!itemValue.equals("Bulk"))
                logger.info("Backup completed for " + itemValue);
        }
        catch (InterruptedException | ExecutionException e)
        {
            String message = e.getCause().getMessage();
            String objName = "Bulk";
            if (message.contains("(") && message.contains(")"))
                objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
            logger.error("Failed retrieving the task status for " + objName, e);
        }
    }
});

// After submitting the work to another Thread
// Wait in your Main Thread, and enforce termination if needed
shutdownAndAwaitTermination(executor);

You handle the executors termination && waiting using this (taken from ExecutorsService)

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(24, TimeUnit.HOURS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }
Xipo
  • 1,765
  • 1
  • 16
  • 23
  • Your line: `while (!executor.isTerminated())` repeats the until the executor is terminated. If I'm not wrong, `awaitTermination()` is redundant here. – dmachop Oct 27 '15 at 21:37
  • @dmachop I just wanted to give you an idea of what i was talking about so i copied your code, didn't really work on it. Use whatever stopping condition you want.. maybe a counter of the tasks submitted. Also let us know if it worked for you – Xipo Oct 27 '15 at 21:47
  • So the only way is to start a timer after submitting the tasks and manually checking the condition in the while loop whether the timer has elapsed? – dmachop Oct 27 '15 at 21:50
  • I edited my answer to use a for loop AND NOT check for the termination in the loop. `shutdownAndAwaitTermination` will handle the termination of the threads – Xipo Oct 27 '15 at 22:16
  • Ah, now it makes sense. Thanks a lot. – dmachop Oct 28 '15 at 01:23
0

Ok then, you need to monitor completion. So, why are yon not using as per documentation? https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html So, it submits n tasks to a new instance of ExecutorCompletionService and waits n to complete. No termination again, you could just reuse the same executor (usually thread pool, creating a new thread is more expensive rather than reusing from a pool). So, if I adapt code from the documentation to your scenario it would be something like:

 CompletionService<Result> ecs
         = new ExecutorCompletionService<String>(executor);
 for (Callable<Result> task : tasks)
     ecs.submit(task);
 logger.info("Tasks submitted. Now checking the status.");
 int n = tasks.size();
 for (int i = 0; i < n; ++i) {
     try {
       String r = ecs.take().get();
       logger.info("Backup completed for " + r);
     }
     catch(InterruptedException | ExecutionException e) {
         ...
     }
 }

Also, it is bad idea to parse exception message, better if you create your custom exception class and use instanceof.

If you need to have a timeout for the completion - use poll with time parameters instead of take.

kan
  • 28,279
  • 7
  • 71
  • 101
  • Thanks. I got the notion to eliminate the while loop, use a for loop and then enforce timeout later as Xipo did. – dmachop Oct 28 '15 at 01:25