
In the book Java 8 in Action, section 7.1.1, the authors state that a stream can benefit from parallel processing by calling .parallel() on it. They provide a simple method, parallelSum(long), to illustrate this. I was curious to see how well it worked, so I executed this code:

package lambdasinaction.chap7;

import java.util.stream.Stream;

public class ParallelPlay {

    public static void main(String[] args) {
        System.out.println(parallelSum(100_000_000));
    }

    public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1)
                .limit(n)
                .parallel()
                .reduce(0L, Long::sum);
    }
}

To my surprise, I received this error:

Exception in thread "main" java.lang.OutOfMemoryError
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Unknown Source)
    at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
    at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
    at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
    at java.util.stream.SliceOps$1.opEvaluateParallelLazy(Unknown Source)
    at java.util.stream.AbstractPipeline.sourceSpliterator(Unknown Source)
    at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.util.stream.ReferencePipeline.reduce(Unknown Source)
    at lambdasinaction.chap7.ParallelPlay.parallelSum(ParallelPlay.java:15)
    at lambdasinaction.chap7.ParallelPlay.main(ParallelPlay.java:8)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.stream.SpinedBuffer.ensureCapacity(Unknown Source)
    at java.util.stream.Nodes$SpinedNodeBuilder.begin(Unknown Source)
    at java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.util.stream.SliceOps$SliceTask.doLeaf(Unknown Source)
    at java.util.stream.SliceOps$SliceTask.doLeaf(Unknown Source)
    at java.util.stream.AbstractShortCircuitTask.compute(Unknown Source)
    at java.util.concurrent.CountedCompleter.exec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
    at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
    at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

I am running Java 1.8.0_45 on Windows 7, SP1 with a four-core processor. What's going on?

Tagir Valeev
ksnortum
  • On macbook pro (2.2 GHz Intel Core i7 with 16GB ram) it took 26 secs and returned: 5000000050000000 – Nir Alfasi Jun 14 '15 at 03:56
  • Looks like your heap size is too small, run: `java -XX:+PrintFlagsFinal -version | findstr /i "HeapSize PermSize ThreadStackSize"` to check it, and consider increasing it (by changing the values of `-Xms` and `-Xmx`) and try running again. – Nir Alfasi Jun 14 '15 at 03:59
  • Also, using `iterate()` as a stream source essentially guarantees that you will not get any parallelization, since this is a fundamentally sequential generation (can't generate element n+1 until you've generated element n.) Use `IntStream.range()` instead. – Brian Goetz Jun 15 '15 at 12:09
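Brian Goetz's point can be checked directly: an `iterate()`-based source does not report a known size (its spliterator lacks the SIZED characteristic), while a range-based source does, which is what lets the fork/join machinery split it into equal chunks. A minimal sketch (class name is mine):

```java
import java.util.Spliterator;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class SpliteratorCheck {

    public static void main(String[] args) {
        // iterate() cannot know its size up front: each element
        // depends on the previous one, so the spliterator is unsized.
        Spliterator<Long> iterated =
                Stream.iterate(1L, i -> i + 1).spliterator();
        System.out.println(
                iterated.hasCharacteristics(Spliterator.SIZED));  // false

        // A range knows exactly how many elements it has, so it can
        // be split evenly for parallel processing.
        Spliterator.OfLong ranged =
                LongStream.rangeClosed(1, 100).spliterator();
        System.out.println(
                ranged.hasCharacteristics(Spliterator.SIZED));    // true
    }
}
```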

1 Answer


Here you create an infinite stream and limit it afterwards. There are known problems with processing infinite streams in parallel. In particular, there is no way to split the task into equal parts effectively. Internally, some heuristics are used that are not well suited to every task. In your case it's much better to create a finite stream using LongStream.rangeClosed:

import java.util.stream.LongStream;

public class ParallelPlay {

    public static void main(String[] args) {
        System.out.println(parallelSum(100_000_000));
    }

    public static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }
}

In this case the stream engine knows from the very beginning how many elements there are, so it can split the task effectively. Also note that using LongStream is more efficient, as there is no unnecessary boxing.

In general, avoid infinite streams if you can solve your task with finite ones.
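To see the boxing difference concretely, one can compare the primitive pipeline with a boxed equivalent (a sketch; class name is mine, and measuring the actual speed difference would need a proper benchmark such as JMH):

```java
import java.util.stream.LongStream;

public class BoxingCompare {

    public static void main(String[] args) {
        long n = 1_000_000;

        // Primitive specialization: summed without per-element allocation.
        long primitive = LongStream.rangeClosed(1, n).parallel().sum();

        // Boxed equivalent: each element is wrapped in a java.lang.Long
        // before being reduced, which costs allocation and indirection.
        long boxed = LongStream.rangeClosed(1, n)
                .boxed()
                .parallel()
                .reduce(0L, Long::sum);

        // Both compute the same sum, 500000500000 for n = 1_000_000.
        System.out.println(primitive == boxed);
    }
}
```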

Tagir Valeev
  • Though, regardless of how many threads may run in parallel, they can’t beat a simple single-threaded `(n+1)*n/2`. Unfortunately, the standard implementation isn’t clever enough to understand that. – Holger Jun 15 '15 at 09:32
  • @Holger: that's an interesting question why there are no specialized streams (like `EmptyStream`, `SingletonStream`, `RangeStream`, etc) which could optimize some operations. Probably this would produce too much code (given the fact that you should support primitives as well). Another possible reason is that it may hurt performance in normal case as stream calls will become polymorphic, so it will be harder for JIT to devirtualize them (currently JDK has only one `Stream` implementation). – Tagir Valeev Jun 15 '15 at 09:53
  • I don’t think that polymorphic stream implementations could hurt performance more than the polymorphic `Spliterator` interfaces. After all, Hotspot does a great job dealing with them and the terminal operation consists of a single method invocation anyway. But you still wouldn’t need them, the only thing you need, is having these (or some of the) high-level operations defined on the internal pipeline stages. Often it’s just that either, nobody thought about it or optimizations have been deferred. See `Stream.count()` which will short-cut in Java 9, or flatmap streams which aren’t lazy yet. – Holger Jun 15 '15 at 10:17
  • @Holger, yes, I already tried optimized JDK9 `Stream.count()`. Optimizations may be significant when non-trivial `flatMap` for big collection is used: in this case stream operations are executed many times. But I agree: my StreamEx lib adds couple more Stream implementations and benchmarks rarely show visible performance overhead. Probably it's not done, because `RangeStream` can optimize only very trivial cases which rarely appear in real code. For example, it's hardly possible to optimize `IntStream.range(0,100).map(x -> x*2).sum()`. – Tagir Valeev Jun 15 '15 at 10:34
  • Actually that is not true (regarding infinite streams). you can rewrite the example from above with: `LongStream.generate(() -> 1L).limit(n).parallel().reduce(0L, Long::sum)` And it will not fail with OOM. – traylz Dec 19 '18 at 12:54
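As Holger's comment notes, this particular sum has a closed form, so no stream is needed at all. A quick check against the value reported in the comments above (class name is mine):

```java
public class ClosedForm {

    public static void main(String[] args) {
        long n = 100_000_000L;

        // Gauss' formula: 1 + 2 + ... + n = n * (n + 1) / 2.
        // n * (n + 1) fits comfortably in a long for this n.
        long sum = n * (n + 1) / 2;

        System.out.println(sum);  // 5000000050000000
    }
}
```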