8

Consider the following code

public class TestCompletableFuture {

    BiConsumer<Integer, Throwable> biConsumer = (x,y) -> {
        System.out.println(x);
        System.out.println(y);
    };

    public static void main(String args[]) {
        TestCompletableFuture testF = new TestCompletableFuture();
        testF.start();      
    }

    public void start() {
        Supplier<Integer> numberSupplier = new Supplier<Integer>() {
            @Override
            public Integer get() {
                return SupplyNumbers.sendNumbers();                     
            }
        };
        CompletableFuture<Integer> testFuture = CompletableFuture.supplyAsync(numberSupplier).whenComplete(biConsumer);         
    }       
}

class SupplyNumbers {
    public static Integer sendNumbers(){
        return 25; // just for working sake its not  correct.
    }
}

The above thing works fine. However sendNumbers could also throw a checked exception in my case, like:

class SupplyNumbers {
    public static Integer sendNumbers() throws Exception {
        return 25; // just for working sake its not  correct.
    }
}

Now I want to handle this exception as y in my biConsumer. This will help me in handling the result as well as exception (if any) inside a single function (biConsumer).

Any ideas? Can I use CompletableFuture.exceptionally(fn) here or anything else?

Hearen
  • 7,420
  • 4
  • 53
  • 63
ayush
  • 14,350
  • 11
  • 53
  • 100
  • Here is a solution that allows you to use checked exceptions without reducing the readability of your code: https://stackoverflow.com/a/49705336/14731 – Gili Apr 07 '18 at 08:27

5 Answers5

16

The factory methods using the standard functional interfaces aren’t helpful when you want to handle checked exceptions. When you insert code catching the exception into the lambda expression, you have the problem that the catch clause needs the CompletableFuture instance to set the exception while the factory method needs the Supplier, chicken-and-egg.

You could use an instance field of a class to allow mutation after creation, but in the end, the resulting code isn’t clean and more complicated that a straight-forward Executor-based solution. The documentation of CompletableFuture says:

So you know the following code will show the standard behavior of CompletableFuture.supplyAsync(Supplier) while handling checked exceptions straight-forward:

CompletableFuture<Integer> f=new CompletableFuture<>();
ForkJoinPool.commonPool().submit(()-> {
  try { f.complete(SupplyNumbers.sendNumbers()); }
  catch(Exception ex) { f.completeExceptionally(ex); }
});

The documentation also says:

… To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.

If you want to adhere to this convention to make the solution even more behaving like the original supplyAsync method, change the code to:

CompletableFuture<Integer> f=new CompletableFuture<>();
ForkJoinPool.commonPool().submit(
  (Runnable&CompletableFuture.AsynchronousCompletionTask)()-> {
    try { f.complete(SupplyNumbers.sendNumbers()); }
    catch(Exception ex) { f.completeExceptionally(ex); }
});
Hearen
  • 7,420
  • 4
  • 53
  • 63
Holger
  • 285,553
  • 42
  • 434
  • 765
  • could you explain what does this means? (Runnable&CompletableFuture.AsynchronousCompletionTask) – Jasonw Nov 04 '15 at 13:14
  • 5
    @Jasonw: It’s a cast to an *intersection type*. In other words, the object must implement both types, `Runnable` *and* `CompletableFuture.AsynchronousCompletionTask`. Since type casts provide a context type for lambda expressions, this implies that the generated lambda instance will implement both interfaces. See also [here](http://stackoverflow.com/a/22808112/2711488) – Holger Nov 04 '15 at 13:19
7

You are already catching the exception in y. Maybe you did not see it because main exited before your CompletableFuture had a chance to complete?

The code below prints "null" and "Hello" as expected:

public static void main(String args[]) throws InterruptedException {
  TestCompletableFuture testF = new TestCompletableFuture();
  testF.start();
  Thread.sleep(1000); //wait for the CompletableFuture to complete
}

public static class TestCompletableFuture {
  BiConsumer<Integer, Throwable> biConsumer = (x, y) -> {
    System.out.println(x);
    System.out.println(y);
  };
  public void start() {
    CompletableFuture.supplyAsync(SupplyNumbers::sendNumbers)
            .whenComplete(biConsumer);
  }
}

static class SupplyNumbers {
  public static Integer sendNumbers() {
    throw new RuntimeException("Hello");
  }
}
assylias
  • 321,522
  • 82
  • 660
  • 783
3

I am not quite sure what you are trying to achieve. If your supplier throws an exception, when you call testFuture .get() you will get java.util.concurrent.ExecutionException caused by any exception that was thrown by the supplier, that you can retrieve by calling getCause() on ExecutionException.

Or, just as you mentioned, you can use exceptionally in the CompletableFuture. This code:

public class TestCompletableFuture {

    private static BiConsumer<Integer, Throwable> biConsumer = (x,y) -> {
        System.out.println(x);
        System.out.println(y);
    };

    public static void main(String args[]) throws Exception {
        Supplier<Integer> numberSupplier = () -> {
            throw new RuntimeException(); // or return integer
        };

        CompletableFuture<Integer> testFuture = CompletableFuture.supplyAsync(numberSupplier)
                .whenComplete(biConsumer)
                .exceptionally(exception -> 7);

        System.out.println("result = " + testFuture.get());
    }

}

Prints this result:

null
java.util.concurrent.CompletionException: java.lang.RuntimeException
result = 7

EDIT:

If you have checked exceptions, you can simply add a try-catch.

Original code:

Supplier<Integer> numberSupplier = new Supplier<Integer>() {
    @Override
    public Integer get() {
        return SupplyNumbers.sendNumbers();                     
    }
};

Modified code:

Supplier<Integer> numberSupplier = new Supplier<Integer>() {
    @Override
    public Integer get() {
        try {
            return SupplyNumbers.sendNumbers();                     
        } catch (Excetpion e) {
            throw new RuntimeExcetpion(e);
        }
    }
};
Jaroslaw Pawlak
  • 5,538
  • 7
  • 30
  • 57
  • 2
    OP was hoping to be able to handle the exception in the code that invokes the supplier, rather than within the supplier itself. If you are always going to handle the exception in a consistent way, it's clunky to be forced the try/catch in every supplier you write. – skelly Jun 07 '19 at 18:39
2

Perhaps you could use new Object to wrap your integer and error like this:

public class Result {

    private Integer   integer;
    private Exception exception;

    // getter setter

}

And then:

public void start(){
    Supplier<Result> numberSupplier = new Supplier<Result>() {
        @Override
        public Result get() {
            Result r = new Result();
            try {
                r.setInteger(SupplyNumbers.sendNumbers());
            } catch (Exception e){
                r.setException(e);
            }
            return r;

        }
    };
    CompletableFuture<Result> testFuture = CompletableFuture.supplyAsync(numberSupplier).whenComplete(biConsumer);
}
alex
  • 8,904
  • 6
  • 49
  • 75
2

Just wrap the checked exception into a CompletionException

Another point to take into account with exception handling in CompletableFuture when using completeExceptionally() is that the exact exception will be available in handle() and whenComplete() but it will be wrapped in CompletionException when calling join() or when it is forwarded to any downstream stage.

A handle() or exceptionally() applied to a downstream stage will thus see a CompletionException instead of the original one, and will have to look at its cause to find the original exception.

Moreover, any RuntimeException thrown by any operation (including supplyAsync()) is also wrapped in a CompletionException, except if it is already a CompletionException.

Considering this, it is better to play it on the safe side and have your exception handlers unwrap the CompletionExceptions.

If you do that, there is no point anymore to set the exact (checked) exception on the CompletableFuture and it is much simpler to wrap checked exceptions in CompletionException directly:

Supplier<Integer> numberSupplier = () -> {
    try {
        return SupplyNumbers.sendNumbers();
    } catch (Exception e) {
        throw new CompletionException(e);
    }
};

To compare this approach with Holger's approach, I adapted your code with the 2 solutions (simpleWrap() is the above, customWrap() is Holger's code):

public class TestCompletableFuture {

    public static void main(String args[]) {
        TestCompletableFuture testF = new TestCompletableFuture();
        System.out.println("Simple wrap");
        testF.handle(testF.simpleWrap());
        System.out.println("Custom wrap");
        testF.handle(testF.customWrap());
    }

    private void handle(CompletableFuture<Integer> future) {
        future.whenComplete((x1, y) -> {
            System.out.println("Before thenApply(): " + y);
        });
        future.thenApply(x -> x).whenComplete((x1, y) -> {
            System.out.println("After thenApply(): " + y);
        });
        try {
            future.join();
        } catch (Exception e) {
            System.out.println("Join threw " + e);
        }
        try {
            future.get();
        } catch (Exception e) {
            System.out.println("Get threw " + e);
        }
    }

    public CompletableFuture<Integer> simpleWrap() {
        Supplier<Integer> numberSupplier = () -> {
            try {
                return SupplyNumbers.sendNumbers();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        };
        return CompletableFuture.supplyAsync(numberSupplier);
    }

    public CompletableFuture<Integer> customWrap() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        ForkJoinPool.commonPool().submit(
                (Runnable & CompletableFuture.AsynchronousCompletionTask) () -> {
                    try {
                        f.complete(SupplyNumbers.sendNumbers());
                    } catch (Exception ex) {
                        f.completeExceptionally(ex);
                    }
                });
        return f;
    }
}

class SupplyNumbers {
    public static Integer sendNumbers() throws Exception {
        throw new Exception("test"); // just for working sake its not  correct.
    }
}

Output:

Simple wrap
After thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test
Before thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test
Join threw java.util.concurrent.CompletionException: java.lang.Exception: test
Get threw java.util.concurrent.ExecutionException: java.lang.Exception: test
Custom wrap
After thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test
Before thenApply(): java.lang.Exception: test
Join threw java.util.concurrent.CompletionException: java.lang.Exception: test
Get threw java.util.concurrent.ExecutionException: java.lang.Exception: test

As you'll notice, the only difference is that the whenComplete() sees the original exception before thenApply() in the customWrap() case. After thenApply(), and in all other cases, the original exception is wrapped.

The most surprising thing is that get() will unwrap the CompletionException in the "Simple wrap" case, and replace it with an ExecutionException.

Didier L
  • 18,905
  • 10
  • 61
  • 103
  • Throwing a `CompletionException` wrap is the key here, and makes everything run closer to the "`Future>` spirit"... – Matthieu Jan 20 '21 at 14:25