
On my machine, the program below prints:

OptionalLong[134043]
 PARALLEL took 127869 ms
OptionalLong[134043]
 SERIAL took 60594 ms

It's not clear to me why executing the program in serial is faster than executing it in parallel. I've given both runs -Xms2g -Xmx2g on an 8 GB box that's relatively quiet. Can someone clarify what's going on?

import java.util.stream.LongStream;
import java.util.stream.LongStream.Builder;

public class Problem47 {

    public static void main(String[] args) {

        final long startTime = System.currentTimeMillis();
        System.out.println(LongStream.iterate(1, n -> n + 1)
                .parallel()
                .limit(1000000)
                .filter(n -> fourConsecutives(n))
                .findFirst());
        final long endTime = System.currentTimeMillis();
        System.out.println(" PARALLEL took " +(endTime - startTime) + " ms");

        final long startTime2 = System.currentTimeMillis();
        System.out.println(LongStream.iterate(1, n -> n + 1)
                .limit(1000000)
                .filter(n -> fourConsecutives(n))
                .findFirst());
        final long endTime2 = System.currentTimeMillis();
        System.out.println(" SERIAL took " +(endTime2 - startTime2) + " ms");
    }

    // True if n, n + 1, n + 2 and n + 3 each have exactly four distinct prime factors.
    static boolean fourConsecutives(final long n) {
        return distinctPrimeFactors(n).count() == 4 &&
                distinctPrimeFactors(n + 1).count() == 4 &&
                distinctPrimeFactors(n + 2).count() == 4 &&
                distinctPrimeFactors(n + 3).count() == 4;
    }

    // Trial division up to number / 2, collecting every prime factor; distinct() drops duplicates.
    static LongStream distinctPrimeFactors(long number) {
        final Builder builder = LongStream.builder();
        final long limit = number / 2;
        long n = number;
        for (long i = 2; i <= limit; i++) {
            while (n % i == 0) {
                builder.accept(i);
                n /= i;
            }
        }
        return builder.build().distinct();
    }

}
Amir Afghani
  • Don’t use `System.currentTimeMillis()` for measuring elapsed time. Use [`System.nanoTime()`](http://docs.oracle.com/javase/8/docs/api/java/lang/System.html#nanoTime--). The result of `System.currentTimeMillis()` will be influenced by system clock changes, e.g. triggered by the user or NTP updates etc. – Holger Jun 04 '14 at 08:26
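
A minimal sketch of that suggestion applied to the serial measurement (assumed to sit inside Problem47.main, with java.util.OptionalLong and java.util.concurrent.TimeUnit imported):

// System.nanoTime() is a monotonic clock, so the measurement is unaffected by
// wall-clock adjustments such as manual changes or NTP corrections.
final long start = System.nanoTime();
OptionalLong answer = LongStream.iterate(1, n -> n + 1)
        .limit(1000000)
        .filter(n -> fourConsecutives(n))
        .findFirst();
final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
System.out.println(answer + " SERIAL took " + elapsedMs + " ms");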

2 Answers


We can make it easier to execute in parallel, but we can't necessarily make parallelism easy.

The culprit in your code is the combination of limit+parallel. Implementing limit() is trivial for sequential streams, but fairly expensive for parallel streams. This is because the definition of the limit operation is tied to the encounter order of the stream. Streams with limit() are often slower in parallel than in sequential, unless the computation done per element is very high.

Your choice of stream source is also limiting parallelism. Using iterate(1, n -> n + 1) gives you the positive integers, but iterate is fundamentally sequential; you can't compute the nth element until you've computed the (n-1)th element. So when we try to split this stream, we end up splitting (first, rest). Try using range(0, k) instead; it splits neatly in half, with random access to each half.
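
As a concrete illustration, a minimal sketch of the range-based parallel pipeline (the bound is illustrative, and it assumes the code lives inside Problem47 so fourConsecutives is in scope):

// range(1, 1000000) produces a sized stream that splits cleanly in half at each
// split, so the fork/join workers receive balanced chunks; iterate() cannot split this way.
OptionalLong first = LongStream.range(1, 1000000)
        .parallel()
        .filter(n -> fourConsecutives(n))
        .findFirst();
System.out.println(first);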

Brian Goetz
  • Thanks for your answer, Brian. So, if I were to remove the limit, should I see the performance characteristics favor the parallel solution? – Amir Afghani Jun 04 '14 at 00:51
  • After removing limit(), I get: OptionalLong[134043] PARALLEL took 59341 ms OptionalLong[134043] SERIAL took 57290 ms – Amir Afghani Jun 04 '14 at 00:54
  • So, removing limit changed the performance characteristics such that parallel did not perform so poorly, but it still is slower than serial, which seems wrong to me – Amir Afghani Jun 04 '14 at 00:55
  • I just noticed also that your stream source uses iterate(), which is also fundamentally sequential (can't compute element #n until you've computed #(n-1)) and therefore will split poorly. Try replacing iterate() with range(0, 100000) instead. – Brian Goetz Jun 04 '14 at 00:58
  • I wish I had better results to report. When I moved to use range, the performance of PARALLEL regressed to 114548 ms. Your answer certainly makes sense, but it didn't bear fruit in my experiment. Perhaps you can try this yourself and confirm my findings? – Amir Afghani Jun 04 '14 at 01:04
  • Oh, and also you should be using a decent microbenchmark harness like JMH. Your code exhibits all of the classic microbenchmarking mistakes (insufficient warmup, subject to deoptimization bias, etc.). Otherwise the numbers are not really meaningful. – Brian Goetz Jun 04 '14 at 01:33 (see the JMH sketch after these comments)
  • I will take your feedback on board and post an updated version of the experiment. In principle I agree with your suggestion - however - I would be surprised if we saw a substantial difference. – Amir Afghani Jun 04 '14 at 01:43
  • I would also suggest ensuring that your `LongStream` in `distinctPrimeFactors()` is sequential. You don't want to attempt parallelization within an already parallelized task. – Marko Topolnik Jun 04 '14 at 11:25
  • I've had a frustrating experience with JMH. I tried following the instructions on the OpenJDK website and I got an error installing the project from Maven. It would be nice if there were an IntelliJ plugin I could simply use to run these benchmarks... – Amir Afghani Jun 04 '14 at 16:17
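
For reference, a hedged sketch of what such a JMH benchmark might look like (class name, benchmark names, and settings are illustrative, not from the thread; the annotations come from org.openjdk.jmh.annotations, and it assumes Problem47.fourConsecutives is reachable from the benchmark's package):

import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import org.openjdk.jmh.annotations.*;

// JMH handles warmup and repeated measurement, avoiding the one-shot timing pitfalls.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@Fork(1)
@State(Scope.Benchmark)
public class FourConsecutivesBench {

    @Benchmark
    public OptionalLong sequential() {
        return LongStream.range(1, 1000000)
                .filter(n -> Problem47.fourConsecutives(n))
                .findFirst();
    }

    @Benchmark
    public OptionalLong parallel() {
        return LongStream.range(1, 1000000)
                .parallel()
                .filter(n -> Problem47.fourConsecutives(n))
                .findFirst();
    }
}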

While Brian Goetz is right about your setup, e.g. that you should use .range(1, 1000000) rather than .iterate(1, n -> n + 1).limit(1000000) and that your benchmark method is very simplistic, I want to emphasize the important point:

even after fixing these issues, a plain wall clock and the Task Manager are enough to see that something is wrong. On my machine the operation takes about half a minute, and you can watch the parallelism drop to a single core after about two seconds. Even if a specialized benchmark tool produced different results, it wouldn't matter unless you want to run your final application inside a benchmark tool all the time…

Now we could quibble further about your setup, or tell you that you should learn special things about the Fork/Join framework, as the implementors did on the discussion list.

Or we can try an alternative implementation:

// Uses java.util.concurrent.{ExecutorService, Executors, TimeUnit},
// java.util.concurrent.atomic.AtomicLong and java.util.stream.LongStream.
// One worker thread per core; the stream only generates and submits tasks.
ExecutorService es = Executors.newFixedThreadPool(
                         Runtime.getRuntime().availableProcessors());
// Smallest matching n found so far; Long.MAX_VALUE means "nothing found yet".
AtomicLong found = new AtomicLong(Long.MAX_VALUE);
LongStream.range(1, 1000000)
    .filter(n -> found.get() == Long.MAX_VALUE)   // stop submitting once a result exists
    .forEach(n -> es.submit(() -> {
        // CAS loop: record n only if it is smaller than the current best.
        if (found.get() > n && fourConsecutives(n)) for (;;) {
            long x = found.get();
            if (x < n || found.compareAndSet(x, n)) break;
        }
    }));
es.shutdown();
try { es.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); }
catch (InterruptedException ex) { throw new AssertionError(ex); }
long result = found.get();
System.out.println(result == Long.MAX_VALUE ? "not found" : result);

On my machine it does what I would expect from parallel execution, taking only slightly more than ⟨sequential time⟩ / ⟨number of CPU cores⟩, without changing anything in your fourConsecutives implementation.

The bottom line is that, at least when processing a single item takes significant time, the current Stream implementation (or the underlying Fork/Join framework) has problems, as already discussed in this related question. If you want reliable parallelism I would recommend using a proven and tested ExecutorService. As you can see in my example, that does not mean dropping the Java 8 features; they fit together well. It's just that the automated parallelism introduced with Stream.parallel should be used with care (given the current implementation).

Holger
  • Thank you for your answer. If you see my comments in Brian's answer, you'll see that in principle I am agreeing with him but pointing out that there is most likely something wrong in the framework. I wasn't aware of the thread you linked to - so thank you for that and your sample code. – Amir Afghani Jun 04 '14 at 20:37
  • So, your approach might work well for this problem, but in general (when the task can't easily be sliced a priori into equal-sized tasks), you'll find that the shared work queue in the ExecutorService quickly becomes a contention bottleneck. The approach you describe only works for the most coarse-grained parallelism. – Brian Goetz Jun 04 '14 at 21:47
  • @Brian Goetz: as long as all tasks are CPU-intensive, they don't have to be equal-sized; all CPU cores will always have some work to do. The main problem of my solution is the overhead when the tasks are rather small (or even worse, when the HotSpot optimizer could elide operations had they been performed in a simple loop instead of in parallel). Unless there is a `Stream` implementation that interacts with the HotSpot optimizer itself, there can't be a single strategy that fits all. The main problem I see is that the developer does not know for which kinds of problems an implementation is optimized. – Holger Jun 05 '14 at 07:58