Given a Stream and a method that returns a Stream for each argument as a data source, I'm looking for a way to merge these streams via flatMap(..) while catching certain exceptions thrown during execution.

Let's take the following code snippet:

import java.util.stream.Stream;

public class FlatMap {

    public static void main(final String[] args) {
        long count;

        // this might throw an exception
        count = Stream.of(0.2, 0.5, 0.99).flatMap(chance -> getGenerator(chance, 20)).count();

        // trying to catch the exception in flatMap() will not work: the returned
        // Stream is evaluated lazily, after the lambda has already returned
        count = Stream.of(0.2, 0.5, 0.99).flatMap(chance -> {
            try {
                return getGenerator(chance, 20);
            } catch (final NullPointerException e) {
                return Stream.empty();
            }
        }).count();

        System.out.println(count);
    }

    // !! we cannot change this method, we simply get a Stream
    static Stream<Object> getGenerator(final double chance, final long limit) {
        return Stream.generate(() -> {
            if (Math.random() < chance) return new Object();
            throw new NullPointerException();
        }).limit(limit);
    }
}

Is there any way to catch the exception of each individual Stream created by getGenerator(..) and simply suppress it, either replacing the "corrupted" Stream with an empty one or skipping the failing elements of that specific generator Stream?
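To make the problem concrete, here is a minimal sketch that is not part of the original application. The class `LazyFailureDemo` and the helper `failingStream()` are hypothetical stand-ins for `getGenerator(..)`, with a deterministic failure instead of a random one. It illustrates that the try/catch inside the `flatMap(..)` lambda only guards the creation of the sub-stream, while the exception surfaces later, during the terminal operation.

```java
import java.util.stream.Stream;

public class LazyFailureDemo {

    // Hypothetical stand-in for getGenerator(..): a lazy stream whose third element fails.
    static Stream<Integer> failingStream() {
        return Stream.of(1, 2, 3).map(i -> {
            if (i == 3) throw new IllegalStateException("element " + i);
            return i;
        });
    }

    // Reports where the exception surfaced.
    static String whereDoesItFail() {
        // The inner try/catch only covers building the sub-stream, not consuming it.
        final Stream<Integer> merged = Stream.of("a").flatMap(ignored -> {
            try {
                return failingStream();
            } catch (final IllegalStateException e) {
                return Stream.<Integer>empty();
            }
        });
        try {
            // Elements are only pulled here, long after the lambda has returned.
            merged.forEach(x -> { });
            return "nowhere";
        } catch (final IllegalStateException e) {
            return "terminal operation";
        }
    }

    public static void main(final String[] args) {
        System.out.println(whereDoesItFail());
    }
}
```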

benez
  • Possible duplicate of [Java 8: Lambda-Streams, Filter by Method with Exception](https://stackoverflow.com/questions/19757300/java-8-lambda-streams-filter-by-method-with-exception) – Hadi J Jan 08 '19 at 17:06
  • @HadiJ your linked question deals with an `Exception` thrown within lambdas passed to the `Stream` API. However, the lambda in `flatMap(..)` does not itself throw an `Exception`, but rather returns a `Stream` that might throw one. – benez Jan 08 '19 at 17:25
  • I personally find this a disturbing way to throw a `NullPointerException`. I think the exception is inappropriate for this situation. I suggest you pick a more descriptive one. Is there a reason that you cannot change the `getGenerator` method? – MC Emperor Jan 08 '19 at 18:29
  • @MCEmperor You are right, the code snippet is a very simplified version. I didn't want to distract with many application details. I thought this was clear from the use of plain `Object` and some random chances. If that is important, I can narrow down the true application code. – benez Jan 08 '19 at 18:39

3 Answers

It is possible to wrap the Stream in another one via its Spliterator. The following method protects a given Stream by catching the Exception and remembering that the Stream is corrupted, so that it ends at that point:

    static <T> Stream<T> protect(final Stream<T> stream) {
        final Spliterator<T> spliterator = stream.spliterator();
        return StreamSupport.stream(
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE,
                           spliterator.characteristics() & ~Spliterator.SIZED) {

                    private boolean corrupted = false;

                    @Override
                    public boolean tryAdvance(final Consumer<? super T> action) {
                        if (!corrupted) try {
                            return spliterator.tryAdvance(action);
                        } catch (final Exception e) {
                            // we suppress this one, stream ends here
                            corrupted = true;
                        }
                        return false;
                    }
                }, false);
    }

Then we can wrap the Stream returned by our method and pass it safely to flatMap(..):

// we protect the stream by a wrapper Stream
count = Stream.of(0.2, 0.5, 0.99)
              .flatMap(chance -> protect(getGenerator(chance, 20)))
              .count();
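For a reproducible check, here is a self-contained sketch of the same wrapper with a deterministic source. The class `ProtectDemo` and the helper `failingAt(..)` are hypothetical and only stand in for `getGenerator(..)`. Two details are assumptions of this sketch rather than part of the method above: it clears `SUBSIZED` along with `SIZED`, and it forwards `close()` to the wrapped Stream via `onClose(..)`.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ProtectDemo {

    // Same idea as protect(..); clearing SUBSIZED and forwarding close() are assumptions.
    static <T> Stream<T> protect(final Stream<T> stream) {
        final Spliterator<T> source = stream.spliterator();
        final int characteristics = source.characteristics()
                & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
        return StreamSupport.stream(
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, characteristics) {
                    private boolean corrupted = false;

                    @Override
                    public boolean tryAdvance(final Consumer<? super T> action) {
                        if (corrupted) return false;
                        try {
                            return source.tryAdvance(action);
                        } catch (final RuntimeException e) {
                            // suppressed: the wrapped stream ends at the first failure
                            corrupted = true;
                            return false;
                        }
                    }
                }, false).onClose(stream::close);
    }

    // Hypothetical stand-in for getGenerator(..): yields 1..5, throws when reaching failAt.
    static Stream<Integer> failingAt(final int failAt) {
        return Stream.iterate(1, i -> i + 1).limit(5).map(i -> {
            if (i == failAt) throw new IllegalStateException("corrupted at " + i);
            return i;
        });
    }

    static List<Integer> merged() {
        return Stream.of(3, 1, 6)
                .flatMap(failAt -> protect(failingAt(failAt)))
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        System.out.println(merged()); // [1, 2, 1, 2, 3, 4, 5]
    }
}
```

The elements produced before a failure are kept, and only the remainder of that one sub-stream is dropped. One caveat to be aware of: because the downstream `action` runs inside the guarded `tryAdvance(..)` call, exceptions thrown by later pipeline stages may be swallowed as well.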
benez
  • I guess you already know that this approach might not work as expected if the stream is parallel, don't you? You're not only "protecting" the sub streams, but also forcing them to be sequential... – fps Jan 08 '19 at 18:26
  • @FedericoPeraltaSchaffner You are going very deep into details. Basically, with the current implementation, the `Stream` is allowed to fail multiple times in different concurrent `Threads`. So "as expected" is not really defined in this scope. But you are right, if we want to make sure the `Stream` fails only on the first corrupted element, we need some synchronisation or must convert the `Stream` into a sequential one. – benez Jan 08 '19 at 18:32
  • I have upvoted your answer, it's very good. I had thought something similar, but was too lazy to post it (besides, spliterators always end up being particularly difficult to me). I think you could use an `AtomicBoolean` and atomically check it from within children spliterators, but I'm not sure... – fps Jan 08 '19 at 18:35
  • @FedericoPeraltaSchaffner The primitive boolean is enough in this case. There is no race condition. And since we don't know when the `Exception` happens, we cannot call `tryAdvance(..)` on the original `Spliterator` in parallel from multiple `Threads`. Since there is not much room left for code, we basically invoke `tryAdvance(..)` sequentially, or we allow that two `Threads` may fail "at the same time". – benez Jan 08 '19 at 18:47
  • OK, benez, understood. Thanks for taking the time to explain. – fps Jan 08 '19 at 18:49
  • For a general-purpose solution, you should clear the `SIZED` characteristic from the source spliterator as you may provide less than reported elements when `corrupted` becomes `true`. It doesn’t matter in your specific case as for OpenJDK, `Stream.generate(…).limit(…)` doesn’t have the `SIZED` characteristic. – Holger Jan 09 '19 at 10:24
  • @Holger i've removed the size characteristics, thanks for the tip! – benez Jan 14 '19 at 10:10

One workaround is to force the Stream created by getGenerator to be evaluated inside the flatMap lambda. The NullPointerException is then thrown within the try-catch block, where it can be handled.

To do this, you can collect the Stream (to a List for example):

getGenerator(chance, 20).collect(Collectors.toList()).stream()

Incorporating this into your original snippet:

import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMap {

    public static void main(final String[] args) {
        long count;

        // collecting inside the try block forces evaluation here, so the exception can be caught
        count = Stream.of(0.2, 0.5, 0.99)
            .flatMap(chance -> {
                try {
                    return getGenerator(chance, 20).collect(Collectors.toList()).stream();
                } 
                catch (final NullPointerException e) {
                    return Stream.empty();
                }
            })
            .count();

        System.out.println(count);
    }

    // !! we cannot change this method, we simply get a Stream
    static Stream<Object> getGenerator(final double chance, final long limit) {
        return Stream.generate(() -> {
            if (Math.random() < chance) return new Object();
            throw new NullPointerException();
        }).limit(limit);
    }
}

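Warning: this approach evaluates every getGenerator Stream eagerly and buffers its elements in a List, which may hurt performance and memory use when lazy evaluation would be preferable. Note also that a single failing element discards the entire sub-stream, since the catch block returns Stream.empty().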
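The cost of eager evaluation can be sketched with a counter. The class `EagerCostDemo` and the helper `counted(..)` are hypothetical and not part of the question's code; the sketch only counts how many elements a source produces when a short-circuiting terminal operation such as `findFirst()` is applied directly versus after collecting into a List.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class EagerCostDemo {

    // Hypothetical source: counts how many elements are actually produced.
    static Stream<Integer> counted(final AtomicInteger produced) {
        return IntStream.range(0, 1000).boxed().peek(i -> produced.incrementAndGet());
    }

    // Lazy: findFirst() pulls a single element from the source.
    static int producedLazily() {
        final AtomicInteger produced = new AtomicInteger();
        counted(produced).findFirst();
        return produced.get();
    }

    // Eager: collect(..) drains the whole source before findFirst() runs.
    static int producedEagerly() {
        final AtomicInteger produced = new AtomicInteger();
        counted(produced).collect(Collectors.toList()).stream().findFirst();
        return produced.get();
    }

    public static void main(final String[] args) {
        System.out.println("lazy:  " + producedLazily());  // 1
        System.out.println("eager: " + producedEagerly()); // 1000
    }
}
```

Whether this overhead matters depends on how expensive the elements are to produce and on whether the final pipeline short-circuits at all.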

Justin Albano
  • This would be an easy solution in case that performance is not an issue. Keep in mind that you call a terminal operation forcing any intermediate operations to be executed on the `Stream`, even if the terminal operation on the final `Stream` would only operate on a subset using `takeWhile(..)` or `anyMatch(..)`. – benez Jan 08 '19 at 18:11

Try this:

static <T> Supplier<T> getOrNull(Supplier<T> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (Throwable e) {
            return null;
        }
    };
}

static Stream<Object> getGenerator(final double chance, final long limit) {
    return Stream.generate(
                      getOrNull(
                          () -> {
                              if (Math.random() < chance) return new Object();
                              throw new NullPointerException(); 
                              // You can throw any exception here
                          })) 
                .limit(limit)
                .filter(Objects::nonNull); // drop the nulls that stand in for failed elements
}

Then simply call getGenerator:

count = Stream.of(0.2, 0.5, 0.99)
              .flatMap(chance -> getGenerator(chance, 20))
              .count();
ETO
  • That would do the trick if you had access to the inner methods, filters etc. of the `Stream`. However, you cannot touch the `getGenerator(..)` method. We assume it creates a `Stream` for us, ready to be used... I've added a comment to the code snippet. – benez Jan 08 '19 at 17:37