
After my first question here about understanding Java stream spliterators in depth, another subtle question about streams: why is the implementation of .flatMap() in Java so inefficient (not lazy)?

Streams are supposed to be as lazy as possible, but the .flatMap() method is not.

For example:

stream.flatMap(this::getStreamWith10HeavyComputationElems).findFirst() consumes 10 elements (10 heavy computations) before returning the first result (see the sketch after the test below).

stream.flatMap(this::getStreamWith10HeavyComputationElems).limit(11).count() consumes 20 elements (2 x 10 heavy computations) before returning 11.

The question is: why does Java use a non-lazy implementation?

    @Test
    void flatMap_native() throws Exception {
        AtomicInteger count = new AtomicInteger();
        Stream<Long> stream = LongStream.range(0, 5).boxed()
                .flatMap(num -> LongStream.range(0, 10).boxed()
                                    .peek(x -> count.incrementAndGet()))
                .limit(11);

        assertThat(stream).hasSize(11);
        assertThat(count).hasValue(20); // why 20? It should be 11!
    }
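
The findFirst() case from the first example can be checked the same way. A minimal sketch, assuming the same JUnit/AssertJ setup as the test above; the count of 10 is what the JDKs discussed here show, and the exact number may differ on newer releases where flatMap's behaviour changed:

    @Test
    void flatMap_findFirst_native() {
        AtomicInteger count = new AtomicInteger();
        Optional<Long> first = LongStream.range(0, 5).boxed()
                .flatMap(num -> LongStream.range(0, 10).boxed()
                                    .peek(x -> count.incrementAndGet()))
                .findFirst();

        assertThat(first).contains(0L);
        assertThat(count).hasValue(10); // the whole first substream is evaluated for a single result
    }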

As a workaround I wrote my own implementation of flatMap, but it lacks the fluency of the native call: flatMap(stream, mapper) vs. the native stream.flatMap(mapper).

public static <T, R> Stream<R> flatMap(Stream<? extends T> stream, Function<? super T, ? extends Stream<? extends R>> mapper) {
    // Kept outside the spliterator class so the current substream can be closed in onClose; starts as Stream.empty()
    AtomicReference<Stream<? extends R>> flatMapStreamRef = new AtomicReference<>(Stream.empty());

    // Defining a better spliterator than the native flatMap one.
    class FlatMapSpliterator implements Spliterator<R> {
        private final AtomicReference<T> item = new AtomicReference<>();
        private final Spliterator<? extends T> spliterator;
        private Stream<? extends R> flatMapStream = flatMapStreamRef.get();
        private Spliterator<? extends R> flatMapSpliterator = flatMapStream.spliterator();

        private FlatMapSpliterator(Spliterator<? extends T> spliterator) {
            this.spliterator = spliterator;
        }

        @Override
        public boolean tryAdvance(Consumer<? super R> action) {
            while(true) {
                if (flatMapSpliterator.tryAdvance(action)) {
                    return true;
                }
                if (!spliterator.tryAdvance(item::set)) {
                    return false; // nothing more to process
                }
                Stream<? extends R> stream = mapper.apply(item.get());
                if(stream != null) {
                    flatMapStream.close();
                    flatMapStream = stream;
                    flatMapStreamRef.set(stream);
                    flatMapSpliterator = flatMapStream.spliterator();
                }
            }
        }

        @Override
        @SuppressWarnings("unchecked")
        public Spliterator<R> trySplit() {
            Spliterator<? extends R> subFlatMapSpliterator = flatMapSpliterator.trySplit();
            if(subFlatMapSpliterator != null) {
                return (Spliterator<R>) subFlatMapSpliterator;
            }

            Spliterator<? extends T> subSpliterator = spliterator.trySplit();
            if(subSpliterator == null) {
                return null;
            }

            return new FlatMapSpliterator(subSpliterator);
        }

        @Override
        public long estimateSize() {
            // If both estimate size are Long.MAX_VALUE then math overflow will happen
            long estimateSize = spliterator.estimateSize() + flatMapSpliterator.estimateSize();
            return estimateSize < 0 ? Long.MAX_VALUE : estimateSize;
        }

        @Override
        public int characteristics() {
            // Maintain only ORDERED (used by native flatMap)
            return spliterator.characteristics() & ORDERED;
        }
    }

    return StreamSupport.stream(new FlatMapSpliterator(stream.spliterator()), stream.isParallel())
            .onClose(stream::close)
            .onClose(() -> flatMapStreamRef.get().close()); // resolve the current substream at close time (a bound method reference would capture the initial empty stream)
}
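
For completeness, a hedged usage sketch mirroring the native test above (flatMap here refers to the static method just defined; with a lazy implementation the counter should stop at 11):

    @Test
    void flatMap_custom() {
        AtomicInteger count = new AtomicInteger();
        Stream<Long> stream = flatMap(
                LongStream.range(0, 5).boxed(),
                num -> LongStream.range(0, 10).boxed()
                           .peek(x -> count.incrementAndGet()))
                .limit(11);

        assertThat(stream).hasSize(11);
        assertThat(count).hasValue(11); // only the elements actually consumed are computed
    }
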
  • this is known and, well, a bit obvious; it has already been discussed here with a solution from @Holger... let me find it... – Eugene Oct 10 '17 at 07:48
  • @Eugene this? https://stackoverflow.com/questions/29229373/why-filter-after-flatmap-is-not-completely-lazy-in-java-streams – assylias Oct 10 '17 at 07:50
  • @assylias yes.. also this is related: https://stackoverflow.com/questions/32749148/in-java-how-do-i-efficiently-and-elegantly-stream-a-tree-nodes-descendants/32767282#32767282 – Eugene Oct 10 '17 at 07:51
  • @Eugene Great memory you have there ;-) ... my conclusion here: the current implementation of flatMap() is *buggy*, but it looks like nobody has/wants to work on that bug. – GhostCat Oct 10 '17 at 07:53
  • @GhostCat it might be a bit more complicated than that. It might *look* easier for us - but we don't know the entire context. – Eugene Oct 10 '17 at 07:54
  • @Eugene Sure. But inactive bugs are rarely a good sign. – GhostCat Oct 10 '17 at 07:55
  • Your solution ignores some aspects of the contract, e.g. the function passed to `flatMap` is allowed to return `null`; further, the substreams should be closed immediately after use. I also have my doubts about changing the characteristics in-between. The estimated size is bogus, but since bogus estimates are handled better than unknown sizes by OpenJDK’s implementation, that’s fine. Eugene has already linked to an answer containing an alternative spliterator-based solution. – Holger Oct 10 '17 at 08:20
  • Thanks @GhostCat and Eugene. What is the cause for not fixing the bug if it was discovered more than two and a half years ago? Is anyone from Oracle available to give us a hint? – Tet Oct 10 '17 at 08:23
  • 2½ years is not a long time for a bug. You will find twenty-year-old unsolved bugs in the database. You would perhaps be surprised how little development resource Java actually has, compared to its size. And the formal process ensuring that everything is checked for compatibility and sufficient documentation does not make it more efficient. We can’t judge the priorities in an objective way, as we can’t see the other things the core developers are working on. But in principle it’s an open source process; you could contribute to it… – Holger Oct 10 '17 at 08:50
  • Thank you @Holger. Not judging, just trying to understand. But this bug is really awkward because it violates one of the most important principles of streams. (Btw: thanks for letting me know that the mapper function may return null, I didn't know that.) – Tet Oct 10 '17 at 08:58
  • Fixed the workaround with @Holger's recommendations. – Tet Oct 10 '17 at 20:43
