
I want to compare the two Java 8 stream terminal operations reduce() and collect() in terms of their parallel performance.

Let's have a look at the following Java 8 parallel stream example:

import java.math.BigInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static java.math.BigInteger.ONE;

public class StartMe {

    static Function<Long, BigInteger> fac;

    static {
        fac = x -> x==0? ONE : BigInteger.valueOf(x).multiply(fac.apply(x - 1));
    }

    static long N = 2000;

    static Supplier<BigInteger[]> one() {
        BigInteger[] result = new BigInteger[1];
        result[0] = ONE;
        return () -> result;
    }

    static BiConsumer<BigInteger[], ? super BigInteger> accumulator() {
        return (BigInteger[] ba, BigInteger b) -> {
            synchronized (fac) {
                ba[0] = ba[0].multiply(b);
            }
        };
    }

    static BiConsumer<BigInteger[], BigInteger[]> combiner() {
        return (BigInteger[] b1, BigInteger[] b2) -> {};
    }

    public static void main(String[] args) throws Exception {
        long t0 = System.currentTimeMillis();

        BigInteger result1 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).reduce(ONE, BigInteger::multiply);
        long t1 = System.currentTimeMillis();

        BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).collect(one(), accumulator(), combiner());
        long t2 = System.currentTimeMillis();

        BigInteger result3 = fac.apply(N);
        long t3 = System.currentTimeMillis();

        System.out.println("reduce():  deltaT = " + (t1-t0) + "ms, result 1 = " + result1);
        System.out.println("collect(): deltaT = " + (t2-t1) + "ms, result 2 = " + result2[0]);
        System.out.println("recursive: deltaT = " + (t3-t2) + "ms, result 3 = " + result3);

    }
}

It computes n! using some - admittedly weird ;-) - algorithms.

The performance results are however surprising:

 reduce():  deltaT = 44ms, result 1 = 3316275...
 collect(): deltaT = 22ms, result 2 = 3316275...
 recursive: deltaT = 11ms, result 3 = 3316275...

Some remarks:

  • I had to synchronize the accumulator() because it accesses the same array in parallel.
  • I expected reduce() and collect() would yield the same performance but reduce() is ~2 times slower than collect(), even if collect() must be synchronized!
  • the fastest algorithm is the sequential and recursive one (which might show the huge overhead of the parallel stream management)

I didn't expect reduce() to perform worse than collect(). Why is this so?

rolve
Udo
  • [How do I write a correct micro-benchmark in Java?](http://stackoverflow.com/questions/504103/how-do-i-write-a-correct-micro-benchmark-in-java) – resueman Mar 11 '15 at 13:49
  • @resueman I'd suggest looking at the JMH framework. It has a nice tutorial in it. http://openjdk.java.net/projects/code-tools/jmh/ – Stuart Marks Mar 11 '15 at 18:01

1 Answer


Basically, you are measuring the initial overhead of code that is executed for the first time. Not only has the optimizer not done any work yet, you are also measuring the overhead of loading, verifying and initializing the classes.

So it’s no wonder that the evaluation times decrease as each evaluation can reuse classes already loaded for the previous evaluation. Running all three evaluations in a loop or even just changing the order will give you an entirely different picture.

The only predictable result is that the simple recursive evaluation will have the smallest initial overhead as it doesn’t require loading of the Stream API classes.


If you run the code multiple times, or better, use a sophisticated benchmark tool, I guess you will get results similar to mine, where reduce clearly outperforms collect and is indeed faster than the single threaded approach.
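Short of a real benchmark tool such as JMH, a rough way to see the effect is to run each variant a number of times untimed before measuring. The following is only a minimal sketch under that assumption; the class name WarmupTiming, the helper bestNanos and the iteration counts are made up for illustration, and a hand-rolled loop like this is still far less reliable than a proper harness:

```java
import java.math.BigInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical helper class, not part of the question's code.
class WarmupTiming {

    static final long N = 2000;

    // Keeps results "used" so the JIT is less tempted to drop the work.
    static int sink;

    // The parallel reduce() variant from the question.
    static BigInteger viaReduce() {
        return Stream.iterate(BigInteger.ONE, x -> x.add(BigInteger.ONE))
                .parallel().limit(N)
                .reduce(BigInteger.ONE, BigInteger::multiply);
    }

    // A plain sequential loop as the single-threaded baseline.
    static BigInteger viaLoop() {
        BigInteger r = BigInteger.ONE;
        for (long i = 2; i <= N; i++) {
            r = r.multiply(BigInteger.valueOf(i));
        }
        return r;
    }

    // Runs the task `warmup` times untimed, then returns the best
    // of `runs` timed executions in nanoseconds.
    static long bestNanos(Supplier<BigInteger> task, int warmup, int runs) {
        for (int i = 0; i < warmup; i++) {
            sink += task.get().bitLength();
        }
        long best = Long.MAX_VALUE;
        for (int i = 0; i < runs; i++) {
            long t0 = System.nanoTime();
            sink += task.get().bitLength();
            best = Math.min(best, System.nanoTime() - t0);
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println("reduce(): " + bestNanos(WarmupTiming::viaReduce, 50, 20) / 1_000 + " us");
        System.out.println("loop:     " + bestNanos(WarmupTiming::viaLoop, 50, 20) / 1_000 + " us");
    }
}
```

The absolute numbers depend on the machine and JVM; the point is only that the first-run class loading and interpretation cost no longer ends up in the measurement.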

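The reason collect is slower is that you are using it incorrectly. The Supplier will be queried for each thread to get a distinct container, therefore the accumulator function does not need any additional synchronization. In your code, one() creates the array once and hands out that same array on every call, which is the only reason you needed synchronized and the only reason the empty combiner still produced the right result. With distinct containers, it’s important that the combiner function does its work correctly, joining the result containers of the different threads into a single result.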

A correct approach would be:

BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N)
  .collect(()->new BigInteger[]{ONE},
           (a,v)->a[0]=a[0].multiply(v), (a,b)->a[0]=a[0].multiply(b[0]));

On my system, its performance is on par with the reduce approach. Since using an array as a mutable container can’t change the immutable nature of BigInteger, there is no advantage in using collect here. Using reduce is straightforward and, as said, has equivalent performance when both methods are used correctly.
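To see that the Supplier really is asked for more than one container, you can count its invocations. This is just an illustrative sketch: the class CollectContainers and the counter are hypothetical names, and it uses LongStream.rangeClosed as the source instead of Stream.iterate to keep it short. How many containers get created depends on how the stream splits the work, so the printed count will vary between machines:

```java
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

// Hypothetical demo class, not part of the question's code.
class CollectContainers {

    // Counts how often the stream asks the Supplier for a fresh container.
    static final AtomicInteger containers = new AtomicInteger();

    // collect() with a distinct container per supplier call,
    // an unsynchronized accumulator and a combiner that merges two containers.
    static BigInteger viaCollect(long n) {
        BigInteger[] result = LongStream.rangeClosed(1, n).parallel()
                .mapToObj(BigInteger::valueOf)
                .collect(() -> {
                             containers.incrementAndGet();
                             return new BigInteger[]{BigInteger.ONE};
                         },
                         (a, v) -> a[0] = a[0].multiply(v),
                         (a, b) -> a[0] = a[0].multiply(b[0]));
        return result[0];
    }

    // The equivalent reduce() for comparison.
    static BigInteger viaReduce(long n) {
        return LongStream.rangeClosed(1, n).parallel()
                .mapToObj(BigInteger::valueOf)
                .reduce(BigInteger.ONE, BigInteger::multiply);
    }

    public static void main(String[] args) {
        BigInteger c = viaCollect(2000);
        BigInteger r = viaReduce(2000);
        System.out.println("containers created: " + containers.get());
        System.out.println("results equal:      " + c.equals(r));
    }
}
```

If you replace the combiner with an empty lambda here, the partial products of all but one container are silently dropped, which is why it has to multiply the two containers together.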


By the way, I don’t get why so many programmers try to create self-referential lambda expressions. The straightforward way to write a recursive function is still a method:

static BigInteger fac(long x) {
    return x==0? ONE : BigInteger.valueOf(x).multiply(fac(x - 1));
}
static final Function<Long, BigInteger> fac=StartMe::fac;

(Though in your code, you don’t need the Function<Long, BigInteger> at all, just call fac(long) directly).


As a final note, both Stream.iterate and Stream.limit are really bad for parallel execution. Using a stream with a predictable size and independent operations will outperform your solutions significantly:

BigInteger result4 = LongStream.rangeClosed(1, N).parallel()
    .mapToObj(BigInteger::valueOf).reduce(BigInteger::multiply).orElse(ONE);
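
One way to make the "predictable size" part visible is to ask the two sources for their spliterator characteristics. The sketch below is only an illustration with made-up names (SourceSizing and its methods); it checks the SIZED flag, which the range source reports and the iterate source does not, so only the former can be split into evenly sized chunks up front:

```java
import java.math.BigInteger;
import java.util.Spliterator;
import java.util.stream.LongStream;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the question's code.
class SourceSizing {

    // A range knows exactly how many elements it holds.
    static boolean rangeIsSized(long n) {
        return LongStream.rangeClosed(1, n).spliterator()
                .hasCharacteristics(Spliterator.SIZED);
    }

    // iterate() is an unbounded, sequentially generated source;
    // each element depends on the previous one.
    static boolean iterateIsSized() {
        return Stream.iterate(BigInteger.ONE, x -> x.add(BigInteger.ONE))
                .spliterator()
                .hasCharacteristics(Spliterator.SIZED);
    }

    // The range-based factorial from above, wrapped in a method.
    static BigInteger factorial(long n) {
        return LongStream.rangeClosed(1, n).parallel()
                .mapToObj(BigInteger::valueOf)
                .reduce(BigInteger::multiply)
                .orElse(BigInteger.ONE);
    }

    public static void main(String[] args) {
        System.out.println("rangeClosed SIZED: " + rangeIsSized(2000));
        System.out.println("iterate SIZED:     " + iterateIsSized());
        System.out.println("5! = " + factorial(5));
    }
}
```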
Holger