I'm having a hard time understanding how to handle exceptions when using a Java 8 stream. I would like to add objects to an empty list, and each object gets returned from a function that can potentially throw an exception depending on what the input is. If an exception is thrown for only certain values on the input list, I want it to continue to loop through the remainder of the input list. It seems easy to do with a for loop:

  List<Item> itemList = new ArrayList<>();
  List<Input> inputs = getInputs(); //returns a list of inputs
  int exceptionCount = 0;

   // Add each item; if getItem throws, count the failure and continue
   for (Input input : inputs){
        try {
           itemList.add(getItem(input));
        } catch (Exception e ) {
         // handle exception from getItem(input)
          exceptionCount = exceptionCount + 1;
        }  
   }

It seems like it may be possible to achieve this with a Java stream, but I'm not quite sure how to handle the exception that could be thrown by the getItem() function. Here's what I have so far:

    List<Item> itemList = new ArrayList<>();
    int exceptionCount = 0;

    try {
        itemList = getInputs().stream()
                              .map(this::getItem)
                              .collect(Collectors.toList());
    } catch (Exception e) {
        // handle exception from getItem(input)
        exceptionCount = exceptionCount + 1;
    }

The above obviously won't work because as soon as there is one exception thrown from getItem, the loop will not continue and only one exception will be thrown for the entire stream. Is there any way I can achieve the same implementation as my basic for loop with Java 8 streams?

  • Out of interest... why do you insist on using a `Stream`? – Joe C Jul 21 '17 at 22:33
  • Yes you can by implementing BaseStream and Iterator and wrapping it all up yourself; however, it cannot be done efficiently because Java does not have coroutines. – hoodaticus Jul 21 '17 at 22:36
  • Yes. Streams break even around 10K items. So unless you are dealing with these sizes, there is no point in using them – bichito Jul 21 '17 at 22:36
  • @efekctive - I thought it was lambdas that break even at 10k. Same difference though, though of course you can use a stream without lambdas. – hoodaticus Jul 21 '17 at 22:37
  • Earlier this year, around February/March, a couple of SO members posted two really well-researched posts and reached similar conclusions. Lambdas do not have a "size" as far as I know. It is the stream that handles them that carries the load. – bichito Jul 21 '17 at 22:40
  • @efekctive do you have a link? that sounds really interesting! – MaxPower Jul 21 '17 at 22:51
  • You could extract the logic you are trying to run in the stream into a separate method, and deal with the try/catch and exceptions in that method. – MaxPower Jul 21 '17 at 22:53
  • let me browse... – bichito Jul 21 '17 at 22:57
  • https://stackoverflow.com/questions/42375003/why-lambda-intstream-anymatch-is-10-slower-than-naive-implementation#comment71904012_42375003 This is one of them. The other was posted shortly after – bichito Jul 21 '17 at 23:09
  • Please see here https://stackoverflow.com/questions/44616583/how-to-map-runtimeexceptions-in-java-streams-to-recover-from-invalid-stream-el/44620143 – holi-java Jul 21 '17 at 23:20

2 Answers

You should catch the exception within the map operation:

class ExceptionCounter {
    private int count = 0;

    void inc() { count++; }

    int getCount() { return count; }
}

ExceptionCounter counter = new ExceptionCounter();

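List<Item> items = getInputs().stream()
    .map(input -> {
        try {
            return getItem(input);
        } catch (Exception e) {
            // handle exception here
            counter.inc();
            return null; // no item for this input; filtered out below
        }
    })
    .filter(Objects::nonNull) // java.util.Objects
    .collect(Collectors.toList());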

While this works as expected for sequential streams, it won't work reliably for parallel streams, because count++ is not atomic and concurrent increments can be lost. Even if the stream is not parallel, we still need the ExceptionCounter holder class, because local variables captured by a lambda (such as the argument to map) must be effectively final. (You can use an array of one element or an AtomicInteger instead of a holder class.)
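As a rough sketch of the AtomicInteger alternative mentioned above: the class name `AtomicCounterDemo` and the methods `parse` and `parseAll` are hypothetical, and `Integer.parseInt` merely stands in for `getItem`. Inputs that throw are mapped to null and filtered out, so the resulting list matches what the for loop would build.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class AtomicCounterDemo {

    // Hypothetical stand-in for getItem(input): throws NumberFormatException on bad input.
    static Integer parse(String input) {
        return Integer.parseInt(input);
    }

    // Maps every input, skipping the ones that throw and counting them in 'failures'.
    static List<Integer> parseAll(List<String> inputs, AtomicInteger failures) {
        return inputs.stream()
            .map(input -> {
                try {
                    return parse(input);
                } catch (RuntimeException e) {
                    failures.incrementAndGet(); // atomic increment, no lost updates
                    return null;                // marker for "no item"; removed below
                }
            })
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        AtomicInteger failures = new AtomicInteger();
        List<Integer> items = parseAll(Arrays.asList("1", "x", "3", "y"), failures);
        System.out.println(items + " with " + failures.get() + " failure(s)");
    }
}
```

The counter is still a side effect of the map operation; AtomicInteger only makes the increment itself thread-safe.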

If we add synchronized to the inc method of ExceptionCounter class, then the solution above would support parallel streams. However, there would be a lot of thread contention on the inc method's lock, thus losing the advantages of parallelization. This (along with attempting to not create error-prone code) is the reason why side-effects are discouraged on streams. And counting the number of exceptions is in fact a side-effect.

For this particular case, you can avoid the side effect if you use a custom collector:

class Result<R> {
    List<R> values = new ArrayList<>();
    int exceptionCount = 0;
    // TODO getters
}

static <T, R> Collector<T, ?, Result<R>> mappingToListCountingExceptions(Function<T, R> mapper) {
    // Mutable accumulator: one instance per (sub)stream, merged at the end
    class Acc {
        Result<R> result = new Result<>();

        void add(T t) {
            try {
                R res = mapper.apply(t);
                result.values.add(res);
            } catch (Exception e) {
                result.exceptionCount++;
            }
        }

        Acc merge(Acc another) {
            result.values.addAll(another.result.values);
            result.exceptionCount += another.result.exceptionCount;
            return this;
        }
    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, acc -> acc.result);
}

You can use this custom collector as follows:

Result<Item> items = getInputs().stream()
    .collect(mappingToListCountingExceptions(this::getItem));

Now it's up to you to decide whether this approach is better than a traditional for loop.
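For anyone who wants to try the collector idea in isolation, here is a minimal self-contained sketch. It is a simplified variant, not the exact code above: it uses the three-argument form of `Collector.of` with `Result` itself as the accumulator, and the names `CountingCollectorDemo` and `collectParsed`, as well as the use of `Integer.parseInt` as the mapper, are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collector;

class CountingCollectorDemo {

    // Holds the successfully mapped values plus the number of failed elements.
    static class Result<R> {
        final List<R> values = new ArrayList<>();
        int exceptionCount = 0;
    }

    static <T, R> Collector<T, ?, Result<R>> mappingToListCountingExceptions(Function<T, R> mapper) {
        return Collector.of(
            Result::new,
            (result, input) -> {
                try {
                    result.values.add(mapper.apply(input));
                } catch (RuntimeException e) {
                    result.exceptionCount++; // element skipped, failure counted
                }
            },
            (left, right) -> {
                // Combiner: only invoked when the stream is processed in parallel
                left.values.addAll(right.values);
                left.exceptionCount += right.exceptionCount;
                return left;
            });
    }

    // Hypothetical helper: parses every string, on a parallel stream to exercise the combiner.
    static Result<Integer> collectParsed(List<String> inputs) {
        Collector<String, ?, Result<Integer>> collector =
            mappingToListCountingExceptions(Integer::parseInt);
        return inputs.parallelStream().collect(collector);
    }

    public static void main(String[] args) {
        Result<Integer> result = collectParsed(Arrays.asList("1", "x", "3", "y", "5"));
        System.out.println(result.values + " with " + result.exceptionCount + " failure(s)");
    }
}
```

Because each thread fills its own `Result` and the partial results are merged afterwards, no shared counter or locking is needed.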

fps

There are several ways to handle exceptions in a stream:

  1. Catch the exception inside the map operation:
files.stream()
   .parallel()
   .map(file-> {
       try {
           return file.getInputStream();
       } catch (IOException e) {
           e.printStackTrace();
           return null;
       }
   })
   .forEach(inputStream -> carService.saveCars(inputStream));

  2. Extract the function passed to map into a method of its own:
files.stream()
      .parallel()
      .map(file-> extractInputStream(file))
      .forEach(inputStream -> carService.saveCars(inputStream));

and

private InputStream extractInputStream(MultipartFile file) {
    try {
        return file.getInputStream();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}

  3. Create another functional interface, similar to Function, whose apply method declares that it throws an exception:
@FunctionalInterface
interface FunctionWithException<T, R, E extends Exception> {
  R apply(T t) throws E;
}

and

private <T, R, E extends Exception>
    Function<T, R> wrapper(FunctionWithException<T, R, E> fun) {
        return arg -> {
            try {
                return fun.apply(arg);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
}

then use it like this

files.stream()
     .parallel()
     .map(wrapper(file->file.getInputStream()))
     .forEach(inputStream -> carService.saveCars(inputStream));

  4. If you want this for all the functional interfaces without writing the wrappers yourself, I suggest using this library:
<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>throwing-function</artifactId>
    <version>1.5.0</version>
</dependency>

This is all explained with an example in the post "Java 8, How to handle exceptions in a stream?".
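Note that the `wrapper` helper above rethrows as a RuntimeException, so the stream still stops at the first failing element. If the goal is to skip failing elements and keep going, a variation along these lines might work. This is only a sketch: `attempt`, `load`, `loadAll`, and `SkippingWrapperDemo` are hypothetical names, and `load` stands in for any method that throws a checked exception.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

class SkippingWrapperDemo {

    // Same shape as the FunctionWithException interface above.
    @FunctionalInterface
    interface FunctionWithException<T, R, E extends Exception> {
        R apply(T t) throws E;
    }

    // Hypothetical variant of wrapper(...): a failure becomes Optional.empty()
    // instead of a RuntimeException, so later elements are still processed.
    static <T, R, E extends Exception> Function<T, Optional<R>> attempt(FunctionWithException<T, R, E> fun) {
        return arg -> {
            try {
                return Optional.ofNullable(fun.apply(arg));
            } catch (Exception e) {
                return Optional.empty();
            }
        };
    }

    // Hypothetical stand-in for a call that throws a checked exception.
    static String load(String name) throws IOException {
        if (name.isEmpty()) {
            throw new IOException("empty name");
        }
        return "loaded:" + name;
    }

    // Loads every name, silently dropping the ones that fail.
    static List<String> loadAll(List<String> names) {
        return names.stream()
            .map(attempt(SkippingWrapperDemo::load))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(loadAll(Arrays.asList("a", "", "b")));
    }
}
```

This drops the exceptions entirely; if they need to be counted or logged, that has to happen inside the catch block or in a collector.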

Raouf Makhlouf