2

I watched a talk by José Paumard on InfoQ : http://www.infoq.com/fr/presentations/jdk8-lambdas-streams-collectors (French)

The thing is I got stuck on this one point. To collect 1M Long using stream AND multithreading we can do it this way :

Stream<Long> stream = 
  Stream.generate(() -> ThreadLocalRandom.current().nextLong()) ;

List<Long> list1 = 
  stream.parallel().limit(10_000_000).collect(Collectors.toList()) ;

But given the fact that the threads are always checking the said limit in hinders performance.

In that talk we also see this second solution :

Stream<Long> stream = 
  ThreadLocalRandom.current().longs(10_000_000).mapToObj(Long::new) ;

List<Long> list = 
  stream.parallel().collect(Collectors.toList()) ;

and it seems to be better performance wise.

So here is my question : Why is that the second code better, and is there a better, or at least less costly way to do it?

Tagir Valeev
  • 97,161
  • 19
  • 222
  • 334
Tahar Bakir
  • 716
  • 5
  • 14
  • 1
    it's as far as I can see not multithreaded and thereby avoids the parallelism synchronization overhead. `return StreamSupport.longStream(.., false)` in the source: https://docs.oracle.com/javase/8/docs/api/java/util/stream/StreamSupport.html#longStream-java.util.Spliterator.OfLong-boolean- – zapl Nov 26 '15 at 01:56
  • yes but for the sake of argument let's say we have to use multithreading :) – Tahar Bakir Nov 26 '15 at 01:58
  • 1
    I may be blind, but where is the multithreading? Both provided solutions are single-threaded. – user3707125 Nov 26 '15 at 02:01
  • @user3707125 nice catch will correct that – Tahar Bakir Nov 26 '15 at 02:08
  • 1
    You could try if a divide and conquer approach can win over a sequential approach. E.g. on a 4 core system generating 4 sub lists of 1/4th size per core, then merging should be able to outperform a single threaded solution. IDK if there is a nice way to do that with parallel streams though. – zapl Nov 26 '15 at 02:14

3 Answers3

7

This is an implementation dependent limitation. One thing that developers, concerned about parallel performance, have to understand, is that predictable stream sizes help the parallel performance generally as they allow balanced splitting of the workload.

The issue here is, that the combination of an infinite stream as created via Stream.generate() and limit() does not produce a stream with a predictable size, despite it looks perfectly predictable to us.

We can examine it using the following helper method:

static void sizeOf(String op, IntStream stream) {
    final Spliterator.OfInt s = stream.spliterator();
    System.out.printf("%-18s%5d, %d%n", op, s.getExactSizeIfKnown(), s.estimateSize());
}

Then

sizeOf("randoms with size", ThreadLocalRandom.current().ints(1000));
sizeOf("randoms with limit", ThreadLocalRandom.current().ints().limit(1000));
sizeOf("range", IntStream.range(0, 100));
sizeOf("range map", IntStream.range(0, 100).map(i->i));
sizeOf("range filter", IntStream.range(0, 100).filter(i->true));
sizeOf("range limit", IntStream.range(0, 100).limit(10));
sizeOf("generate limit", IntStream.generate(()->42).limit(10));

will print

randoms with size  1000, 1000
randoms with limit   -1, 9223372036854775807
range               100, 100
range map           100, 100
range filter         -1, 100
range limit          -1, 100
generate limit       -1, 9223372036854775807

So we see, certain sources like Random.ints(size) or IntStream.range(…) produce streams with a predictable size and certain intermediate operations like map are capable of carrying the information as they know that the size is not affected. Others like filter and limit do not propagate the size (as a known exact size).

It’s clear that filter cannot predict the actual number of elements, but it provides the source size as an estimate which is reasonable insofar that that’s the maximum number of elements that can ever pass the filter.

In contrast, the current limit implementation does not provide a size, even if the source has an exact size and we know the predictable size is as simple as min(source size, limit). Instead, it even reports a nonsensical estimate size (the source’s size) despite the fact that it is known that the resulting size will never be higher than the limit. In case of an infinite stream we have the additional obstacle that the Spliterator interface, on which streams are based, doesn’t have a way to report that it is infinite. In these cases, infinite stream + limit returns Long.MAX_VALUE as an estimate which means “I can’t even guess”.

Thus, as a rule of thumb, with the current implementation, a programmer should avoid using limit when there is a way to specify the desired size beforehand at the stream’s source. But since limit also has significant (documented) drawbacks in the case of ordered parallel streams (which doesn’t applies to randoms nor generate), most developers avoid limit anyway.

Holger
  • 285,553
  • 42
  • 434
  • 765
  • 1
    It's actually very strange that `limit()` and `skip()` clear SIZED characteristic even if the underlying spliterator reports SIZED/SUBSIZED. E.g. `System.out.println(IntStream.range(0, 10).limit(5).spliterator().getExactSizeIfKnown())` or the same with `skip()`. – Tagir Valeev Nov 26 '15 at 10:57
  • 2
    @Tagir Valeev: that example is already included in my answer, see `range limit`. I vaguely remember a discussion that `skip` and `limit` were considered to be of a more dynamic nature than the static, predictable stream size, but actually, there is no difference between letting such a pipeline stage perform a `min` and invoking `getExactSizeIfKnown` on an arbitrary `Spliterator`. In fact, you could work-around the problem with a custom spliterator. – Holger Nov 26 '15 at 11:03
  • 1
    Surely I could rewrite every Stream API method in better way (though custom spliterator does not totally compatible: the stream created on it does not propagate `parallel()` or `unordered()` to the source), but it would be better to have it in JDK. Seems that this is done due to limitation of the internal engine: currently for any stream op the exact size is either unknown or equals to the exact size of the source spliterator. Tracking of possible exact size changes is not supported internally. – Tagir Valeev Nov 26 '15 at 11:13
  • 1
    @Tagir Valeev: of course, re-implementing everything is not a solution. It’s just a proof that there is no fundamental problem preventing such a solution. – Holger Nov 26 '15 at 11:49
  • Yep looks like it answers the question, and taking into account @TagirValeev answer, then it seems clear that using limit() with (infinite) stream is a really bad idea, and more so when you work with objects. – Tahar Bakir Nov 26 '15 at 14:29
4

Why is that the second code better?

In the first case you create infinite source, split it for parallel execution to a bunch of tasks each providing an infinite number of elements, then limit the overall size of the result. Even though the source is unordered, this implies some overhead. In this case individual tasks should talk to each other to check when the overall size is reached. If they talk often, this increases the contention. If they talk less, they actually produce more numbers than necessary and then drop some of them. I believe, actual stream API implementation is to talk less between tasks, but this actually leads to produce more numbers than necessary. This also increases memory consumption and activates garbage collector.

In contrast in the second case you create a finite source of known size. When the task is split into subtasks, their sizes are also well-defined and in total they produce exactly the requested number of random numbers without the necessity to talk to each other at all. That's why it's faster.

Is there a better, or at least less costly way to do it?

The biggest problem in your code samples is boxing. If you need 10_000_000 random numbers, it's very bad idea to box each of them and store in the List<Long>: you create tons of unnecessary objects, perform many heap allocations and so on. Replace this with primitive streams:

long[] randomNumbers = ThreadLocalRandom.current().longs(10_000_000).parallel().toArray();

This would be much much faster (probably an order of magnitude).

Also you may consider new Java-8 SplittableRandom class. It provides roughly the same performance, but the generated random numbers have much higher quality (including passing of DieHarder 3.31.1):

long[] randomNumbers = new SplittableRandom().longs(10_000_000).parallel().toArray();
Tagir Valeev
  • 97,161
  • 19
  • 222
  • 334
0

JDK docs has good explanation of this behavior, it is ordering constraint that kills performance for parallel processing

Text from doc for limit function - https://docs.oracle.com/javase/8/docs/api/java/util/stream/LongStream.html

While limit() is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines, especially for large values of maxSize, since limit(n) is constrained to return not just any n elements, but the first n elements in the encounter order. Using an unordered stream source (such as generate(LongSupplier)) or removing the ordering constraint with BaseStream.unordered() may result in significant speedups of limit() in parallel pipelines, if the semantics of your situation permit. If consistency with encounter order is required, and you are experiencing poor performance or memory utilization with limit() in parallel pipelines, switching to sequential execution with sequential() may improve performance. Blockquote

Ashkrit Sharma
  • 627
  • 5
  • 7