From this link, I only partially understood that, at least at some point, there was a problem with nested parallel streams in Java. However, I couldn't deduce the answer to the following question:

Let's say I have an outer stream and an inner stream, both of which are parallel streams. According to my calculations, it will be more performant (due to data locality, i.e. caching in the L1/L2/L3 CPU caches) if the inner stream is processed fully in parallel first, and only then (if and only if CPU cores are available) the outer stream. I think this is true for most people's situations. So my question is:

Would Java execute the inner stream fully in parallel first, and then work on the outer stream? If so, does it make that decision at compile time or at run time? If at run time, is the JIT even smart enough to realize that if the inner stream has more than enough elements (e.g. hundreds) compared to the number of cores (32), then it should definitely use all 32 cores to deal with the inner stream before moving on to the next element of the outer stream; but if the number of elements is small (e.g. < 32), then it's OK to "also process in parallel" the elements belonging to the "next" outer stream element?

Holger
Jonathan Sylvester
  • Can you show an example of what you mean? Like `flatMap.parallel?` or `streamA.... map(streamB.parallel...)` – Eugene Aug 08 '17 at 14:25
  • Stream parallelism is almost entirely unconfigurable, it's designed to operate automagically. If you need to optimize the parallelism, I'd recommend not using streams at all. They have a ton of overhead anyway. – Sean Van Gorder Aug 08 '17 at 15:33

2 Answers

Maybe the following example program sheds some light on the issue:

    // requires java.util.stream.Collectors and java.util.stream.IntStream
    IntStream.range(0, 10).parallel().mapToObj(i -> "outer "+i)
             .map(outer -> outer+"\t"+IntStream.range(0, 10).parallel()
                .mapToObj(inner -> Thread.currentThread())
                .distinct() // using the identity of the threads
                .map(Thread::getName) // just to be paranoid, as names might not be unique
                .sorted()
                .collect(Collectors.toList()) )
             .collect(Collectors.toList())
             .forEach(System.out::println);

Of course, the results will vary, but the output on my machine looks similar to this:

    outer 0 [ForkJoinPool.commonPool-worker-6]
    outer 1 [ForkJoinPool.commonPool-worker-3]
    outer 2 [ForkJoinPool.commonPool-worker-1]
    outer 3 [ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-4, ForkJoinPool.commonPool-worker-5]
    outer 4 [ForkJoinPool.commonPool-worker-5]
    outer 5 [ForkJoinPool.commonPool-worker-2, ForkJoinPool.commonPool-worker-4, ForkJoinPool.commonPool-worker-7, main]
    outer 6 [main]
    outer 7 [ForkJoinPool.commonPool-worker-4]
    outer 8 [ForkJoinPool.commonPool-worker-2]
    outer 9 [ForkJoinPool.commonPool-worker-7]

What we can see here is that on my machine, which has eight cores, seven worker threads contribute to the work. That is enough to utilize all cores because, for the common pool, the caller thread contributes to the work as well instead of just waiting for completion. You can clearly see the main thread in the output.
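To check the corresponding numbers on your own machine, a minimal sketch like the following may help. The class name `CommonPoolInfo` and its helper methods are made up for illustration. That the common pool's parallelism defaults to one less than the number of available processors is what I have observed on OpenJDK builds, not a guarantee, and the value can be overridden with the `java.util.concurrent.ForkJoinPool.common.parallelism` system property.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {
    /** Number of processors the JVM reports as available. */
    static int cores() {
        return Runtime.getRuntime().availableProcessors();
    }

    /** Target parallelism of the common pool, which parallel streams use by default. */
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + cores());
        System.out.println("common pool parallelism: " + commonPoolParallelism());
    }
}
```

On an eight-core machine with default settings, I would expect this to report a parallelism of seven, which would match the seven worker threads plus the caller thread seen in the output.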

Also, you can see that the outer stream gets the full parallelism, while some of the inner streams are entirely processed by a single thread only. Each of the worker threads contributes to at least one of the outer stream’s elements. If you reduce the size of the outer stream to the number of cores, you are very likely to see exactly one worker thread processing one outer stream element, implying an entirely sequential execution of all inner streams.

But I used a number not matching the number of cores, not even a multiple of it, to demonstrate another behavior. Since the workload of the outer stream processing is uneven, i.e. some threads process only one item while others process two, the idle worker threads perform work-stealing, contributing to the inner stream processing of the remaining outer elements.

There is a simple rationale behind this behavior. When the processing of the outer stream starts, it doesn’t know that it will be an “outer stream”. It’s just a parallel stream and there is no way of finding out whether this is an outer stream other than processing it until one of the functions starts another stream operation. But there is no sense in deferring the parallel processing until this point which might never come.

Besides that, I strongly object to your assumption “that it'll be more performant […] if the inner stream is done fully in parallel first”. I’d rather expect it the other way round, i.e. I’d expect an advantage from doing it exactly the way it has been implemented, for typical use cases. But, as explained in the previous paragraph, there is no reasonable way to implement a preference for processing inner streams in parallel anyway.
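If processing each inner stream in parallel before moving on to the next outer element is really what you want, one option is simply not to make the outer stream parallel. The following is only a sketch of that idea. The class `InnerParallelOnly` and its methods are hypothetical names, and whether this is faster for a given workload has to be measured.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class InnerParallelOnly {
    /** Runs one inner stream in parallel and returns the names of the threads that took part. */
    static Set<String> innerThreads(int innerSize) {
        return IntStream.range(0, innerSize).parallel()
                .mapToObj(inner -> Thread.currentThread().getName())
                .collect(Collectors.toCollection(TreeSet::new));
    }

    /** The outer stream is sequential, so the inner streams run one after another. */
    static List<Set<String>> threadsPerOuter(int outerSize, int innerSize) {
        return IntStream.range(0, outerSize) // no parallel() here
                .mapToObj(outer -> innerThreads(innerSize))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Set<String>> result = threadsPerOuter(10, 1000);
        for (int outer = 0; outer < result.size(); outer++) {
            System.out.println("outer " + outer + "\t" + result.get(outer));
        }
    }
}
```

Which threads show up per outer element will vary between runs and machines, so no particular output should be expected.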

Holger
  • I was thinking that it'd be more performant to do inner streams in parallel first because my inner streams are often Collections in which there is some "large" hashmap in the collection class that just fits into CPU's L3 cache. So if inner streams all operate in parallel, then this "large" hashmap would fit in L3 cache and all the cores could access it (in parallel). But, if the inner stream were operating sequentially (because outer stream is parallelized), then each inner stream thread would be competing for say 1/32nd of the cache size (on a 32 core machine). Thoughts? – Jonathan Sylvester Aug 08 '17 at 23:29
  • Thoughts @Holger? – Jonathan Sylvester Aug 09 '17 at 09:50
  • A `HashMap` is not a memory block. The `HashMap`, its backing array, each of the `Entry` instances, the referenced key objects and value objects are all distinct objects, not necessarily being adjacent in memory, not even guaranteed to be in the same region. There is no advantage in trying to stuff all of them into the L3 cache at once. And you would end up with each thread still using ¹/₃₂ of the L3 cache, whether it belongs to the same `HashMap` or to different `HashMap`s, as a fundamental principle of the parallel processing is that each thread processes a different chunk of the data. – Holger Aug 09 '17 at 12:13

According to a small test I have just written, the answer to "Would Java execute inner stream all in parallel first, and then work on outerstream" is no. Note that on my machine, 4 threads are used for stream operations by default.

    // requires java.util.List and java.util.stream.Collectors; List.of needs Java 9+
    List<Integer> first = List.of(1, 2, 3, 4);
    List<Integer> second = List.of(5, 6, 7, 8);

    first.stream().parallel()
            // log which thread handles each outer element
            .peek(x -> System.out.println("first : " + x + " " + Thread.currentThread().getName()))
            // start a nested parallel stream per outer element and log its threads as well
            .map(x -> second.stream().parallel()
                    .peek(y -> System.out.println("second : " + y + " " + Thread.currentThread().getName()))
                    .collect(Collectors.toList()))
            .filter(x -> true)
            .collect(Collectors.toList());

You can see from the output that the inner stream is not executed first. You can increase the number of elements in each stream to get a clearer picture of how the "first" and "second" lines interleave (I don't know if that's the correct term).

But there is something else that strikes me here: how the example above does not block is beyond me. There are only 4 threads and 4 elements, and all threads are waiting for the inner stream to be processed, yet the ForkJoinPool has no free threads left to take over. So how does it work? The link you provided (@Holger's answer) says that more threads will be created than you actually requested. But their names are missing from the output...
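One way to probe this further is to push the situation to the extreme and leave only a single worker thread. The sketch below is an experiment under stated assumptions, not a definitive explanation. The class `NestedStreamsSingleWorker` is a made-up name, and it relies on the commonly observed but, as far as I know, unspecified behaviour that a parallel stream started from a `ForkJoinPool` worker thread runs in that thread's pool.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NestedStreamsSingleWorker {
    /** Nested parallel streams: each outer element sums the values of its own inner stream. */
    static List<Integer> nestedSums() {
        return IntStream.range(0, 4).parallel()
                .mapToObj(outer -> IntStream.range(0, 4).parallel()
                        .map(inner -> outer * 10 + inner)
                        .sum())
                .collect(Collectors.toList());
    }

    /** Starts the nested streams from the only worker thread of a pool with parallelism 1. */
    static List<Integer> runInSingleWorkerPool() {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            Callable<List<Integer>> task = NestedStreamsSingleWorker::nestedSums;
            return pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInSingleWorkerPool()); // prints [6, 46, 86, 126]
    }
}
```

If this completes, as I would expect, then a thread that waits for an inner stream cannot simply be parked. It has to keep executing pending subtasks itself, which would also explain why no extra thread names appear in the output.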

Eugene
  • Seems you misunderstood me. F/J will create compensation threads *if threads get knowingly blocked*. However, in the case of the Stream API, the caller thread doesn’t get blocked, but contributes to the processing. The early Java 8 versions seemed to have problems with this work-stealing, but this seems to have been improved. You can still see compensation threads in action when waiting for a `CompletableFuture` in a worker thread. – Holger Aug 08 '17 at 16:33