
How do I effectively parallelize my computation of pi (just as an example)?

This works (and takes about 15 seconds on my machine):

Stream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).mapToDouble(d->4.0d/d).sum()

But all of the following parallel variants run into an OutOfMemoryError:

DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).parallel().map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).map(d->4.0d/d).parallel().sum();

So, what do I need to do to get parallel processing of this (large) stream? I already checked whether autoboxing is causing the memory consumption, but it is not. This also works:

DoubleStream.iterate(1, d->-(d+Math.abs(2*d)/d)).boxed().limit(999999999L).mapToDouble(d->4/d).sum()
  • Just found a quote from @Brian-Goetz: "The simple rule is now: last call wins, and governs the execution mode for the whole pipeline." So it does not matter where I add the `.parallel()` call. But still: how do I avoid an OutOfMemoryError during execution? – Cfx Feb 13 '15 at 09:46
  • Maybe giving it more memory is just throwing gasoline on the fire :). Are you giving your JVM any parameters for the maximum memory size? See this SO post: http://stackoverflow.com/questions/14763079/what-are-the-xms-and-xmx-parameters-when-starting-jvms – chrisinmtown Feb 13 '15 at 10:52
  • Increasing the limits is not a real solution, because there would always be another stream size that breaks it. I'd like to know what the memory is used up for and how to circumvent that. – Cfx Feb 13 '15 at 10:58
  • Out of curiosity: what do you think a parallel execution can gain, given the iterative definition of your elements? – Holger Feb 13 '15 at 11:22
  • I was expecting a benefit across the whole pipeline. Everything but the last sum operation could be done in parallel. But of course this is just an example to play with. – Cfx Feb 13 '15 at 11:25
  • That's a strange expectation. Your *source* can't be processed in parallel, as each value depends on the previous one. In contrast, the final `sum` operation is perfectly parallelizable. – Holger Feb 13 '15 at 14:19
  • Of course you are right about the .iterate() and the .limit(). However, it works for small limits, while not being faster. By "the last sum operation" I meant the final sum of all partial sums, not the pipeline step. – Cfx Feb 13 '15 at 14:26

2 Answers

3

The problem is that you are using constructs which are hard to parallelize.

First, Stream.iterate(…) creates a sequence of numbers where each calculation depends on the previous value, hence, it offers no room for parallel computation. Even worse, it creates an infinite stream which will be handled by the implementation like a stream with unknown size. For splitting the stream, the values have to be collected into arrays before they can be handed over to other computation threads.
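
You can see this by inspecting the spliterator of such a source directly. A minimal check (assuming imports of java.util.Spliterator and java.util.stream.DoubleStream):

Spliterator.OfDouble sp = DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).spliterator();
System.out.println(sp.estimateSize());                        // Long.MAX_VALUE, i.e. unknown size
System.out.println(sp.hasCharacteristics(Spliterator.SIZED)); // false, no SIZED characteristic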

Second, providing a limit(…) doesn’t improve the situation; it makes things even worse. Applying a limit discards the size information the implementation had just gathered for the array fragments. The reason is that the stream is ordered, so a thread processing an array fragment doesn’t know whether it may process all of its elements, as that depends on how many preceding elements the other threads are processing. This is documented:

“… it can be quite expensive on ordered parallel pipelines, especially for large values of maxSize, since limit(n) is constrained to return not just any n elements, but the first n elements in the encounter order.”
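
You can also verify that the exact size is not propagated through the pipeline: even with the limit applied, the spliterator still reports an unknown estimate. A small sketch, same imports as above, which should print Long.MAX_VALUE and false:

Spliterator.OfDouble limited = DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).spliterator();
System.out.println(limited.estimateSize());                        // the known limit is not reflected here
System.out.println(limited.hasCharacteristics(Spliterator.SIZED)); // still not SIZED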

That’s a pity, as we know perfectly well that the combination of an infinite sequence returned by iterate with a limit(…) actually has an exactly known size. But the implementation doesn’t know. And the API doesn’t provide a way to create an efficient combination of the two. However, we can do it ourselves:

static DoubleStream iterate(double seed, DoubleUnaryOperator f, long limit) {
    // a spliterator that, unlike iterate(…).limit(…), knows its exact size up front
    return StreamSupport.doubleStream(new Spliterators.AbstractDoubleSpliterator(limit,
        Spliterator.ORDERED|Spliterator.SIZED|Spliterator.IMMUTABLE|Spliterator.NONNULL) {
        long remaining = limit;
        double value = seed;
        @Override
        public boolean tryAdvance(DoubleConsumer action) {
            if(remaining == 0) return false;
            double d = value;
            // compute the successor only if another element will actually be requested
            if(--remaining > 0) value = f.applyAsDouble(d);
            action.accept(d);
            return true;
        }
    }, false);
}

Once we have such an iterate-with-limit method, we can use it like this:

iterate(1d, d -> -(d+2*(Math.abs(d)/d)), 999999999L).parallel().map(d->4.0d/d).sum()

This still doesn’t benefit much from parallel execution due to the sequential nature of the source, but it works. On my four-core machine it managed to get roughly a 20% gain.
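
As a side note, if reformulating the computation is an option, a source that the framework can split well, such as LongStream.range, parallelizes much better than any iterate-based variant. A sketch of the same Leibniz series expressed over the term index (assuming an import of java.util.stream.LongStream):

LongStream.range(0, 999999999L).parallel().mapToDouble(k -> (k % 2 == 0 ? 4.0 : -4.0) / (2 * k + 1)).sum()

Here each element is computed independently from its index, so the stream splits into evenly sized chunks and the work actually distributes across the cores.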

Holger
  • That is a very useful answer and a good explanation. Now I know how to circumvent such problems with unlimited streams and parallel processing. (BTW: I need to increase the limit a lot to get the parallel version faster.) – Cfx Feb 13 '15 at 19:16
-1

This is because the default ForkJoinPool implementation used by the parallel() method does not limit the number of threads that get created. The solution is to provide a custom ForkJoinPool that is limited in the number of threads it executes in parallel. This can be achieved as shown below:

ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
double pi = forkJoinPool.submit(() -> DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum()).join(); // join() waits for and returns the result
  • I get the same error, even when I set the pool size to 1. Maybe it's not the number of threads that causes the error. – Cfx Feb 13 '15 at 15:14
  • The [ForkJoinPool Javadoc](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html) says that the default is the number of available processors. – Cfx Feb 13 '15 at 15:19