0

I am using Java 8, and I want to know the recommended way to enforce timeout on 3 async jobs that I would to execute async and retrieve the result from the future. Note that the timeout is the same for all 3 jobs. I also want to cancel the job if it goes beyond time limit.

I am thinking something like this:

// Submit jobs async
List<CompletableFuture<String>> futures = submitJobs(); // Uses CompletableFuture.supplyAsync

List<CompletableFuture<Void>> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

try {
    allFutures.get(100L, TimeUnit.MILLISECONDS);
} catch (TimeoutException e){
   for(CompletableFuture f : future) {
      if(!f.isDone()) {
         /*
         From Java Doc:
         @param mayInterruptIfRunning this value has no effect in this
             * implementation because interrupts are not used to control
             * processing.
         */
         f.cancel(true);
      }
   }
}

List<String> output = new ArrayList<>();
for(CompeletableFuture fu : futures) {
   if(!fu.isCancelled()) { // Is this needed?
      output.add(fu.join());
   }
}

return output;

  1. Will something like this work? Is there a better way?
  2. How to cancel the future properly? Java doc says, thread cannot be interrupted? So, if I were to cancel a future, and call join(), will I get the result immediately since the thread will not be interrupted?
  3. Is it recommended to use join() or get() to get the result after waiting is over?
no_clue_so
  • 71
  • 1
  • 6

2 Answers2

1

It is worth noting that calling cancel on CompletableFuture is effectively the same as calling completeExceptionally on the current stage. The cancellation will not impact prior stages. With that said:

  1. In principle, something like this will work assuming upstream cancellation is not necessary (from a pseudocode perspective, the above has syntax errors).
  2. CompletableFuture cancellation will not interrupt the current thread. Cancellation will cause all downstream stages to be triggered immediately with a CancellationException (will short circuit the execution flow).
  3. 'join' and 'get' are effectively the same in the case where the caller is willing to wait indefinitely. Join handles wrapping the checked Exceptions for you. If the caller wants to timeout, get will be needed.

Including a segment to illustrate the behavior on cancellation. Note how downstream processes will not be started, but upstream processes continue even after cancellation.

    public static void main(String[] args) throws Exception
    {
        int maxSleepTime = 1000;
        Random random = new Random();
        AtomicInteger value = new AtomicInteger();
        List<String> calculatedValues = new ArrayList<>();
        Supplier<String> process = () -> { try { Thread.sleep(random.nextInt(maxSleepTime)); System.out.println("Stage 1 Running!"); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.toString(value.getAndIncrement()); };
        List<CompletableFuture<String>> stage1 = IntStream.range(0, 10).mapToObj(val -> CompletableFuture.supplyAsync(process)).collect(Collectors.toList());
        List<CompletableFuture<String>> stage2 = stage1.stream().map(Test::appendNumber).collect(Collectors.toList());
        List<CompletableFuture<String>> stage3 = stage2.stream().map(Test::printIfCancelled).collect(Collectors.toList());
        CompletableFuture<Void> awaitAll = CompletableFuture.allOf(stage2.toArray(new CompletableFuture[0]));
        try
        {
            /*Wait 1/2 the time, some should be complete. Some not complete -> TimeoutException*/
            awaitAll.get(maxSleepTime / 2, TimeUnit.MILLISECONDS);
        }
        catch(TimeoutException ex)
        {
            for(CompletableFuture<String> toCancel : stage2)
            {
                boolean irrelevantValue = false;
                if(!toCancel.isDone())
                    toCancel.cancel(irrelevantValue);
                else
                    calculatedValues.add(toCancel.join());
            }
        }
        System.out.println("All futures Cancelled! But some Stage 1's may still continue printing anyways.");
        System.out.println("Values returned as of cancellation: " + calculatedValues);
        Thread.sleep(maxSleepTime);
    }

    private static CompletableFuture<String> appendNumber(CompletableFuture<String> baseFuture) 
    {
        return baseFuture.thenApply(val -> {  System.out.println("Stage 2 Running"); return "#" + val; }); 
    }
    
    private static CompletableFuture<String> printIfCancelled(CompletableFuture<String> baseFuture) 
    { 
        return baseFuture.thenApply(val ->  { System.out.println("Stage 3 Running!"); return val; }).exceptionally(ex -> { System.out.println("Stage 3 Cancelled!"); return ex.getMessage(); }); 
    }

If it is necessary to cancel the upstream process (ex: cancel some network call), custom handling will be needed.

mike1234569
  • 636
  • 2
  • 5
  • Thanks for your response. Yes, the code is meant to be pseudocode. In the code that I provided, say, one of the jobs take 500 ms. Clearly, I call get with 100 ms timeout. In this case, a TimeoutException will be thrown, and you can see that I am going through the futures and cancelling ones that are not done yet. Since cancelling doesn't actually interrupt the thread (i.e. the job keeps running), I am therefore required to join on futures that have not been cancelled, right? I added a comment in the code ("Is this needed?"). Am I understanding this correctly? – no_clue_so Sep 19 '20 at 18:04
  • Attempting to join on futures that have been cancelled will cause an Exception to be thrown. Assuming you want the results that completed within the timeout, a check like this will be needed, though it shouldn't be necessary to iterate through the list 2 times. On the first iteration the completed futures can be added to a list. Added a sample above to illustrate this. – mike1234569 Sep 19 '20 at 18:36
0

After calling cancel you cannot join the furure, since you get an exception.

One way to terminate the computation is to let it have a reference to the future and check it periodically: if it was cancelled abort the computation from inside. This can be done if the computaion is a loop where at each iteration you can do the check.

Do you need it to be a CompletableFuture? Cause another way is to avoid to use a CompleatableFuture, and use a simple Future or a FutureTask instead: if you execute it with an Executor calling future.cancel(true) will terminate the computation if possbile.

Answerring to the question: "call join(), will I get the result immediately".

No you will not get it immediately, it will hang and wait to complete the computation: there is no way to force a computation that takes a long time to complete in a shorter time.

You can call future.complete(value) providing a value to be used as default result by other threads that have a reference to that future.

user1708042
  • 1,740
  • 17
  • 20
  • Regarding, "call join(), will I get the result immediately": I'm already calling get for 100 ms and cancelling all futures that haven't completed. Before calling join, I also check to make sure the future hasn't been cancelled, so I think the join will return immediately without any lag. – no_clue_so Sep 19 '20 at 19:28
  • I repeat: if you cancel and next call join you get and excteption, and you cannot get an asnwer faster: if the computtion takes 10 seconds to complete waiting 100 ms and next joining will still require 10 seconds to get the result. – user1708042 Sep 19 '20 at 19:43
  • I don't think you are understanding what I am saying. Let me give you an example: Job 1 -> 10ms Job 2 -> 200 ms Timeout set to 100 ms. My first get call will yield in successful completion of job 1 and cause timeout exception because of job 2. Then, I'm marking job 2 as cancelled. Next, I go through the jobs and call join (I don't call join on job 2 since it is cancelled). Therefore, I can expect the join to return immediately. [I don't care about jobs that are timed out]. – no_clue_so Sep 19 '20 at 19:52
  • in that case yes you get the result immediately from job 1 because your indirect logic is "not getting timeout means it is completed", but that is what is in the code, not what the sentence "So, if I were to cancel a future, and call join()" means. Doing so you use an exeption to implement a logic: in general a bad idea, and one that will possibly slow the execution. But yes this will temiante the uncomplete work. Are you using the fact this is also a CompletionStage? Cause this exception will propagate to other stages. If you don't use it I feel you don't need a completable future, – user1708042 Sep 19 '20 at 20:22
  • see also the answer to this: https://stackoverflow.com/questions/29013831/how-to-interrupt-underlying-execution-of-completablefuture – user1708042 Sep 19 '20 at 20:28