
Consider the following simple code:

```java
Stream.of(1)
  .flatMap(x -> IntStream.range(0, 1024).boxed())
  .parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
  .forEach(x -> {
    System.out.println("Thread: " + Thread.currentThread().getName());
  });
```

For a long time, I thought that Java would execute elements in parallel even after flatMap. But the code above prints "Thread: main" for every element, which proves me wrong.

A simple way to make it parallel after flatMap would be to collect and then stream again:

```java
Stream.of(1)
  .flatMap(x -> IntStream.range(0, 1024).boxed())
  .parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
  .collect(Collectors.toList())
  .parallelStream()
  .forEach(x -> {
    System.out.println("Thread: " + Thread.currentThread().getName());
  });
```

I'm wondering whether there is a better way, and also about the design choice behind flatMap, which parallelizes only the stream before the call but not after it.

========= More Clarification about the Question ========

From some of the answers, it seems that I did not convey my question fully. As @Andreas said, if I start with a Stream of 3 elements, there could be 3 threads running.

But my real question is this: Java streams use a common ForkJoinPool whose default size is one less than the number of cores, according to this post. Suppose I have 64 cores; then I would expect the code above to show many different threads after flatMap, but in fact it shows only one (or 3 in Andreas' case). By the way, I did use isParallel to confirm that the stream is parallel.
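Both numbers mentioned above can be checked directly. The following is a minimal sketch (the class name `PoolInfo` and the helper `flattened` are made up for illustration) that prints the processor count, the common pool's target parallelism, and `isParallel()` for the question's pipeline. Note that `isParallel()` only reports the stream's execution mode; it says nothing about how many threads will actually run the work.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PoolInfo {
    // The question's pipeline, without the terminal forEach
    static Stream<Integer> flattened() {
        return Stream.of(1)
            .flatMap(x -> IntStream.range(0, 1024).boxed())
            .parallel();
    }

    public static void main(String[] args) {
        // Processors visible to the JVM
        System.out.println("availableProcessors   = " + Runtime.getRuntime().availableProcessors());
        // Target parallelism of the common pool used by parallel streams; by default
        // about availableProcessors() - 1, and it can be overridden with the system
        // property java.util.concurrent.ForkJoinPool.common.parallelism
        System.out.println("commonPoolParallelism = " + ForkJoinPool.getCommonPoolParallelism());
        // isParallel() reports the execution mode only, not how many threads will run
        System.out.println("isParallel            = " + flattened().isParallel());
    }
}
```

On a 64-core machine this would typically report a parallelism of 63 and `isParallel = true`, which is exactly why the single-threaded behavior is surprising.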

To be honest, I'm not asking this question out of pure academic interest. I ran into this problem in a project with a long chain of stream operations that transform a dataset. The chain starts with a single file and explodes into many elements through flatMap. But in my experiment, it does NOT fully exploit my machine (which has 64 cores); judging from the CPU usage, it uses only one core.

zico
  • I think it doesn't matter whether the call to `parallel()` is made before or after `flatMap`. You are asking about something that is not in the spec, i.e. it is implementation-specific. It might change from one version to another, so the code can't be trusted to behave the same way across versions. I think that, for now, everything depends on the size of the stream. Try with much larger values and with different versions of Java – fps Sep 23 '20 at 04:19
  • What's the point of the `flatMap` business? It just obscures the issue. – chrylis -cautiouslyoptimistic- Sep 23 '20 at 04:33
  • I will also note that you can at any point call `isParallel` to see whether a stream that you think is parallel or sequential in fact is. – chrylis -cautiouslyoptimistic- Sep 23 '20 at 04:45
  • That’s a known limitation of the OpenJDK implementation. And if they ever change that, I see some other problems they have to address first. E.g., the single-element stream is implicitly unordered, which has no effect now, but when parallelization is enabled for the “sub-streams”, treating the entire stream as unordered can cause surprises. If you want to perform parallel recursive file processing, [this answer](https://stackoverflow.com/a/51653995/2711488) may serve as an inspiration. – Holger Sep 23 '20 at 14:24
  • @Holger, thanks, will take a look! – zico Sep 27 '20 at 08:16
  • @Holger Helpful comment. What I see in [the package description](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html) is, "the orientation of a stream can be modified with the BaseStream.sequential() and BaseStream.parallel() operations." Period. End of explanation. Is there a link somewhere that documents the limitation? – John Dec 22 '20 at 01:14
  • @JohnMeyer the internals have been discussed in [this old Q&A](https://stackoverflow.com/q/29229373/2711488). While the issue of missing short-circuiting has been fixed in the meantime, the fundamental behavior of just iterating sub-streams sequentially has not changed. – Holger Dec 22 '20 at 10:40

3 Answers


> I was wondering [...] about the design choice of flatMap that only parallelizes the stream before the call, but not after the call.

You're mistaken. All steps, both before and after the flatMap, are run in parallel, but only the original stream is split between threads. Each stream returned by the flatMap function is then processed entirely by the one thread that handled the corresponding source element, and that inner stream isn't split.

Since your original stream only has 1 element, it cannot be split, and hence parallel has no effect.
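One way to see that a single element cannot be split is to ask the source spliterator directly: `Spliterator.trySplit()` returns `null` when the spliterator declines to split. A minimal sketch (the class `SplitCheck` and method `canSplit` are hypothetical names); on OpenJDK it should report `false` for `Stream.of(1)` and `true` for `Stream.of(1, 2, 3)`, but that is an implementation detail rather than a documented guarantee.

```java
import java.util.Spliterator;
import java.util.stream.Stream;

public class SplitCheck {
    // Returns true if the stream's spliterator agrees to split at least once.
    // Calling spliterator() is a terminal operation, so the stream is consumed.
    static boolean canSplit(Stream<?> stream) {
        Spliterator<?> spliterator = stream.spliterator();
        return spliterator.trySplit() != null;
    }

    public static void main(String[] args) {
        System.out.println("Stream.of(1) can split:       " + canSplit(Stream.of(1).parallel()));
        System.out.println("Stream.of(1, 2, 3) can split: " + canSplit(Stream.of(1, 2, 3).parallel()));
    }
}
```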

Try changing to Stream.of(1, 2, 3), and you will see that the forEach, which comes after the flatMap, actually runs in (up to) 3 different threads.
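To observe this without reading through thousands of printed lines, the sketch below counts distinct thread names instead of printing each one. The class and method names are made up for illustration, and the counts it prints depend on the JDK implementation and the number of cores, so treat them as observations, not guarantees.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ThreadCount {
    // Runs the question's pipeline over the given source elements and returns
    // how many distinct threads executed the forEach action.
    static int distinctThreads(Integer... source) {
        // Thread-safe set, because forEach may run on several threads at once
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Stream.of(source)
            .parallel()
            .flatMap(x -> IntStream.range(0, 1024).boxed())
            .forEach(x -> threads.add(Thread.currentThread().getName()));
        return threads.size();
    }

    public static void main(String[] args) {
        System.out.println("1 source element:  " + distinctThreads(1) + " thread(s)");
        System.out.println("3 source elements: " + distinctThreads(1, 2, 3) + " thread(s)");
    }
}
```

With one source element, everything runs on the calling thread; with three, each source element (together with its 1024 inner elements) can be handed to a different thread, so at most 3 threads show up no matter how many cores are available.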

Andreas
  • This is actually not specified behavior. – chrylis -cautiouslyoptimistic- Sep 23 '20 at 04:34
  • @chrylis-cautiouslyoptimistic- I never said it was. I was correcting OP's incorrect claim that the steps after the `flatMap` weren't executed in parallel. I've described the *observed* behavior, same as the question did. I've made no claims about "specified" behavior, nor that it cannot change in the future. – Andreas Sep 23 '20 at 04:41

The documentation for forEach specifies:

> For any given element, the action may be performed at whatever time and in whatever thread the library chooses.

In particular, "execute all the operations on the invoking thread" seems like a good, broadly safe implementation.

Note that requesting a parallel stream does not guarantee any specific degree of parallelism, but you'd be much more likely to see an effect with this:

```java
IntStream.range(0, 1024).boxed()
  .parallel()
  .map(i -> "Thread: " + Thread.currentThread().getName())
  .forEach(System.out::println);
```

chrylis -cautiouslyoptimistic-

This is for anyone like me who badly needs to parallelize flatMap and wants a practical solution, not just history and theory, and who doesn't want to collect all the intermediate items before parallelizing them.

The simplest solution I came up with is to do the flattening by hand, essentially replacing flatMap with map + reduce(Stream::concat).

I've already answered the same question in another thread; see the details at https://stackoverflow.com/a/66386078/3606820
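Below is a minimal sketch of the map + reduce(Stream::concat) idea, written for illustration and not copied from the linked answer; `ConcatFlatten` and `flattenConcat` are hypothetical names. Each inner stream is made parallel before concatenation because, in OpenJDK, the spliterator of a pipeline with intermediate operations (such as `boxed()`) only splits if the stream was parallel when `spliterator()` was called (an implementation detail). `reduce` is used without an identity so that no single `Stream` instance is ever passed to `Stream.concat` twice, which can throw `IllegalStateException`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ConcatFlatten {
    // Hypothetical helper: flattens with map + reduce(Stream::concat) instead of
    // flatMap, so the resulting stream can be split across threads.
    static <T, R> Stream<R> flattenConcat(Stream<T> source, Function<? super T, Stream<R>> mapper) {
        return source
            // make each inner stream parallel so its spliterator is splittable
            .map(t -> mapper.apply(t).parallel())
            // no identity: every stream is consumed by concat at most once
            .reduce(Stream::concat)
            .orElseGet(Stream::empty)
            // also covers the empty-source case
            .parallel();
    }

    public static void main(String[] args) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        AtomicInteger count = new AtomicInteger();
        flattenConcat(Stream.of(1), x -> IntStream.range(0, 1024).boxed())
            .forEach(x -> {
                threads.add(Thread.currentThread().getName());
                count.incrementAndGet();
            });
        System.out.println("elements processed: " + count.get());
        System.out.println("distinct threads:   " + threads.size());
    }
}
```

A caveat from the `Stream.concat` documentation: deeply nested concatenation can lead to deep call chains or even `StackOverflowError`. With a sequential source, `reduce` builds a left-deep chain with one level per source element, so this approach fits sources with relatively few elements that each expand into many, like the single file in the question.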

Dmytro Buryak