
Suppose I have this code:

Collections.singletonList(10)
        .parallelStream() // .stream() - nothing changes
        .flatMap(x -> Stream.iterate(0, i -> i + 1)
                .limit(x)
                .parallel()
                .peek(m -> System.out.println(Thread.currentThread().getName())))
        .collect(Collectors.toSet());

The output shows the same thread name every time, so there is no benefit from parallel here - what I mean by that is that a single thread does all the work.

Inside flatMap there is this code:

result.sequential().forEach(downstream);
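
For context, that line lives in the Sink created inside ReferencePipeline.flatMap; abridged from the JDK 8 sources (later versions differ in details), the surrounding method is:

@Override
public void accept(P_OUT u) {
    try (Stream<? extends R> result = mapper.apply(u)) {
        if (result != null)
            result.sequential().forEach(downstream);
    }
}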

I understand forcing the sequential property when the "outer" stream is parallel: the two could probably block each other, with the "outer" stream having to wait for the "flatMap" one to finish and the other way around (since the same common pool is used). But why always force that?

Is that one of those things that could change in a later version?

giannis christofakis
Eugene

2 Answers


There are two different aspects.

First, there is only a single pipeline, which is either sequential or parallel; the choice of sequential or parallel at the inner stream is irrelevant. Note that the downstream consumer you see in the cited code snippet represents the entire subsequent stream pipeline, so in your code, which ends with .collect(Collectors.toSet()), this consumer will eventually add the resulting elements to a single Set instance that is not thread-safe. Processing the inner stream in parallel with that single consumer would therefore break the entire operation.
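
To see why, here is a minimal sketch (my illustration, not JDK code) of what would happen if the inner stream ran in parallel against that single composed consumer:

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class UnsafeDownstreamSketch {
    public static void main(String[] args) {
        Set<Integer> sink = new HashSet<>();      // not thread-safe, like the Set behind Collectors.toSet()
        Consumer<Integer> downstream = sink::add; // stands in for the composed downstream pipeline

        // Hypothetically running the inner stream in parallel against the single
        // consumer, i.e. as if flatMap called result.parallel().forEach(downstream):
        IntStream.range(0, 100_000).boxed().parallel().forEach(downstream);

        // Often prints less than 100000 (or the forEach above may even throw),
        // because concurrent unsynchronized insertions corrupt the HashSet.
        System.out.println(sink.size());
    }
}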

If the outer stream gets split, the cited code might get invoked concurrently, with different consumers adding to different sets. Each of these calls would process a different element of the outer stream, mapped to a different inner stream instance. Since your outer stream consists of only a single element, it can't be split.
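
You can verify this with a variation of your code that has more than one outer element (my example; the exact thread names and their number will vary):

import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OuterSplitDemo {
    public static void main(String[] args) {
        Arrays.asList(10, 10, 10, 10).parallelStream()
                .flatMap(x -> Stream.iterate(0, i -> i + 1)
                        .limit(x)
                        .peek(m -> System.out.println(Thread.currentThread().getName())))
                .collect(Collectors.toSet());
        // Typically prints several distinct worker thread names: the work is
        // parallelized across outer elements, each inner stream staying sequential.
    }
}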

The way this has been implemented is also the reason for the "Why is filter() after flatMap() not completely lazy in Java streams?" issue, as forEach is called on the inner stream, which passes all elements to the downstream consumer. As demonstrated by this answer, an alternative implementation, supporting laziness and substream splitting, is possible. But this is a fundamentally different way of implementing it. The current design of the Stream implementation mostly works by consumer composition, so in the end, the source spliterator (and those split off from it) receives a Consumer representing the entire stream pipeline in either tryAdvance or forEachRemaining. In contrast, the solution of the linked answer does spliterator composition, producing a new Spliterator delegating to source spliterators. I suppose both approaches have advantages, and I'm not sure how much the OpenJDK implementation would lose by working the other way round.
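
To make the two designs concrete, here is a toy sketch (my illustration; neither actual JDK code nor the linked answer's implementation) of both approaches for a simple map stage:

import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;

class CompositionSketch {

    // Consumer composition, like the current JDK pipeline: each stage wraps its
    // downstream consumer and the source spliterator pushes elements through the chain.
    static <T, R> void pushMap(Spliterator<T> source, Function<T, R> f, Consumer<R> downstream) {
        source.forEachRemaining(t -> downstream.accept(f.apply(t)));
    }

    // Spliterator composition, like the linked answer: wrap the source in a new
    // Spliterator that pulls elements on demand, so it stays lazy and could
    // also override trySplit to support parallelism.
    static <T, R> Spliterator<R> pullMap(Spliterator<T> source, Function<T, R> f) {
        return new Spliterators.AbstractSpliterator<R>(source.estimateSize(), source.characteristics()) {
            @Override
            public boolean tryAdvance(Consumer<? super R> action) {
                return source.tryAdvance(t -> action.accept(f.apply(t)));
            }
        };
    }

    public static void main(String[] args) {
        pushMap(List.of(1, 2, 3).spliterator(), i -> i * 2, System.out::println);
        pullMap(List.of(4, 5, 6).spliterator(), i -> i * 2).forEachRemaining(System.out::println);
    }
}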

Holger
  • Hi, sir. Should this be considered a stream bug? – holi-java Jul 11 '17 at 16:28
  • @holi-java I wouldn't say that this is a bug, just poor implementation design that will most likely be fixed in the future. – Jacob G. Jul 11 '17 at 16:29
  • @holi-java: the missing laziness can be seen as a bug, and there is already a bug report for it. The limited parallelism, however, is just an area for potential performance improvement. In practice, this only affects streams with a small number of elements in the outer stream and much larger inner streams. – Holger Jul 11 '17 at 16:31
  • @JacobG. Hi, but `flatMap` is inconsistent with the documentation. – holi-java Jul 11 '17 at 16:31
  • @Holger Thanks for letting us know more about the Stream API. I have also seen your other interesting answer about `flatMap` and infinite streams, and up-voted it. Thanks again, and please forgive my bad English. :) – holi-java Jul 11 '17 at 16:33
  • @Holger I am not looking into a solution at the moment - just pure interest. As usual, a highly interesting read from you. – Eugene Jul 12 '17 at 12:24
  • @Holger, "this only affects streams with a small number of elements in the outer stream and much larger inner streams" - which is exactly the case I have right now with Java 8 streams. And based on my experience with rxjava/reactor, this is a pretty common use case. Typical case: the outer stream is users, and the inner stream is the pieces of information you need to fetch for each user (either by a DB call or an API call, it doesn't matter). And parallelization here may drastically impact whether you meet your SLA or not. Imho, this is a very common use case. – Dmytro Buryak Feb 26 '21 at 12:58
  • @DmytroBuryak you may try the solution in the linked answer. – Holger Feb 26 '21 at 13:49
  • @Holger, yep, I've seen your solution, thank you! Actually, I had already solved it in a different way, committed it, and sent it for review before I left a comment here (you can see my 2-line solution in the answers to this question, if you're curious). But still, I wouldn't take your solution for such a simple problem, for two reasons: 1) it's complex and I don't understand it at a glance; 2) if I used it, I would have ~50 more lines of complex code that I (and eventually QA) would need to test. My main point is not about the exact solution, but rather about this use case. It's very common. – Dmytro Buryak Feb 27 '21 at 21:29
  • @DmytroBuryak I still hope for a built-in solution… – Holger Mar 01 '21 at 07:44

This is for anyone who, like me, has a dire need to parallelize flatMap and wants a practical solution, not only history and theory.

The simplest solution I came up with is to do the flattening by hand, basically by replacing flatMap with map + reduce(Stream::concat). (One caveat: the Stream.concat documentation warns that repeated concatenation produces deeply nested streams whose traversal can even throw a StackOverflowError, so this works best when the outer stream has few elements.)

Here's an example to demonstrate how to do this:

// imports assumed for a JUnit 5 test class
import static java.lang.Thread.currentThread;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.jupiter.api.Test;

@Test
void testParallelStream_NOT_WORKING() throws InterruptedException, ExecutionException {
    new ForkJoinPool(10).submit(() -> {
        Stream.iterate(0, i -> i + 1).limit(2)
                .parallel()

                // does not parallelize nested streams
                .flatMap(i -> generateRangeParallel(i, 100))

                .peek(i -> System.out.println(currentThread().getName() + " : generated value: i=" + i))
                .forEachOrdered(i -> System.out.println(currentThread().getName() + " : received value: i=" + i));
    }).get();
    System.out.println("done");
}

@Test
void testParallelStream_WORKING() throws InterruptedException, ExecutionException {
    new ForkJoinPool(10).submit(() -> {
        Stream.iterate(0, i -> i + 1).limit(2)
                .parallel()

                // concatenation of nested streams instead of flatMap, parallelizes ALL the items
                .map(i -> generateRangeParallel(i, 100))
                .reduce(Stream::concat).orElse(Stream.empty())

                .peek(i -> System.out.println(currentThread().getName() + " : generated value: i=" + i))
                .forEachOrdered(i -> System.out.println(currentThread().getName() + " : received value: i=" + i));
    }).get();
    System.out.println("done");
}

Stream<Integer> generateRangeParallel(int start, int num) {
    return Stream.iterate(start, i -> i + 1).limit(num).parallel();
}

// run this method on the produced output to see how the work was distributed
void countThreads(String strOut) {
    var res = Arrays.stream(strOut.split("\n"))
            .map(line -> line.split("\\s+"))
            .collect(Collectors.groupingBy(s -> s[0], Collectors.counting()));
    System.out.println(res);
    System.out.println("threads  : " + res.keySet().size());
    System.out.println("work     : " + res.values());
}
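
To actually capture the tests' output for countThreads, one possibility (my addition, not part of the original answer; it assumes the methods live in the same test class) is to temporarily redirect System.out:

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// hypothetical harness method
void printThreadStats() throws Exception {
    var buffer = new ByteArrayOutputStream();
    var original = System.out;
    System.setOut(new PrintStream(buffer, true));
    try {
        testParallelStream_WORKING();
    } finally {
        System.setOut(original); // always restore stdout
    }
    countThreads(buffer.toString());
}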

Stats from a run on my machine:

NOT_WORKING case stats:
{ForkJoinPool-1-worker-23=100, ForkJoinPool-1-worker-5=300}
threads  : 2
work     : [100, 300]

WORKING case stats:
{ForkJoinPool-1-worker-9=16, ForkJoinPool-1-worker-23=20, ForkJoinPool-1-worker-21=36, ForkJoinPool-1-worker-31=17, ForkJoinPool-1-worker-27=177, ForkJoinPool-1-worker-13=17, ForkJoinPool-1-worker-5=21, ForkJoinPool-1-worker-19=8, ForkJoinPool-1-worker-17=21, ForkJoinPool-1-worker-3=67}
threads  : 10
work     : [16, 20, 36, 17, 177, 17, 21, 8, 21, 67]
Dmytro Buryak