This is a bug in JDK 9 - from issue #8193856:
takeWhile is incorrectly assuming that an upstream operation supports and honors cancellation, which unfortunately is not the case for flatMap.
Explanation
If the stream is ordered, takeWhile should show the expected behavior. This is not entirely the case in your code because you use forEach, which waives order. If you care about it, which you do in this example, you should use forEachOrdered instead. Funny thing: That doesn't change anything.
So maybe the stream isn't ordered in the first place? (In that case the behavior is ok.) If you create a temporary variable for the stream created from strArray and check whether it is ordered by executing the expression ((StatefulOp) stream).isOrdered(); at the breakpoint, you will find that it is indeed ordered:
    String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};
    Stream<String> stream = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream))
            .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
    // breakpoint here
    System.out.println(stream);
That means that this is very likely an implementation error.
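If you'd rather not cast to an internal class in the debugger, a rough equivalent using only public API is to ask the pipeline's spliterator for its ORDERED characteristic. This is a minimal sketch, not the check described above: the class and method names (OrderedCheck, pipelineIsOrdered) are made up for illustration, and it needs Java 9 or later for takeWhile.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.stream.Stream;

class OrderedCheck {
    // Builds the same pipeline as above and reports whether it is flagged as ordered.
    static boolean pipelineIsOrdered() {
        String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};
        Stream<String> stream = Arrays.stream(strArray)
                .flatMap(Arrays::stream)
                .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
        // spliterator() is a terminal operation, so the stream cannot be reused afterwards.
        return stream.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        System.out.println("ordered: " + pipelineIsOrdered());
    }
}
```

Adding .unordered() to the pipeline before asking should flip the result, which is a quick way to convince yourself the flag means what you think it means.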
Into The Code
As others have suspected, I now also think that this might be connected to flatMap being eager. More precisely, both problems might have the same root cause.
Looking into the source of WhileOps, we can see these methods:
    @Override
    public void accept(T t) {
        if (take = predicate.test(t)) {
            downstream.accept(t);
        }
    }

    @Override
    public boolean cancellationRequested() {
        return !take || downstream.cancellationRequested();
    }
This code is used by takeWhile to check for a given stream element t whether the predicate is fulfilled:
- If so, it passes the element on to the downstream operation, in this case System.out::println.
- If not, it sets take to false, so when it is asked next time whether the pipeline should be canceled (i.e. it is done), it returns true.
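To make the two cases tangible, here is a small stand-alone model of that sink. It is a simplified sketch, not the JDK class: TakeWhileSink and cancellationAfterEach are hypothetical names, the flag is assumed to start out as true, and the downstream part of cancellationRequested is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class TakeWhileSinkModel {
    // Hypothetical, simplified stand-in for the sink shown in the excerpt.
    static final class TakeWhileSink<T> implements Consumer<T> {
        private final Predicate<? super T> predicate;
        private final Consumer<? super T> downstream;
        private boolean take = true; // assumption: nothing has failed the predicate yet

        TakeWhileSink(Predicate<? super T> predicate, Consumer<? super T> downstream) {
            this.predicate = predicate;
            this.downstream = downstream;
        }

        @Override
        public void accept(T t) {
            take = predicate.test(t); // re-evaluated on every call, as in the excerpt
            if (take) {
                downstream.accept(t);
            }
        }

        boolean cancellationRequested() {
            return !take; // the excerpt additionally asks the downstream sink
        }
    }

    // Feeds the elements to the sink and records the cancellation flag after each one.
    static List<Boolean> cancellationAfterEach(String... elements) {
        List<String> received = new ArrayList<>();
        TakeWhileSink<String> sink =
                new TakeWhileSink<>(e -> !e.equalsIgnoreCase("Sample4"), received::add);
        List<Boolean> flags = new ArrayList<>();
        for (String e : elements) {
            sink.accept(e);
            flags.add(sink.cancellationRequested());
        }
        return flags;
    }

    public static void main(String[] args) {
        System.out.println(cancellationAfterEach("Sample3", "Sample4")); // [false, true]
    }
}
```

Note that the sink only reports cancellation; it relies on whoever calls accept to ask and then stop. If the caller keeps pushing elements anyway, take is simply recomputed for the next one.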
This covers the takeWhile operation. The other thing you need to know is that forEachOrdered leads to the terminal operation executing the method ReferencePipeline::forEachWithCancel:
    @Override
    final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
        boolean cancelled;
        do { } while (
                !(cancelled = sink.cancellationRequested())
                && spliterator.tryAdvance(sink));
        return cancelled;
    }
All this does is:
- check whether the pipeline was canceled
- if not, advance the spliterator by one element, which pushes that element into the sink
- stop once there are no more elements
Looks promising, right?
Without flatMap
In the "good case" (without flatMap; your second example) forEachWithCancel directly operates on the WhileOp as sink and you can see how this plays out:
- ReferencePipeline::forEachWithCancel does its loop:
  - WhileOps::accept is given each stream element
  - WhileOps::cancellationRequested is queried after each element
- at some point "Sample4" fails the predicate and the stream is canceled
Yay!
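The good case can be replayed with public API only. The following is a sketch under assumptions, not the JDK code: PullLoopModel, Result and run are illustrative names, and the takeWhile sink is reduced to a lambda plus a flag. The loop has the same shape as forEachWithCancel: ask for cancellation, then pull exactly one element with tryAdvance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;

class PullLoopModel {
    static final class Result {
        final List<String> offered = new ArrayList<>(); // elements handed to the takeWhile sink
        final List<String> passed = new ArrayList<>();  // elements that reached downstream
    }

    static Result run(String... source) {
        Result r = new Result();
        boolean[] take = {true}; // models the sink's flag

        // Simplified takeWhile sink: recompute the flag, pass the element on if it holds.
        Consumer<String> takeWhileSink = e -> {
            r.offered.add(e);
            take[0] = !e.equalsIgnoreCase("Sample4");
            if (take[0]) {
                r.passed.add(e);
            }
        };

        Spliterator<String> spliterator = Arrays.spliterator(source);
        // Cancellation is checked before every single element is pulled.
        while (take[0] && spliterator.tryAdvance(takeWhileSink)) { }
        return r;
    }

    public static void main(String[] args) {
        Result r = run("Sample1", "Sample2", "Sample3", "Sample4", "Sample5");
        System.out.println("offered: " + r.offered); // Sample1 .. Sample4
        System.out.println("passed:  " + r.passed);  // Sample1 .. Sample3
    }
}
```

"Sample5" is never even offered to the sink, because the loop sees the cancellation request right after "Sample4".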
With flatMap
In the "bad case" (with flatMap; your first example), forEachWithCancel operates on the flatMap operation, though, which simply calls forEachRemaining on the ArraySpliterator for {"Sample3", "Sample4", "Sample5"}, which does this:
    if ((a = array).length >= (hi = fence) &&
        (i = index) >= 0 && i < (index = hi)) {
        do { action.accept((T)a[i]); } while (++i < hi);
    }
Ignoring all that hi and fence stuff, which is only used if the array processing is split for a parallel stream, this is a simple loop, which passes each element to the takeWhile operation, but never checks whether it is canceled. It will hence eagerly plow through all elements in that "substream" before stopping, likely even through the rest of the stream.
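The same kind of model shows what goes wrong once an inner array is pushed wholesale. Again a sketch under assumptions rather than the JDK implementation: PushLoopModel, Result and run are made-up names, the flatMap step is approximated by calling forEachRemaining on each inner array's spliterator, and cancellation is only checked between inner arrays.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;

class PushLoopModel {
    static final class Result {
        final List<String> offered = new ArrayList<>(); // elements handed to the takeWhile sink
        final List<String> passed = new ArrayList<>();  // elements that reached downstream
    }

    static Result run(String[]... arrays) {
        Result r = new Result();
        boolean[] take = {true}; // models the sink's flag

        // Simplified takeWhile sink: recompute the flag, pass the element on if it holds.
        Consumer<String> takeWhileSink = e -> {
            r.offered.add(e);
            take[0] = !e.equalsIgnoreCase("Sample4");
            if (take[0]) {
                r.passed.add(e);
            }
        };

        Spliterator<String[]> outer = Arrays.spliterator(arrays);
        // The outer loop still checks for cancellation, but each inner array is
        // pushed in one go via forEachRemaining, which never asks the sink.
        while (take[0] && outer.tryAdvance(
                inner -> Arrays.spliterator(inner).forEachRemaining(takeWhileSink))) { }
        return r;
    }

    public static void main(String[] args) {
        Result r = run(new String[] {"Sample1", "Sample2"},
                       new String[] {"Sample3", "Sample4", "Sample5"});
        System.out.println("offered: " + r.offered); // all five elements
        System.out.println("passed:  " + r.passed);  // Sample1, Sample2, Sample3, Sample5
    }
}
```

In this model "Sample4" is dropped, but "Sample5" is still offered, passes the predicate, flips the flag back to true and slips through. By the time the outer loop asks again, the cancellation request is gone.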