Fixtures
BiConsumer<Exception, Consumer<? super Integer>> NOTHING = (ex, unused) -> { /* ignore the exception */ };
While fixing the bug that @Holger reported in this answer:
Stream<Integer> stream = Stream.of(1, 2, 3);
// v--- the bug I have already fixed; it now throws a RuntimeException
exceptionally(stream, NOTHING).collect(ArrayList::new, (l, x) -> {
    l.add(x);
    if (x < 4) throw new RuntimeException();
}, List::addAll);
Everything works, except that when the source is created with Stream.of(T), the map(...) operation is invoked infinitely, for example:
List<Integer> result = exceptionally(
        //               v--- map(...) is invoked infinitely
        Stream.of("bad").map(Integer::parseInt),
        NOTHING
).collect(toList());
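To observe this without hanging the JVM, a bounded probe along the following lines could be used. This is only a rough sketch: the class name SingletonReplayProbe and the method mapperCalls are made up for illustration, and the reported count depends on the JDK build (a count above 1 would mean the same element was handed to map(...) again).

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class SingletonReplayProbe {

    // Calls tryAdvance `attempts` times on a mapped single-element stream whose
    // mapper always throws, and returns how often the mapper was invoked.
    static int mapperCalls(int attempts) {
        AtomicInteger calls = new AtomicInteger();
        Spliterator<Integer> spliterator = Stream.of("bad")
                .map(s -> {
                    calls.incrementAndGet();
                    return Integer.parseInt(s); // throws NumberFormatException for "bad"
                })
                .spliterator();

        for (int i = 0; i < attempts; i++) {
            try {
                spliterator.tryAdvance(x -> { });
            } catch (NumberFormatException swallowed) {
                // ignored on purpose, like the NOTHING handler
            }
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("mapper calls in 3 attempts: " + mapperCalls(3));
    }
}
```

Because the loop is bounded, the probe terminates whether or not the element is replayed.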
But when I replace Stream.of(T) with Stream.of(T[]), it works fine again, for example:
//                     v--- returns an empty list
List<Integer> result = exceptionally(
        Stream.of(new String[]{"bad"}).map(Integer::parseInt),
        NOTHING
).collect(toList());
I think java.util.stream.Streams.StreamBuilderImpl#tryAdvance should reset the count first, before it invokes the action. The current code is:
public boolean tryAdvance(Consumer<? super T> action) {
    Objects.requireNonNull(action);
    if (count == -2) {
        action.accept(first);
        count = -1; // <--- this should happen before `action.accept(first)`
        return true;
    }
    else {
        return false;
    }
}
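The difference the ordering makes can be sketched with two stand-alone single-element spliterators. These are hypothetical stand-ins written for illustration (ResetAfter, ResetBefore and deliveries are not JDK names, and this is not the JDK source): one marks the element as consumed after calling the action, the other before.

```java
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;

public class SingleElementSpliterators {

    // Marks the element as consumed AFTER the callback (the order shown above).
    static final class ResetAfter<T> extends AbstractSpliterator<T> {
        private final T first;
        private boolean consumed;

        ResetAfter(T first) {
            super(1, Spliterator.SIZED | Spliterator.ORDERED);
            this.first = first;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (consumed) return false;
            action.accept(first); // if this throws, `consumed` stays false
            consumed = true;
            return true;
        }
    }

    // Marks the element as consumed BEFORE the callback (the proposed order).
    static final class ResetBefore<T> extends AbstractSpliterator<T> {
        private final T first;
        private boolean consumed;

        ResetBefore(T first) {
            super(1, Spliterator.SIZED | Spliterator.ORDERED);
            this.first = first;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (consumed) return false;
            consumed = true;      // state is updated even if the action throws
            action.accept(first);
            return true;
        }
    }

    // Calls tryAdvance at most `limit` times with an action that always throws
    // and returns how many times the element was delivered to that action.
    static int deliveries(Spliterator<String> spliterator, int limit) {
        int[] count = {0};
        for (int i = 0; i < limit; i++) {
            try {
                boolean advanced = spliterator.tryAdvance(s -> {
                    count[0]++;
                    throw new IllegalStateException(s);
                });
                if (!advanced) break;
            } catch (IllegalStateException swallowed) {
                // ignored, as a caller that recovers from the exception would do
            }
        }
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println(deliveries(new ResetAfter<>("bad"), 5));  // prints 5
        System.out.println(deliveries(new ResetBefore<>("bad"), 5)); // prints 1
    }
}
```

With the update placed after the callback, a caller that swallows the exception and retries sees the same element again on every attempt, which matches the endless map(...) calls described above.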
Q: This looks like a bug in the JDK to me, since the semantics should be consistent between the Stream.of methods. Am I right?
// Imports assumed by this fixture:
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
import static java.util.stream.StreamSupport.stream;

<T> Stream<T> exceptionally(Stream<T> source,
                            BiConsumer<Exception, Consumer<? super T>> exceptionally) {
    class ExceptionallySpliterator extends AbstractSpliterator<T>
                                   implements Consumer<T> {
        private Spliterator<T> source;
        private T value;

        public ExceptionallySpliterator(Spliterator<T> source) {
            super(source.estimateSize(), source.characteristics());
            this.source = source;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            Boolean state = attempt(action);
            // null: the source threw and the handler ran, so keep traversing
            if (state == null) return true;
            if (state) action.accept(value);
            return state;
        }

        // Returns the result of source.tryAdvance, or null if it threw.
        private Boolean attempt(Consumer<? super T> action) {
            try {
                return source.tryAdvance(this);
            } catch (Exception ex) {
                exceptionally.accept(ex, action);
                return null;
            }
        }

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }
    return stream(
        new ExceptionallySpliterator(source.spliterator()),
        source.isParallel()
    ).onClose(source::close);
}
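For completeness, here is roughly how I use the fixture with an array-backed source, where it behaves as expected. The wrapper class ExceptionallyUsage and the helpers parseSkippingFailures and parseWithFallback are names made up for this sketch. The exceptionally method is the one above, made static so the example is self-contained.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ExceptionallyUsage {

    static <T> Stream<T> exceptionally(Stream<T> source,
                                       BiConsumer<Exception, Consumer<? super T>> exceptionally) {
        class ExceptionallySpliterator extends AbstractSpliterator<T> implements Consumer<T> {
            private final Spliterator<T> source;
            private T value;

            ExceptionallySpliterator(Spliterator<T> source) {
                super(source.estimateSize(), source.characteristics());
                this.source = source;
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Boolean state = attempt(action);
                if (state == null) return true; // source threw, handler ran
                if (state) action.accept(value);
                return state;
            }

            private Boolean attempt(Consumer<? super T> action) {
                try {
                    return source.tryAdvance(this);
                } catch (Exception ex) {
                    exceptionally.accept(ex, action);
                    return null;
                }
            }

            @Override
            public void accept(T value) {
                this.value = value;
            }
        }
        return StreamSupport.stream(
                new ExceptionallySpliterator(source.spliterator()),
                source.isParallel()
        ).onClose(source::close);
    }

    // Drops every element whose parsing fails (the NOTHING handler).
    static List<Integer> parseSkippingFailures(String... inputs) {
        BiConsumer<Exception, Consumer<? super Integer>> nothing = (ex, unused) -> { };
        return exceptionally(Stream.of(inputs).map(Integer::parseInt), nothing)
                .collect(Collectors.toList());
    }

    // Emits `fallback` downstream in place of every element whose parsing fails.
    static List<Integer> parseWithFallback(int fallback, String... inputs) {
        return exceptionally(Stream.of(inputs).map(Integer::parseInt),
                (ex, action) -> action.accept(fallback))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parseSkippingFailures("1", "bad", "3"));
        System.out.println(parseWithFallback(-1, "1", "bad", "3"));
    }
}
```

Both helpers pass a String[] to Stream.of, so they take the array-based path that works for me. On the JDK versions I have looked at, the first call yields [1, 3] and the second [1, -1, 3].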