
I want to generate 5 distinct random numbers in the range 0 to 50 (exclusive) and then perform some operation on them in parallel. When I wrote this, the program never ended:

new Random().ints(0, 50)
            .distinct()
            .limit(5)
            .parallel()
            .forEach(d -> System.out.println("s: " + d));

I've tried to debug it using peek. I got an infinite number of c: lines, 50 d: lines, but zero l: or s: lines:

new Random().ints(0, 50)
            .peek(d -> System.out.println("c: " + d))
            .distinct()
            .peek(d -> System.out.println("d: " + d))
            .limit(5)
            .peek(d -> System.out.println("l: " + d))
            .parallel()
            .forEach(d -> System.out.println("s: " + d));

What is wrong with my implementation?

janinko
  • One notable difference between an infinite stream like `IntStream.iterate(…)` and the random number stream is that the random number stream isn’t really infinite but has a size of `Long.MAX_VALUE` and even reports that, which may have interesting effects… – Holger Apr 15 '16 at 18:43
  • It's not a duplicate of [this question](http://stackoverflow.com/questions/35189387/parallel-processing-with-infinite-stream-in-java); please read my answer. – Tagir Valeev Apr 16 '16 at 16:33
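Holger's point can be checked directly by asking the source spliterators for their exact size. This is a small sketch (the class and method names are made up for illustration); it relies only on the standard Spliterator.getExactSizeIfKnown(), which returns -1 unless the SIZED characteristic is set.

```java
import java.util.Random;
import java.util.Spliterator;
import java.util.stream.IntStream;

public class RandomIntsSizeDemo {
    /** Exact size reported by the source of new Random().ints(0, 50), or -1 if unknown. */
    static long randomIntsExactSize() {
        Spliterator.OfInt s = new Random().ints(0, 50).spliterator();
        return s.getExactSizeIfKnown(); // only returns estimateSize() if the source is SIZED
    }

    /** The same query for a genuinely unbounded stream built with IntStream.iterate. */
    static long iterateExactSize() {
        Spliterator.OfInt s = IntStream.iterate(0, i -> i + 1).spliterator();
        return s.getExactSizeIfKnown();
    }

    public static void main(String[] args) {
        // The random stream claims a concrete (huge) size...
        System.out.println(randomIntsExactSize() == Long.MAX_VALUE); // true
        // ...while iterate() admits that its size is unknown
        System.out.println(iterateExactSize()); // -1
    }
}
```

So the framework treats the random stream as a (very large) sized source when it decides how to split it for parallel execution.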

3 Answers


First, please note that .parallel() changes the parallel status of the whole pipeline, so it affects all the operations, not only subsequent ones. In your case

new Random().ints(0, 50)
            .distinct()
            .limit(5)
            .parallel()
            .forEach(d -> System.out.println("s: " + d));

is the same as

new Random().ints(0, 50)
            .parallel()
            .distinct()
            .limit(5)
            .forEach(d -> System.out.println("s: " + d));

You cannot parallelize only part of the pipeline. It's either parallel or not.
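A small sketch of this behaviour, using nothing beyond the standard java.util.stream API (the class and method names are hypothetical): the peek() in the first stage records which threads run it, while parallel() or sequential() is only called afterwards. The execution mode in effect when the terminal operation starts is what applies to every stage.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class PipelineModeDemo {
    /** Names of the threads that ran the first stage, with the mode switched only at the end. */
    static Set<String> upstreamThreads(boolean parallelAtEnd) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        IntStream stream = IntStream.range(0, 100_000)
                .peek(i -> threads.add(Thread.currentThread().getName())) // first (upstream) stage
                .map(i -> i % 7);
        // The mode is switched only after the upstream stage has been declared...
        stream = parallelAtEnd ? stream.parallel() : stream.sequential();
        stream.sum(); // ...but the terminal operation runs the whole pipeline in that mode
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + upstreamThreads(false)); // only the calling thread
        System.out.println("parallel:   " + upstreamThreads(true));  // typically ForkJoinPool workers too
        // The most recent parallel()/sequential() call wins for the entire pipeline
        System.out.println(IntStream.range(0, 10).parallel().map(i -> i).sequential().isParallel()); // false
    }
}
```

In the parallel run the upstream peek() usually shows up on common-pool worker threads, even though parallel() was written after it.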

Now back to your question. Since Random.ints produces an unordered stream, the unordered implementations of distinct() and limit() are selected, so this is not a duplicate of this question (where the problem was in the ordered distinct() implementation). Here the problem is in the unordered limit() implementation. To reduce possible contention, it does not check the total count of elements found by the different threads until every subtask has collected at least 128 elements or its upstream is exhausted (see the implementation, 1 << 7 = 128). In your case, the upstream distinct() finds only 50 different elements and then desperately traverses the input in the hope of finding more, while the downstream limit() never signals it to stop, because it wants to collect at least 128 elements before checking whether the limit has been reached (which is not very smart when the limit is less than 128).

So to make this work, the source must be able to yield at least 128 * (number of CPUs) different elements. On my 4-core machine, new Random().ints(0, 512) succeeds while new Random().ints(0, 511) gets stuck.
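To make the "128 per subtask" argument concrete, here is a deliberately simplified simulation. It is a toy model of my own, not the JDK's slice code, and it assumes a fixed number of leaf tasks (4, standing in for the 4-core machine above) that take turns pulling random values through one shared distinct() "seen" set. A leaf stops pulling only once it has buffered CHUNK_SIZE new values, and the modeled run finishes only when every leaf has done so.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

/** Toy model (hypothetical, NOT the JDK implementation) of chunked unordered limit(). */
public class UnorderedLimitModel {
    static final int CHUNK_SIZE = 1 << 7; // 128, the threshold described in the answer

    /** True if every leaf fills its CHUNK_SIZE buffer within maxTurns turns. */
    static boolean allLeavesFill(int bound, int leaves, long maxTurns, long seed) {
        Random random = new Random(seed);
        Set<Integer> seen = new HashSet<>(); // shared state of the unordered distinct()
        int[] buffered = new int[leaves];    // distinct values each leaf has buffered so far
        int fullLeaves = 0;
        for (long turn = 0; turn < maxTurns && fullLeaves < leaves; turn++) {
            int leaf = (int) (turn % leaves); // leaves take turns (a scheduling assumption)
            if (buffered[leaf] == CHUNK_SIZE) {
                continue; // this leaf has its chunk and would now consult limit()
            }
            int value = random.nextInt(bound); // one pull from the random source
            if (seen.add(value) && ++buffered[leaf] == CHUNK_SIZE) {
                fullLeaves++;
            }
        }
        return fullLeaves == leaves; // false: the modeled run never gets past the chunking phase
    }

    public static void main(String[] args) {
        System.out.println(allLeavesFill(50, 4, 1_000_000L, 42L));  // false: only 50 distinct values exist
        System.out.println(allLeavesFill(511, 4, 1_000_000L, 42L)); // false: 511 < 4 * 128
        System.out.println(allLeavesFill(512, 4, 1_000_000L, 42L)); // true
    }
}
```

The model only captures the counting argument: with 4 leaves, any bound below 4 * 128 = 512 can never let all of them fill their chunks, which matches the 511/512 observation. The real number of leaf tasks depends on how the fork/join framework splits the source.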

To fix this, I'd suggest collecting the random numbers sequentially and then creating a new stream from them:

int[] ints = new Random().ints(0, 50).distinct().limit(5).toArray();
Arrays.stream(ints).parallel()
      .forEach(d -> System.out.println("s: " + d));

I assume that you want to perform some expensive downstream processing. In that case, parallelizing the generation of 5 random numbers is not very useful; this part will be faster when performed sequentially.
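Put together as a complete program, the suggested fix might look like the sketch below. expensiveOperation is a hypothetical placeholder for whatever per-element work you actually need; only that part runs in parallel, on a fresh stream built from the already-picked values.

```java
import java.util.Arrays;
import java.util.Random;

public class DistinctThenParallel {
    /** Sequential step: draws `count` distinct values from [0, bound). Requires count <= bound. */
    static int[] pickDistinct(int count, int bound) {
        return new Random().ints(0, bound).distinct().limit(count).toArray();
    }

    /** Hypothetical stand-in for the expensive per-element work. */
    static int expensiveOperation(int d) {
        return d * d; // placeholder computation
    }

    /** Parallel step: a new parallel stream over the values picked sequentially. */
    static int[] process(int[] picked) {
        return Arrays.stream(picked)
                .parallel()
                .map(DistinctThenParallel::expensiveOperation)
                .toArray(); // toArray() keeps the encounter order of `picked`
    }

    public static void main(String[] args) {
        int[] picked = pickDistinct(5, 50);
        System.out.println("picked:    " + Arrays.toString(picked));
        System.out.println("processed: " + Arrays.toString(process(picked)));
    }
}
```

Because the distinct()/limit() part runs sequentially, it stops as soon as 5 distinct values have been seen.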

Update: I filed a bug report and submitted a patch.

Tagir Valeev

Your call to ints(0, 50)

Returns an effectively unlimited stream of pseudorandom int values, each conforming to the given origin (inclusive) and bound (exclusive).

I originally thought that the unterminated IntStream was the problem, but then I reproduced the issue:

new Random().ints(0, 50)
            .distinct().limit(5)
            .parallel().forEach(a -> System.out.println(a));

goes into an infinite loop, while

new Random().ints(0, 50)
            .distinct().limit(5)
            .forEach(a -> System.out.println(a));

finishes correctly.

My knowledge of streams isn't good enough to explain it, but the parallelization clearly doesn't play nicely here (possibly because of the infinite stream).

Kayaman
  • But when I remove the `.parallel()`, the program correctly prints 5 distinct numbers and exits. Why does adding `.parallel()` after the limit make the execution infinite? – janinko Apr 15 '16 at 11:43
  • @janinko when you add parallel(), the whole stream is made parallel, which means the `distinct()` has to be computed using multiple threads. You might want to collect the results and only go parallel on that. – Peter Lawrey Apr 15 '16 at 11:50
  • @PeterLawrey So it doesn't matter where I put the `.parallel()`? My idea was that only the things specified after `.parallel()` would be done in parallel. – janinko Apr 15 '16 at 11:55
  • @janinko that would make sense IMHO; however, this isn't what it does. – Peter Lawrey Apr 15 '16 at 12:12

The closest option to what you're trying to do is perhaps to use iterate and unordered:

Random ran = new Random();
IntStream.iterate(ran.nextInt(50), i -> ran.nextInt(50))
    .unordered()
    .distinct()
    .limit(5)
    .parallel()
    .forEach(System.out::println);

Using an infinite stream together with distinct and parallel can be expensive or may never produce a result. See the API Note or this question for more information.

dejvuth
  • This seems to work, but when I replace `IntStream.iterate` with `ran.ints(0,50)` it loops forever. Why does the `IntStream` from the `IntStream.iterate` method behave differently than the `IntStream` from the `Random.ints` method? – janinko Apr 15 '16 at 13:07
  • @janinko You're absolutely right. The behavior difference looks very strange. I don't know the exact answer, but I suspect that it might be because of the ways the elements are [split](https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html#trySplit--) for the parallelization. – dejvuth Apr 15 '16 at 14:30
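dejvuth's suspicion about splitting can at least be made concrete by inspecting the two source spliterators (class and method names are hypothetical). On the JDKs I'm aware of, the Random.ints source is not ORDERED and splits into halves that are still astronomically large, while the source behind IntStream.iterate is ORDERED (which the `.unordered()` call above compensates for at the pipeline level) and splits off small, finite array batches. A leaf task that holds a finite batch eventually runs out of input, which fits the "or the upstream is exhausted" escape hatch from Tagir's answer; treat this as an observation, not a proof of the root cause.

```java
import java.util.Random;
import java.util.Spliterator;
import java.util.stream.IntStream;

public class SplitShapeDemo {
    static Spliterator.OfInt randomIntsSource() {
        return new Random().ints(0, 50).spliterator();
    }

    static Spliterator.OfInt iterateSource() {
        Random ran = new Random();
        return IntStream.iterate(ran.nextInt(50), i -> ran.nextInt(50)).spliterator();
    }

    /** Exact size of the first piece split off the source; -1 if unknown or no split happened. */
    static long firstSplitSize(Spliterator.OfInt source) {
        Spliterator.OfInt prefix = source.trySplit();
        return prefix == null ? -1L : prefix.getExactSizeIfKnown();
    }

    public static void main(String[] args) {
        System.out.println("Random.ints ORDERED:   " + randomIntsSource().hasCharacteristics(Spliterator.ORDERED)); // false
        System.out.println("iterate ORDERED:       " + iterateSource().hasCharacteristics(Spliterator.ORDERED));    // true
        // Still astronomically large: every leaf is effectively infinite
        System.out.println("Random.ints 1st split: " + firstSplitSize(randomIntsSource()));
        // A small, finite array batch that a leaf can exhaust
        System.out.println("iterate 1st split:     " + firstSplitSize(iterateSource()));
    }
}
```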