First, note that .parallel() changes the parallel status of the whole pipeline, so it affects every operation in the pipeline, not only the subsequent ones. In your case
new Random().ints(0, 50)
.distinct()
.limit(5)
.parallel()
.forEach(d -> System.out.println("s: " + d));
is the same as
new Random().ints(0, 50)
.parallel()
.distinct()
.limit(5)
.forEach(d -> System.out.println("s: " + d));
You cannot parallelize only a part of the pipeline: the whole pipeline is either parallel or sequential.
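You can check this yourself with BaseStream.isParallel() (a minimal sketch; the class name is just for illustration):

import java.util.stream.IntStream;

public class ParallelFlagDemo {
    public static void main(String[] args) {
        IntStream pipeline = IntStream.range(0, 100)
            .distinct()
            .limit(5)
            .parallel(); // flips the flag for the whole pipeline, including distinct() and limit()
        System.out.println(pipeline.isParallel()); // prints "true"
    }
}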
Now back to your question. As Random.ints is an unordered stream, the unordered implementations of distinct and limit are selected, so it's not a duplicate of this question (where the problem was in the ordered distinct implementation). Here the problem is in the unordered limit() implementation. To reduce possible contention, it does not check the total count of elements found across the different threads until every subtask has gathered at least 128 elements or the upstream is exhausted (see the implementation; 1 << 7 = 128). In your case the upstream distinct() finds only 50 distinct elements and desperately traverses the input hoping to find more, but the downstream limit() doesn't signal to stop the processing, because it wants to collect at least 128 elements per subtask before checking whether the limit is reached (which is not very smart, as the limit is less than 128). So to make this work, the source must be able to produce at least 128 × (number of CPU cores) distinct elements. On my 4-core machine new Random().ints(0, 512) succeeds, while new Random().ints(0, 511) gets stuck.
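You can observe this behaviour directly by counting how many elements distinct() pulls from the source before the five results appear (a sketch; the AtomicLong counter is just for illustration, and the bound of 512 is chosen so the pipeline terminates on a 4-core machine, per the experiment above):

import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

public class LimitThresholdDemo {
    public static void main(String[] args) {
        AtomicLong consumed = new AtomicLong();
        new Random().ints(0, 512)
            .peek(i -> consumed.incrementAndGet()) // counts elements pulled from the source
            .distinct()
            .limit(5)
            .parallel()
            .forEach(d -> System.out.println("s: " + d));
        // Far more than 5 elements are consumed: each subtask buffers up to 128 first.
        System.out.println("source elements consumed: " + consumed.get());
    }
}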
To fix this, I'd suggest collecting the random numbers sequentially and creating a new parallel stream from the result:
int[] ints = new Random().ints(0, 50).distinct().limit(5).toArray();
Arrays.stream(ints).parallel()
.forEach(d -> System.out.println("s: " + d));
I assume that you want to perform some expensive downstream processing in parallel. In that case, parallelizing the generation of five random numbers is not very useful: this part is faster when performed sequentially.
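For illustration, with a hypothetical expensiveOp() standing in for your real downstream work, the whole thing might look like this:

import java.util.Arrays;
import java.util.Random;

public class ParallelDownstreamDemo {
    // hypothetical stand-in for the real expensive per-element processing
    static int expensiveOp(int d) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return d * d;
    }

    public static void main(String[] args) {
        // generate sequentially (terminates), process in parallel (where it pays off)
        int[] ints = new Random().ints(0, 50).distinct().limit(5).toArray();
        Arrays.stream(ints).parallel()
            .map(ParallelDownstreamDemo::expensiveOp)
            .forEach(d -> System.out.println("s: " + d));
    }
}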
Update: I filed a bug report and submitted a patch.