2

Fixtures

BiConsumer<Exception, Consumer<? super Integer>> NOTHING = (ex, unused) ->{/**/};

When I try to fix the bug that is reported by @Holger in this answer:

Stream<Integer> stream = Stream.of(1, 2, 3);

// v--- the bug I have already fixed, it will throws RuntimeException 
exceptionally(stream, NOTHING).collect(ArrayList::new, (l, x) -> {
    l.add(x);
    if (x < 4) throw new RuntimeException();
}, List::addAll);

Everything is ok but when using Stream.of(T) the map(...) operation will be invoked infinitely, for example:

List<Integer> result = exceptionally(
        //               v--- infinitely call
        Stream.of("bad").map(Integer::parseInt),
        NOTHING
).collect(toList());

But when I replace Stream.of(T) with Stream.of(T[]), it is works fine again, for example:

//            v--- return an empty list
List<Integer> result = exceptionally(
        Stream.of(new String[]{"bad"}).map(Integer::parseInt),
        NOTHING
).collect(toList());

The java.util.stream.Streams.StreamBuilderImpl#tryAdvance should be reset the count first, for example:

public boolean tryAdvance(Consumer<? super T> action) {
    Objects.requireNonNull(action);

    if (count == -2) {
        action.accept(first);
        count = -1;// <--- it should be call before `action.accept(first)`;
        return true;
    }
    else {
        return false;
    }
}

Q: It should be a bug in jdk, since it must keep the semantics consistent between the Stream.of methods. Am I right?

<T> Stream<T> exceptionally(Stream<T> source,
        BiConsumer<Exception, Consumer<? super T>> exceptionally) {
    
    class ExceptionallySpliterator extends AbstractSpliterator<T>
            implements Consumer<T> {

        private Spliterator<T> source;
        private T value;

        public ExceptionallySpliterator(Spliterator<T> source) {
            super(source.estimateSize(), source.characteristics());
            this.source = source;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            Boolean state = attempt(action);
            if (state == null) return true;
            if (state) action.accept(value);
            return state;
        }

        private Boolean attempt(Consumer<? super T> action) {
            try {
                return source.tryAdvance(this);
            } catch (Exception ex) {
                exceptionally.accept(ex, action);
                return null;
            }
        }

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }

    return stream(
            new ExceptionallySpliterator(source.spliterator()), 
            source.isParallel()
    ).onClose(source::close);
}
Community
  • 1
  • 1
holi-java
  • 29,655
  • 7
  • 72
  • 83
  • and I found `Stream.of(T)` also can't run in jdk9. – holi-java Jul 22 '17 at 15:39
  • 1
    I just tested `Stream.of(T)` and `Stream.of(T[])` on my machine, and got the same result. Are you sure the issue isn't elsewhere? – Joe C Jul 22 '17 at 20:13
  • @JoeC hi, have you test the code above ? just test `Stream#of` can't produce the expected behavior, :) – holi-java Jul 22 '17 at 20:38
  • I've been able to observe that code is not the same when stream contains one element or several. When tested with bad values, in the first case an exception is thrown in `java.util.stream.Streams$StreamBuilderImpl.tryAdvance`while in the second case it is in `java.util.Spliterators$ArraySpliterator.tryAdvance`. Looks like a bug when exception in thrown in a stream of only one element (bad state management). – Jean-Baptiste Yunès Jul 22 '17 at 20:42
  • @Jean-BaptisteYunès thanks for your feedback, sir. I'm not sure it is a bug, so I want to know it is really a bug. Since it is not fixed in jdk9 as far as I know. – holi-java Jul 22 '17 at 20:47
  • Hard to tell, but it seems that in the case of stream of one element that throw exception, the buffer of objects is not correctly managed. I tried to trace what's inside the source spliterator and something is clearly wrong. – Jean-Baptiste Yunès Jul 22 '17 at 20:55
  • Not sure about your code logic, because when you try with "1","bad", then the result stream is 1, null. I suppose you wanted to drop exception cases, isn't it? – Jean-Baptiste Yunès Jul 22 '17 at 20:59
  • @Jean-BaptisteYunès yeah, the function can processing exceptions by `BiConsumer` handler. The handler can choose rethrow exception / provide a default value / skipping the invaild items. the code above is a skipping example. no `null`s in the result. – holi-java Jul 22 '17 at 21:04
  • provide a default value / skipping the invaild items seems incompatible to me has the control is not the same... – Jean-Baptiste Yunès Jul 22 '17 at 21:08
  • @Jean-BaptisteYunès I'm not good at English, sir. You can try `exceptionally` with differents consumers, then you will know what I said.please don't run it in parallel stream, since I have removed `trySplit` for printing. – holi-java Jul 22 '17 at 21:15
  • @holi-java Wouldn't `AbstractSpliterator` take care of `trySplit` with a default implementation? Maybe not the best possible implementation for parallelization, but I think it will suffice. – fps Jul 22 '17 at 22:57
  • @FedericoPeraltaSchaffner hi, take care of `trySplit` with a default implementation don't support my intention. – holi-java Jul 23 '17 at 00:46

1 Answers1

4

I wouldn’t call this a bug—not even an unexpected behavior, given the fact that I warned about such scenarios in this comment on the linked question a month ago:

Keep in mind that you don’t know whether the source iterator actually advanced its internal state or not when an exception has been thrown, so assuming that there is a next element can bring you into an infinite loop, repeating the failed operation over and over again.

Of course, for the Stream.of(singleElement) case that scenario is easy to avoid and changing the order of the two statements, action.accept(first); and count = -1;, would make the code more robust, still, being able to recover from an exception is not a guaranteed feature at all and there are other stream sources for which such recovery couldn’t get implemented that easily.

E.g., the streams returned by BufferedReader.lines(), resp. Files.lines() can’t force their underlying reader to advance one line if an IOException occurs.

Generally, this attempt to recover from an exception makes the premature assumption that an exception thrown by the source always indicates a problem with a particular element rather than a problem with the source in general. This works with the contrived example where you know that the exception is associated with a particular element because you provoked it. But this is not how you can deal with unexpected exceptions, which is what exceptions usually are, as when you expect a particular problem, like string elements not being in number format, you should handle them straight-forwardly, like filtering out the invalid strings before parsing.

Holger
  • 285,553
  • 42
  • 434
  • 765
  • thanks, but how to solve this scenario `BufferedReader.lines()` ,sir? `Stream.of(T)` I have already solved by analyzing the `stackTrace`. – holi-java Jul 24 '17 at 12:03
  • 2
    @holi-java: there is no way to recover from an exception from a data source, unless it has a documented recovery strategy. Generally, an exception may happen at an arbitrary place in the middle of an operation, leaving the data in an inconsistent state. You are only safe with custom exception types deliberately thrown at certain places. But `RuntimeException` is the supertype of a lot of exceptions that are thrown at unplanned places due to programming errors, etc. – Holger Jul 24 '17 at 12:10
  • sir, what you mean is the data-source should be retry/reconnect IO-stream and jump to the last failure position ? and stream can't control the data-source, exceptions must be controlled by data-source itself? – holi-java Jul 24 '17 at 12:12
  • 2
    Then you are still *before* the problematic position and about to repeat the failed operation, if the error is related to the particular element. And you can’t skip to the next element without doing the operation. You only know where the line ends, when you have read the line. And, as said, the error still can also be entirely independent of the element. If someone shut down the file server, there is no recovery… – Holger Jul 24 '17 at 12:16
  • 3
    @holi-java no, he means: give up, abandon this goal for it cannot be achieved. If you can recover from exceptions, this is because of the specific case of a particular 'bad' element. You can solve *that* much more straightforwardly by first invoking `filter()` and/or `map()` on the source stream instead. – user268396 Jul 24 '17 at 12:16
  • @Holger thanks, sir. In fact, when you explain the issues of `BufferedReader.lines()` in your question, I have already understand it. but I want to know if I provide some recover mode, can I make it works as expected? – holi-java Jul 24 '17 at 12:26
  • 2
    @holi-java: a data source itself may offer recovery for certain kind of failures, but usually, most I/O operations do not fall into the recoverable category. To name an example where it might work, [JDBC database drivers have such a category](https://docs.oracle.com/javase/8/docs/api/?java/sql/SQLRecoverableException.html) – Holger Jul 24 '17 at 12:39
  • @Holger thanks, sir. I think I should change the title to makes it more meaningful. how about it? – holi-java Jul 24 '17 at 12:42