21

From the Stream javadoc:

Stream pipelines may execute either sequentially or in parallel. This execution mode is a property of the stream. Streams are created with an initial choice of sequential or parallel execution.

My assumptions:

  1. There is no functional difference between sequential and parallel streams. Output is never affected by execution mode.
  2. A parallel stream is always preferable, given appropriate number of cores and problem size to justify the overhead, due to the performance gains.
  3. We want to write code once and run anywhere without having to care about the hardware (this is Java, after all).

Assuming these assumptions are valid (nothing wrong with a bit of meta-assumption), what's the value in having the execution mode exposed in the API?

It seems like you should just be able to declare a Stream, and the choice of sequential/parallel execution should be handled automagically in a layer below, either by library code or the JVM itself as a function of the cores available at runtime, the size of the problem, etc.

Sure, assuming parallel streams also work on a single core machine, perhaps just always using a parallel stream achieves this. But this is really ugly - why have explicit references to parallel streams in my code when it's the default option?

Even if there is a scenario where you'd deliberately want to hard code the use of a sequential stream - why is there not just a sub-interface SequentialStream for that purpose, rather than polluting Stream with an execution mode switch?

davnicwil
  • 2
    What if your code isn't thread-safe? – SLaks Apr 09 '14 at 00:24
  • 1
    Hmm, good point - but isn't that the exception rather than the general case? If you're programming functionally your code ought to be thread safe by design. If you're not, why are you using streams and lambdas in the first place? – davnicwil Apr 09 '14 at 00:28
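
The thread-safety point in the comments above can be illustrated with a minimal sketch. The class and method names here are hypothetical, chosen only for illustration. A pipeline that lets the stream build its own result via `collect` gives the same list in either execution mode, whereas a lambda that mutates shared state would only be safe sequentially.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ThreadSafetySketch {
    // Side-effect-free pipeline: the stream assembles the list itself, so the
    // result is identical in both modes and keeps the encounter order.
    static List<Integer> evens(int n, boolean parallel) {
        IntStream s = IntStream.range(0, n);
        if (parallel) {
            s = s.parallel();
        }
        return s.filter(i -> i % 2 == 0).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evens(10, false)); // prints [0, 2, 4, 6, 8]
        System.out.println(evens(10, true));  // prints [0, 2, 4, 6, 8]
        // By contrast, a pipeline such as
        //   IntStream.range(0, n).parallel().forEach(sharedArrayList::add);
        // mutates an unsynchronized ArrayList from several threads at once,
        // so it is only safe when run sequentially.
    }
}
```

Whether a given lambda is safe to run in parallel is something the caller knows and the library cannot easily verify.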

4 Answers

29

It seems like you should just be able to declare a Stream, and the choice of sequential/parallel execution should be handled automagically in a layer below, either by library code or the JVM itself as a function of the cores available at runtime, the size of the problem, etc.

The reality is that a) streams are a library, and have no special JVM magic, and b) you can't really design a library smart enough to automagically figure out what the right decision is in this particular case. There's no sensible way to estimate how costly a particular function will be without running it -- even if you could introspect its implementation, which you can't -- and now you're introducing a benchmark into every stream operation, trying to figure out if parallelizing it will be worth the cost of the parallelism overhead. That's just not practical, especially given that you don't know in advance how bad the parallelism overhead is, either.

A parallel stream is always preferable, given appropriate number of cores and problem size to justify the overhead, due to the performance gains.

Not always, in practice. Some tasks are just so small that they're not worth parallelizing, and parallelism does always have some overhead. (And frankly, most programmers tend to overestimate the usefulness of parallelism, slapping it everywhere when it's really hurting performance.)

In short, it's a hard enough problem that you have to shove it off onto the programmer.
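
As a rough illustration of what "shoving it off onto the programmer" looks like in code, here is a minimal sketch. The class name and the sum-of-squares workload are assumptions made up for this example. The pipeline is written once, and the caller, who knows how expensive the work is, picks the mode with `sequential()` or `parallel()`.

```java
import java.util.stream.LongStream;

class ExecutionModeSketch {
    // Hypothetical workload: sum of squares over 1..n. The pipeline is the same
    // in both modes; only the execution-mode flag on the stream differs.
    static long sumOfSquares(long n, boolean parallel) {
        LongStream s = LongStream.rangeClosed(1, n);
        s = parallel ? s.parallel() : s.sequential();
        return s.map(x -> x * x).sum();
    }

    public static void main(String[] args) {
        long n = 1_000;
        long seq = sumOfSquares(n, false);
        long par = sumOfSquares(n, true);
        System.out.println(seq == par); // prints true: same result in either mode

        // The mode is a property of the stream and can be inspected.
        System.out.println(LongStream.rangeClosed(1, n).isParallel());            // prints false
        System.out.println(LongStream.rangeClosed(1, n).parallel().isParallel()); // prints true
    }
}
```

For a task this small, the parallel version is unlikely to be faster. The library has no cheap way to know that, which is why the choice sits with the caller.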

Louis Wasserman
  • I agree. I'd say most of the time if the task is small it is not worth parallelising. It really depends on the hardware, type of the operation, etc. – Ushox Apr 09 '14 at 04:07
  • 1
    Good answer. See things like this question (especially the linked lambda-dev discussion): http://stackoverflow.com/questions/22845699/is-it-discouraged-using-java-8-parallel-streams-inside-a-j2ee-container about the difficulties associated with trying to get too clever with such important, fundamental parts of the core libraries. – Shorn Apr 09 '14 at 05:09
  • That's an interesting point about it being a hard enough problem to not attempt it in library code. But, that said, I don't see that it's that horrible to guesstimate whether it's worth the overhead with heuristics. You know the number of cores, and you know the size of the Stream. Anyway, I guess it's a decision that's been made, to just completely sidestep that problem and shove it onto the programmer as you say - and after all Java is a language, not a framework. – davnicwil Apr 09 '14 at 11:05
5

There's an interesting case in this question showing that a parallel stream can sometimes be slower by orders of magnitude. In that particular example, the parallel version runs for ten minutes while the sequential one takes several seconds.

Tagir Valeev
3

There is no functional difference between sequential and parallel streams. Output is never affected by execution mode.

There is a difference between sequential and parallel stream execution. In the code below, the TEST_2 results show that parallel execution is much faster than the sequential version.

A parallel stream is always preferable, given appropriate number of cores and problem size to justify the overhead, due to the performance gains.

Not really. If a task is too simple to be worth executing on parallel threads, parallelizing it only adds overhead to the code; the TEST_1 results show this. Also note that if all the worker threads are busy with one parallel operation, other parallel stream operations elsewhere in your code will have to wait for them.

We want to write code once and run anywhere without having to care about the hardware (this is Java, after all).

Only the programmer knows whether a given task is worth executing in parallel or sequentially, irrespective of the CPUs available. So the Java API exposes both options to the developer.

import java.util.ArrayList;
import java.util.List;

/*
 * Performance test over internal(parallel/sequential) and external iterations.
 * https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
 * 
 * 
 * Parallel computing involves dividing a problem into subproblems, 
 * solving those problems simultaneously (in parallel, with each subproblem running in a separate thread),
 *  and then combining the results of the solutions to the subproblems. Java SE provides the fork/join framework, 
 *  which enables you to more easily implement parallel computing in your applications. However, with this framework, 
 *  you must specify how the problems are subdivided (partitioned). 
 *  With aggregate operations, the Java runtime performs this partitioning and combining of solutions for you.
 * 
 * You can limit the parallelism of the common ForkJoinPool yourself by passing -Djava.util.concurrent.ForkJoinPool.common.parallelism=1 to the JVM,
 *  so that the pool size is limited to one and nothing is gained from parallelization.
 *  
 *  @see ForkJoinPool
 *  https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
 *  
 *  A ForkJoinPool creates a fixed number of threads (default: number of cores) and
 *  will never create more threads (unless the application indicates a need for them by using managedBlock).
 *  http://stackoverflow.com/questions/10797568/what-determines-the-number-of-threads-a-java-forkjoinpool-creates
 *  
 */
public class IterationThroughStream {
    private static boolean found = false;
    private static List<Integer> smallListOfNumbers = null;
    public static void main(String[] args) throws InterruptedException {


        // TEST_1
        List<String> bigListOfStrings = new ArrayList<>();
        // Use a primitive long counter: a boxed Long would be unboxed and re-boxed on every iteration.
        for (long i = 1L; i <= 1_000_000L; i++) {
            bigListOfStrings.add("Counter no: "+ i);
        }

        System.out.println("Test Start");
        System.out.println("-----------");
        long startExternalIteration = System.currentTimeMillis();
        externalIteration(bigListOfStrings);
        long endExternalIteration = System.currentTimeMillis();
        System.out.println("Time taken for externalIteration(bigListOfStrings) is :" + (endExternalIteration - startExternalIteration) + " , and the result found: "+ found);

        long startInternalIteration = System.currentTimeMillis();
        internalIteration(bigListOfStrings);
        long endInternalIteration = System.currentTimeMillis();
        System.out.println("Time taken for internalIteration(bigListOfStrings) is :" + (endInternalIteration - startInternalIteration) + " , and the result found: "+ found);





        // TEST_2
        smallListOfNumbers = new ArrayList<Integer>();
        for(int i = 1; i <= 10; i++) {
            smallListOfNumbers.add(i);
        }

        long startExternalIteration1 = System.currentTimeMillis();
        externalIterationOnSleep(smallListOfNumbers);
        long endExternalIteration1 = System.currentTimeMillis();
        System.out.println("Time taken for externalIterationOnSleep(smallListOfNumbers) is :" + (endExternalIteration1 - startExternalIteration1));

        long startInternalIteration1 = System.currentTimeMillis();
        internalIterationOnSleep(smallListOfNumbers);
        long endInternalIteration1 = System.currentTimeMillis();
        System.out.println("Time taken for internalIterationOnSleep(smallListOfNumbers) is :" + (endInternalIteration1 - startInternalIteration1));




        // TEST_3
        Thread t1 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t2 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t3 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t4 = new Thread(IterationThroughStream :: internalIterationOnThread);

        t1.start();
        t2.start();
        t3.start();
        t4.start();

        Thread.sleep(30000);
    }


    private static boolean externalIteration(List<String> bigListOfStrings) {
        found = false;
        for(String s : bigListOfStrings) {
            if(s.equals("Counter no: 1000000")) {
                found = true;
            }
        }
        return found;
    }

    private static boolean internalIteration(List<String> bigListOfStrings) {
        found = false;
        bigListOfStrings.parallelStream().forEach(
                (String s) -> { 
                    if(s.equals("Counter no: 1000000")){  //Have a breakpoint to look how many threads are spawned.
                        found = true;
                    }

                }
            );
        return found;       
    }


    private static boolean externalIterationOnSleep(List<Integer> smallListOfNumbers) {
        found = false;
        for(Integer s : smallListOfNumbers) {
            try {
                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return found;
    }

    private static boolean internalIterationOnSleep(List<Integer> smallListOfNumbers) {
        found = false;
        smallListOfNumbers.parallelStream().forEach( //Removing parallelStream() will behave as single threaded (sequential access).
                (Integer s) -> {
                    try {
                        Thread.sleep(100); //Have a breakpoint to look how many threads are spawned.
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            );
        return found;       
    }

    public static void internalIterationOnThread() {
        smallListOfNumbers.parallelStream().forEach(
                (Integer s) -> {
                    try {
                        /*
                         * DANGEROUS
                         * If all 7 FJP (fork/join pool) worker threads are blocked on behalf of a single thread (e.g. t1),
                         * the parallel streams started by the other three threads (t2 to t4) cannot use those workers
                         * and have to wait for them.
                         */
                        Thread.sleep(100); //Have a breakpoint here.
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            );
    }
}
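
The note about shared worker threads can be checked with a small sketch. The class and method names below are hypothetical, not part of the test class above. It records which threads actually execute a parallel stream. On a standard JVM these are typically workers of the common fork/join pool, sometimes together with the calling thread, and every parallel stream in the application draws on that same pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolSketch {
    // Runs a small parallel pipeline and records the names of the threads that did the work.
    static Set<String> workerNames(int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.range(0, tasks)
                 .parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Target parallelism of the shared pool; the value depends on the machine.
        System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        // The exact set of names varies from run to run.
        System.out.println("Threads used: " + workerNames(1_000));
    }
}
```

Because the pool is shared, a blocking call such as `Thread.sleep` inside one parallel stream ties up workers that other parallel streams would otherwise use.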
Kanagavelu Sugumar
0

It seems like you should just be able to declare a Stream, and the choice of sequential/parallel execution should be handled automagically in a layer below, either by library code or the JVM itself as a function of the cores available at runtime, the size of the problem, etc.

To add to the already given answers:

That's a pretty bold assumption. Imagine simulating a board game for training some form of AI. It's pretty easy to parallelize the execution of different playthroughs: just create a new instance and let it run on its own thread. As it doesn't share any state with another playthrough, you don't even have to consider multi-threading issues in your game logic. If, on the other hand, you parallelize the game logic itself, you get all sorts of multi-threading issues and most likely pay a steep price in complexity and even performance.

Having control over the behaviour of streams gives you (appropriately limited) flexibility which in and of itself is a key feature for good library design.
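
A minimal sketch of that idea follows. The dice-race "game" and all names in it are invented purely for illustration. Parallelism is applied across playthroughs, while each individual game runs sequentially on state that is local to it.

```java
import java.util.Random;
import java.util.stream.LongStream;

class PlaythroughSketch {
    // Hypothetical game: roll a six-sided die until the total reaches 100 and
    // return the number of rolls. All state is local to this call, so
    // playthroughs share nothing with each other.
    static int playGame(long seed) {
        Random dice = new Random(seed);
        int position = 0;
        int turns = 0;
        while (position < 100) {
            position += dice.nextInt(6) + 1;
            turns++;
        }
        return turns;
    }

    // Parallelism is applied across playthroughs only; the game logic itself
    // stays single-threaded.
    static int totalTurns(int games, boolean parallel) {
        LongStream seeds = LongStream.range(0, games);
        if (parallel) {
            seeds = seeds.parallel();
        }
        return seeds.mapToInt(PlaythroughSketch::playGame).sum();
    }

    public static void main(String[] args) {
        // Each game is seeded deterministically, so both modes give the same total.
        System.out.println(totalTurns(200, false) == totalTurns(200, true)); // prints true
    }
}
```

The caller decides at which level to parallelize, which is the kind of control the explicit execution mode provides.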

roookeee
  • 1
    I do see your point, but I feel it's a fair assumption that streams should be thread safe. If your stream logic *relies* on being single-threaded due to relying on shared memory etc then I don't see why you'd use streams at all. Streams are in my view a functional programming pattern, that is data processing without caring about the low level details of execution - order, side effects, parallel or serial execution -- these are all details you shouldn't have to care about. I definitely see your point about flexibility and control in the API, but I disagree that it's a good design choice :-) – davnicwil Jan 16 '19 at 10:47