
If I execute the following code which "concatenates" two streams

  • first by flatMapping a Stream<Stream<Integer>>
  • then by reducing a Stream<Stream<Integer>> using Stream.concat()

I obtain the same correct result in both cases, but the number of filtering operations is different.

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class FlatMapVsReduce {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

        Predicate<Integer> predicate1 = i -> {
            System.out.println("testing first condition with " + i);
            return i == 3;
        };

        Predicate<Integer> predicate2 = i -> {
            System.out.println("testing second condition with " + i);
            return i == 7;
        };

        System.out.println("Testing with flatMap");
        Integer result1 =
            Stream.of(list.stream().filter(predicate1),
                      list.stream().filter(predicate2))
                  .flatMap(Function.identity())
                  .peek(i -> System.out.println("peeking " + i))
                  .findFirst()
                  .orElse(null);
        System.out.println("result1 = " + result1);

        System.out.println();
        System.out.println("Testing with reduce");
        Integer result2 =
            Stream.of(list.stream().filter(predicate1),
                      list.stream().filter(predicate2))
                  .reduce(Stream::concat)
                  .orElseGet(Stream::empty)
                  .peek(i -> System.out.println("peeking " + i))
                  .findFirst()
                  .orElse(null);
        System.out.println("result2 = " + result2);
    }
}

I get the expected result in both cases (3). However, the first operation applies the first filter to every element of the collection, whereas the second stops as soon as a matching element is found. The output is:

Testing with flatMap
testing first condition with 1
testing first condition with 2
testing first condition with 3
peeking 3
testing first condition with 4
testing first condition with 5
testing first condition with 6
testing first condition with 7
testing first condition with 8
testing first condition with 9
result1 = 3

Testing with reduce
testing first condition with 1
testing first condition with 2
testing first condition with 3
peeking 3
result2 = 3

Why is there a difference of behavior between the two? Could the JDK code be improved to make the first scenario as efficient as the second, or is there something in flatMap that makes that impossible?

Addendum: the following alternative is as efficient as the one using reduce, but I still can't explain why:

    Integer result3 = Stream.of(predicate1, predicate2)
                            .flatMap(c -> list.stream().filter(c).limit(1))
                            .peek(i -> System.out.println("peeking " + i))
                            .findFirst()
                            .orElse(null);
    System.out.println("result3 = " + result3);
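The difference can be quantified by counting predicate evaluations instead of reading the logs. This is a minimal sketch (class and method names are mine, not part of the original code); the counts in the comments reflect the JDK 8 behavior described above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class PredicateCallCounter {
    static final List<Integer> LIST = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

    // Plain flatMap: how often is the predicate evaluated before findFirst() returns?
    static int callsWithPlainFlatMap() {
        AtomicInteger calls = new AtomicInteger();
        Stream.of(LIST.stream().filter(i -> { calls.incrementAndGet(); return i == 3; }))
              .flatMap(s -> s)
              .findFirst();
        return calls.get();  // 9 on JDK 8: the inner stream is drained completely
    }

    // flatMap + limit(1): the inner stream is truncated at the first match.
    static int callsWithLimitedFlatMap() {
        AtomicInteger calls = new AtomicInteger();
        Stream.of(LIST)
              .flatMap(l -> l.stream()
                             .filter(i -> { calls.incrementAndGet(); return i == 3; })
                             .limit(1))
              .findFirst();
        return calls.get();  // 3: elements 1, 2, 3 are tested, then limit(1) cuts the stream
    }

    public static void main(String[] args) {
        System.out.println("plain flatMap:      " + callsWithPlainFlatMap() + " predicate calls");
        System.out.println("flatMap + limit(1): " + callsWithLimitedFlatMap() + " predicate calls");
    }
}
```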
Holger
JB Nizet
  • do you need `.orElseGet(Stream::empty)`? – njzk2 Jun 22 '16 at 18:21
  • 3
    Yes, because reduce() returns an Optional>, not a Stream. – JB Nizet Jun 22 '16 at 18:23
  • intuitively I would say that the parameter of the `flatMap` function is the complete first stream, which is why it is entirely consumed. (But I am unsure why that is not the case with reduce. Presumably stream.concat is smarter?) – njzk2 Jun 22 '16 at 18:26
  • What happens with an infinite first stream? – user2357112 Jun 22 '16 at 18:28
  • I would be extremely suspicious of the `Stream::concat` code. I would say you're not really intended to use `concat` like that. – Louis Wasserman Jun 22 '16 at 18:29
  • @user2357112 unsurprisingly, the first alternative never finds a result (infinite loop) with infinite streams, whereas the two other work fine. – JB Nizet Jun 22 '16 at 18:41
  • 1
    Digging into the source code seems to indicate that [`flatMap` calls `forEach` to completely consume each substream for some reason](http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/stream/ReferencePipeline.java#ReferencePipeline.flatMap%28java.util.function.Function%29). I'm pretty bad at navigating this code, so I'm not sure why it's doing that or whether I'm reading it correctly. – user2357112 Jun 22 '16 at 18:42
  • @LouisWasserman I wondered why there would be a problem with reducing with concat, given that the operation is associative. But then I read the javadoc and indeed found a "technical issue" warning: *Use caution when constructing streams from repeated concatenation. Accessing an element of a deeply concatenated stream can result in deep call chains, or even StackOverflowError.* I guess the last alternative is not only the most elegant one, but also the most robust one then. – JB Nizet Jun 22 '16 at 18:51
  • 2
    See [“Why filter() after flatMap() is “not completely” lazy in Java streams?”](http://stackoverflow.com/q/29229373/2711488) – Holger Jun 22 '16 at 18:55
  • @Holger thank you very much! I'll close this as a duplicate. Your answer explains everything, and also seems to answer my second question: it would be possible to keep the laziness, and this is a known limitation of the current implementation. – JB Nizet Jun 22 '16 at 19:01
  • 1
    Note that [here](http://stackoverflow.com/a/32767282/2711488) is a proof of concept showing that lazy `flatmap` is possible… – Holger Jun 22 '16 at 19:06

1 Answer


From the implementation of flatMap in OpenJDK, what I understand is that flatMap pushes the whole content of each inner stream downstream:

result.sequential().forEach(downstreamAsInt);

On the other hand, Stream::concat seems to handle the pull side and does not send everything at once.

I suspect that your test does not show the full picture:

  • In flatMap, the second stream is only considered once the first one is depleted.
  • In reduce, all the stream objects are combined into the final concatenated stream up front, because the reduced value does not exist until the whole input stream of streams has been consumed.
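Note that what reduce combines eagerly are the stream *objects*; element traversal of the concatenated result is still pull-based. A small probe (class and method names are illustrative) shows that the second stream's elements are never materialized when findFirst() stops early:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class ConcatPullProbe {
    // Counts how many elements of the second stream are materialized
    // when the reduced/concatenated stream is short-circuited by findFirst().
    static int pullsFromSecondStream() {
        AtomicInteger pulls = new AtomicInteger();
        Stream.of(Stream.of(1, 2, 3),
                  Stream.of(4, 5, 6).peek(i -> pulls.incrementAndGet()))
              .reduce(Stream::concat)
              .orElseGet(Stream::empty)
              .findFirst();
        return pulls.get();  // 0: concat combines stream handles, not elements
    }

    public static void main(String[] args) {
        System.out.println("second stream pulled " + pullsFromSecondStream() + " element(s)");
    }
}
```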

Which means using one or the other depends on how complex your inputs are. If you have an infinite Stream<Stream<Integer>>, reduce will never finish.
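Conversely, flatMap pulls outer elements one at a time, so an infinite *outer* stream of finite inner streams still terminates once a match is found. A sketch (class name hypothetical):

```java
import java.util.stream.Stream;

public class InfiniteOuterProbe {
    // flatMap advances the infinite outer stream one element at a time,
    // draining each (finite) inner stream; findFirst() stops the outer pull
    // as soon as an inner stream produces an element.
    static Integer firstMatch() {
        return Stream.iterate(0, n -> n + 1)                 // infinite outer stream
                     .flatMap(n -> Stream.of(n).filter(i -> i == 3))
                     .findFirst()
                     .orElse(null);
    }

    public static void main(String[] args) {
        System.out.println("firstMatch = " + firstMatch());  // 3
    }
}
```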

njzk2