Here's one weird trick you can use to improve your exception handling.
Let's say your mapper function is like this:
    String doMap(Object obj) {
        if (isInvalid(obj)) {
            throw new IllegalArgumentException("info about obj");
        } else {
            return obj.toString();
        }
    }
This returns a result if the object is valid, but it throws an exception if the object is invalid. Unfortunately, if you stick this directly into a pipeline, any error will stop the pipeline execution. What you want is something like an "either" type that can hold either a value or an error indicator (which would be an exception in Java).
Turns out that CompletableFuture can hold either a value or an exception. Although it's intended for asynchronous processing -- which isn't occurring here -- we only have to contort it a little bit to use for our purposes.
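To make the "value or exception" idea concrete, here is a minimal sketch. The class name EitherDemo and the helper describe are made up for illustration; only the CompletableFuture calls come from the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class EitherDemo {
    // Hypothetical helper: reports what an already completed future holds,
    // either its value or the message of the exception it failed with.
    static String describe(CompletableFuture<String> cf) {
        try {
            return "value: " + cf.join();
        } catch (CompletionException ce) {
            // join() wraps the original exception; getCause() recovers it
            return "error: " + ce.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("hello");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("bad input"));

        System.out.println(describe(ok));      // value: hello
        System.out.println(describe(failed));  // error: bad input
    }
}
```

Note that this sketch ignores cancellation, which join() reports with a CancellationException instead.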
First, given a stream of objects to process, we call the mapping function wrapped in a call to supplyAsync:
    CompletableFuture<String>[] cfArray =
        stream.map(obj -> CompletableFuture.supplyAsync(() -> doMap(obj), Runnable::run))
              .toArray(n -> (CompletableFuture<String>[])new CompletableFuture<?>[n]);
(Unfortunately, the generic array creation gives an unchecked warning, which will have to be suppressed.)
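One way to keep the suppression contained is to put the array creation in a small helper and annotate only that method. This is a sketch, not the only option; FutureArrays and mapToFutures are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Stream;

class FutureArrays {
    // Hypothetical helper: applies mapper to each element on the calling thread
    // and returns one future per element, holding either a result or an exception.
    @SuppressWarnings("unchecked") // the array only ever holds CompletableFuture<R> instances
    static <T, R> CompletableFuture<R>[] mapToFutures(Stream<T> stream, Function<T, R> mapper) {
        return stream
            .map(obj -> CompletableFuture.<R>supplyAsync(() -> mapper.apply(obj), Runnable::run))
            .toArray(n -> (CompletableFuture<R>[]) new CompletableFuture<?>[n]);
    }
}
```

With this in place, the call site would look something like FutureArrays.mapToFutures(stream, this::doMap), and the warning stays confined to the helper.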
The odd construct

    CompletableFuture.supplyAsync(supplier, Runnable::run)

runs the supplier "asynchronously" on the provided Executor Runnable::run, which simply runs the task immediately in this thread. In other words, it runs the supplier synchronously.
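If you want to convince yourself that nothing asynchronous is going on, a quick check like the following should do. SameThreadDemo and supplierThreadName are names invented for this sketch.

```java
import java.util.concurrent.CompletableFuture;

class SameThreadDemo {
    // Returns the name of the thread that actually executed the supplier.
    static String supplierThreadName() {
        CompletableFuture<String> cf =
            CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), Runnable::run);
        return cf.join(); // already complete, so this does not block
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        // The supplier ran on the calling thread, so the names match.
        System.out.println(caller.equals(supplierThreadName())); // true
    }
}
```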
The trick is that the CompletableFuture instance returned from this call contains either the value from the supplier, if it returned normally, or it contains an exception, if the supplier threw one. (I'm disregarding cancellation here.) We then gather the CompletableFuture instances into an array. Why an array? It's setup for the next part:

    CompletableFuture.allOf(cfArray).join();
This normally waits for the array of CFs to complete. Since they've been run synchronously, they should already all be complete. What's important for this case is that join() will throw a CompletionException if any of the CFs in the array has completed exceptionally. If the join completes normally, we can simply gather up the return values. If the join throws an exception, we can either propagate it, or we can catch it and process the exceptions stored in the CFs in the array. For example,
    try {
        CompletableFuture.allOf(cfArray).join();
        // no errors
        return Arrays.stream(cfArray)
                     .map(CompletableFuture::join)
                     .collect(toList());
    } catch (CompletionException ce) {
        long errcount =
            Arrays.stream(cfArray)
                  .filter(CompletableFuture::isCompletedExceptionally)
                  .count();
        System.out.println("errcount = " + errcount);
        return Collections.emptyList();
    }
If all are successful, this returns a list of the values. If there are any exceptions, this counts the number of exceptions and returns an empty list. Of course, you could easily do something else, like log the details of the exceptions, filter out the exceptions and return a list of valid values, etc.
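As a rough sketch of the "something else" options, the following separates the valid values from the underlying exceptions so that both can be inspected or logged. Partition and partition are hypothetical names, and the code assumes every future has already completed and none was cancelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class Partition {
    // Hypothetical helper: adds the values of normally completed futures to 'values'
    // and the underlying exceptions of failed futures to 'errors'.
    static <T> void partition(List<CompletableFuture<T>> futures,
                              List<T> values, List<Throwable> errors) {
        for (CompletableFuture<T> cf : futures) {
            try {
                values.add(cf.join());
            } catch (CompletionException ce) {
                errors.add(ce.getCause()); // the exception the mapper threw
            }
        }
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.<String>supplyAsync(() -> "a", Runnable::run));
        futures.add(CompletableFuture.<String>supplyAsync(
            () -> { throw new IllegalArgumentException("bad"); }, Runnable::run));

        List<String> values = new ArrayList<>();
        List<Throwable> errors = new ArrayList<>();
        partition(futures, values, errors);

        System.out.println("values = " + values);
        errors.forEach(e -> System.out.println("error: " + e.getMessage()));
    }
}
```

Since the futures were run synchronously, the join() inside the loop never blocks; it just unpacks the stored result or rethrows the stored failure.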