10

Would you please give me reference why there is significant difference in the execution time between the following 2 factorial implementations using the Java Stream API:

  1. Serial implementation
  2. Parallel implementation (using Stream.parallel()) executed in a custom fork join pool with parallelism set to 1

My expectations were to have near execution times, however the parallel version has a speedup by a factor of 2. I did not run any specialized benchmarks however the execution time should not differ so much even in a cold start jvm. Bellow I attach the source code of the two implementations:

public class FastFactorialSupplier implements FactorialSupplier {
  private final ExecutorService executorService;

  public FastFactorialSupplier(ExecutorService executorService) {
      this.executorService = executorService;
  }

  @Override
  public BigInteger get(long k) {
      try {
          return executorService
                  .submit(
                          () -> LongStream.range(2, k + 1)
                                  .parallel()
                                  .mapToObj(BigInteger::valueOf)
                                  .reduce(BigInteger.ONE, (current, factSoFar) -> factSoFar.multiply(current))
                  )
                  .get();
      } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
      }

      return BigInteger.ZERO;
  }
}
public class MathUtils {

  public static BigInteger factorial(long k) {
      return LongStream.range(2, k + 1)
              .mapToObj(BigInteger::valueOf)
              .reduce(BigInteger.ONE, (current, factSoFar) -> factSoFar.multiply(current));
  }
}

Here are the test cases with attached sample execution time as a comments based on what the intellij junit runner showed.

    @Test
    public void testWithoutParallel() {
        //2s 403
        runTest(new DummyFactorialSupplier()); // uses MathUtils.factorial
    }

    @Test
    public void testParallelismWorkStealing1() {
        //1s 43
        runTest(new FastFactorialSupplier(Executors.newWorkStealingPool(1)));
    }

    @Test
    public void testParallelismForkJoin1() {
        // 711ms
        runTest(new FastFactorialSupplier(new ForkJoinPool(1)));
    }

    @Test
    public void testExecutorForkJoin() {
        //85ms
        runTest(new FastFactorialSupplier(new ForkJoinPool()));
    }

    private void runTest(FactorialSupplier factorialSupplier) {
        BigInteger result = factorialSupplier.get(100000);
        assertNotNull(result);
//        assertEquals(456574, result.toString().length());
    }

The tests were run using java 11 since there was a issue in java 8 with custom fork join pools - https://bugs.openjdk.java.net/browse/JDK-8190974

Can there be an optimisation related with the pseudo parallel processing and how the execution is scheduled whereas there is no such given the execution is purely sequential?

Edit:

I also run microbenchmark using jmh

Parallel:

public class FastFactorialSupplierP1Test {

    @Benchmark
    @BenchmarkMode({Mode.AverageTime, Mode.SampleTime, Mode.SingleShotTime, Mode.Throughput, Mode.All})
    @Fork(value = 1, warmups = 1)
    public void measure() {
        runTest(new FastFactorialSupplier(new ForkJoinPool(1)));
    }

    private void runTest(FactorialSupplier factorialSupplier) {
        BigInteger result = factorialSupplier.get(100000);
        assertNotNull(result);
    }

    public static void main(String[] args) throws Exception {
        org.openjdk.jmh.Main.main(args);
    }
}

Serial:

public class SerialFactorialSupplierTest {
    @Benchmark
    @BenchmarkMode({Mode.AverageTime, Mode.SampleTime, Mode.SingleShotTime, Mode.Throughput, Mode.All})
    @Fork(value = 1, warmups = 1)
    public void measure() {
        runTest(new DummyFactorialSupplier());
    }

    private void runTest(FactorialSupplier factorialSupplier) {
        BigInteger result = factorialSupplier.get(100000);
        assertNotNull(result);
    }

    public static void main(String[] args) throws Exception {
        org.openjdk.jmh.Main.main(args);
    }
}
public class IterativeFactorialTest {
    @Benchmark
    @BenchmarkMode({Mode.AverageTime, Mode.SampleTime, Mode.SingleShotTime, Mode.Throughput, Mode.All})
    @Fork(value = 1, warmups = 1)
    public void measure() {
        runTest(new IterativeFact());
    }

    private void runTest(FactorialSupplier factorialSupplier) {
        BigInteger result = factorialSupplier.get(100000);
        assertNotNull(result);
    }

    public static void main(String[] args) throws Exception {
        org.openjdk.jmh.Main.main(args);
    }

    class IterativeFact implements FactorialSupplier {

        @Override
        public BigInteger get(long k) {
            BigInteger result = BigInteger.ONE;

            while (k-- != 0) {
                result = result.multiply(BigInteger.valueOf(k));
            }

            return result;
        }
    }
}

Results:

FastFactorialSupplierP1Test.measure                    avgt    5  0.437 ± 0.006   s/op
IterativeFactorialTest.measure                         avgt    5  2.643 ± 0.383   s/op
SerialFactorialSupplierTest.measure                    avgt    5  2.226 ± 0.044   s/op
radpet
  • 701
  • 1
  • 5
  • 16
  • 1
    This is much more likely to be a bug in how you're measuring things than any of the things you describe. For example, Java programs _in general_ get faster as they run longer; it is actively _probable_ that the benchmark numbers are the way they are because of the order you ran the tests in, rather than _anything_ about the code you're trying to measure. – Louis Wasserman Jun 02 '19 at 21:16
  • @Louis Wasserman .. i don't think it's a measuring problem testWithoutParallel: without parallel at all, so obviously the slowest one testParallelismWorkStealing1: same as ForkJoinPool(1), but without async-mode, which in this test-case slowes down performance testParallelismForkJoin1: uses only one thread, but .parallel does BigInteger::valueOf parallel and is therefor faster than "testWithoutParallel" testExecutorForkJoin: uses all processing-power available and is therefor the fastest one – Lukas Resch Jun 02 '19 at 21:26
  • would like to put this in an answer, so I would be pleased if you would open the question again – Lukas Resch Jun 02 '19 at 21:27
  • In my opinion there should be a difference if you execute sequentially vs fork join pool with parallelism=1 but I could not find any official references. – radpet Jun 02 '19 at 21:45
  • I just tried running sequential vs. `Executors.newWorkStealingPool(1)`, and the latter was 5 times faster when running each 10 times. – Jeppe Jun 02 '19 at 22:36

1 Answers1

5

You have chosen an operation whose performance depends on the order of evaluation. Just consider that the performance of BigInteger.multiply depends on the magnitude of the two factors. Then, running through a sequence of BigInteger instances with an accumulating value as a factor to the next multiplication will make the operation more and more expensive, the farther you get.

In contrast, when you split the range of values into smaller ranges, perform the multiplication individually for each range and multiply the results of the ranges, you get a performance advantage, even if these sub-ranges are not evaluated concurrently.

So when a parallel stream splits the work into chunks, to be potentially picked up by other worker threads, but ends up evaluating them in the same thread, you still get a performance improvement, in this specific setup, due to the changed evaluation order.

We can test this by removing all Stream and thread pool related artifacts:

public static BigInteger multiplyAll(long from, long to, int split) {
    if(split < 1 || to - from < 2) return serial(from, to);
    split--;
    long middle = (from + to) >>> 1;
    return multiplyAll(from, middle, split).multiply(multiplyAll(middle, to, split));
}

private static BigInteger serial(long l1, long l2) {
    BigInteger bi = BigInteger.valueOf(l1++);
    for(; l1 < l2; l1++) {
        bi = bi.multiply(BigInteger.valueOf(l1));
    }
    return bi;
}

I don’t have a JMH setup at hand, to post stressable results, but a simple run revealed that the order of magnitude matches your results, just a single split already roughly halves the evaluation time and higher numbers still improve the performance though the curve becomes flatter.

As explained in ForkJoinTask.html#getSurplusQueuedTaskCount(), it’s a reasonable strategy to split work such that there are a few additional tasks per worker, to be potentially picked up by other threads, which may compensate unbalanced workloads, e.g. if some elements are cheaper to process than others. Apparently, parallel streams have no special code for handling the case that there are no additional worker threads, hence, you witness the effects of splitting the work, even when there is only one thread to process it.

Holger
  • 285,553
  • 42
  • 434
  • 765