I am new to the Stream API in Java 8 and to functional programming in general, but not new to Java. I would like to understand how the Stream API selects an execution plan.

How does it know which parts to parallelize and which parts not to? How many types of execution plans even exist?

Basically, I want to know why streams in Java 8 help make things faster and how they do some of this "magic".

I couldn't find much literature about how it all works.

ng.newbie
  • It doesn't. It executes what you coded. It isn't an SQL database. – user207421 Apr 18 '18 at 10:18
  • @EJP so it is basically not really declarative but still imperative, correct? But then it can also understand when to parallelize and when not to, how does it do that? – ng.newbie Apr 18 '18 at 10:20
  • I don't know when to use parallel. But when I compared a stream with a parallel stream, the plain stream was faster. – janith1024 Apr 18 '18 at 10:26
  • @janith1024 I didn't understand what you meant by "I don't know when to use parallel". Have you never used a parallel stream in Java 8? – ng.newbie Apr 18 '18 at 10:27
  • @ng.newbie I did a test on a collection of data with and without a parallel stream. Without parallel was faster, so I always use streams without parallel. I would also like to know when to use parallel. – janith1024 Apr 18 '18 at 10:31
  • @ng.newbie, it depends on the implementation; for instance, `ArrayListSpliterator` divides its range in half – Andrew Tobilko Apr 18 '18 at 11:11
  • It does parallel when you say it, i.e. use `Collection.parallelStream()` or `Stream.parallel()`. – Holger Apr 18 '18 at 12:36

2 Answers

This question is a bit too broad to explain in full detail, but I will do my best to answer it. I will use a Stream over an ArrayList as the example.

When we create a stream, the returned object is a ReferencePipeline. This object is the "default stream" object, so to speak, since it does not yet carry any operations. Now we have to distinguish between lazy (intermediate) and eager (terminal) methods. So let's take a look at one example of each.

Example one: The filter(Predicate<?>) method:

The filter() method is declared as follows:

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

As you can see, it returns a StatelessOp object, which is basically a new ReferencePipeline where filter evaluation is now 'enabled'. In other words: every time we add a new 'functionality' to the stream, it creates a new pipeline stage based on the old one, with the proper operation flags and method overrides.
As you may already know, streams are not evaluated until an eager (terminal) operation is called. So we need an eager method to evaluate the stream.
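
To make the lazy/eager distinction concrete, here is a minimal sketch (the class name `LazyStreamDemo` and the `run()` helper are made up for illustration). It records when the predicate passed to `filter()` is actually called, and shows that nothing happens until `forEach()` runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the JDK.
public class LazyStreamDemo {

    // Builds a pipeline, then runs it, and returns a log of what happened and when.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<Integer> source = Arrays.asList(1, 2, 3, 4);

        // filter() only links a new stage onto the pipeline; the predicate is not called here.
        Stream<Integer> pipeline = source.stream()
                .filter(n -> {
                    log.add("filter " + n);
                    return n % 2 == 0;
                });
        log.add("pipeline built");

        // The terminal operation pushes each element through all stages, one by one.
        pipeline.forEach(n -> log.add("forEach " + n));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

For this sequential stream the log starts with "pipeline built", followed by "filter 1", "filter 2", "forEach 2", "filter 3", "filter 4", "forEach 4": each element travels through the whole chain of sinks before the next one is taken.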

Example two: The forEach(Consumer<?>) method:

@Override
public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}

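At first glance this is rather short. The evaluate() method checks whether the stream is parallel and hands the pipeline to the terminal operation's evaluateParallel() or evaluateSequential() method. Here it is important to understand what ForEachOps.makeRef() does. It sets the last flags that are necessary and creates the terminal operation; for a parallel stream that operation creates a ForEachTask<> and calls its invoke() method. ForEachTask is a CountedCompleter, which is itself a ForkJoinTask. And happily Andrew found a nice paper on how they work.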
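
The parallel/sequential decision mentioned above is a single flag on the pipeline that the caller sets; the library does not pick it per stage. A small sketch of that, using only the public `isParallel()` query (the class name `ParallelFlagDemo` and the `flags()` helper are made up for illustration):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the JDK.
public class ParallelFlagDemo {

    // Returns isParallel() for four differently built pipelines.
    static boolean[] flags() {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4));

        Stream<Integer> a = list.stream();                       // sequential by default
        Stream<Integer> b = list.parallelStream();               // parallel because we asked for it
        Stream<Integer> c = list.stream().parallel()
                                .filter(n -> n > 1);             // the flag carries over to later stages
        Stream<Integer> d = list.stream().parallel()
                                .filter(n -> n > 1)
                                .sequential();                   // the last call wins for the whole pipeline

        return new boolean[] { a.isParallel(), b.isParallel(), c.isParallel(), d.isParallel() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(flags())); // [false, true, true, false]
    }
}
```

Since the flag belongs to the pipeline as a whole, it is not possible to run one stage in parallel and the next one sequentially within a single stream.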


Note: The exact source code can be found here.

L.Spillner
  • @L.Spillner When you say `ForEachOps.makeRef()` changes the bits - whose bits does it change? – ng.newbie Apr 18 '18 at 11:20
  • found the work http://gee.cs.oswego.edu/dl/papers/fj.pdf which better describes what I was trying to say – Andrew Tobilko Apr 18 '18 at 11:23
  • @ng.newbie You got that wrong. There is no bit manipulation involved. It was just a phrase to make clear that it will be a ForEachTask. – L.Spillner Apr 18 '18 at 11:24
  • @Andrew Thank you for the clarification. Make an edit suggestion or I'll add it myself. Depends on what you prefer ;) – L.Spillner Apr 18 '18 at 11:25
  • @L.Spillner When you were referring to bits, were you referring to [StreamOpFlag](http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8u40-b25/java/util/stream/StreamOpFlag.java#StreamOpFlag)? I couldn't understand anything about it. – ng.newbie Apr 18 '18 at 11:35
  • @ng.newbie sorry, now I understand how you interpreted the sentence. And I have to confess it was a bit misleading. The part has absolutely nothing to do with bits or bytes. I edited it to make it clearer. – L.Spillner Apr 18 '18 at 11:38

As you might already know, the Stream API uses a Spliterator and the ForkJoinPool to perform parallel computations. A Spliterator is used for traversing and partitioning sequences of elements, while the fork/join framework recursively breaks the task into smaller independent subtasks until they are simple enough to be executed asynchronously.
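
As a small illustration of the partitioning half of that description, the sketch below splits the Spliterator of an ArrayList once and collects what each part covers (the class name `SplitDemo` and the `splitOnce()` helper are made up for illustration). For an ArrayList the split hands off the first half of the remaining range, as mentioned in the comments on the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

// Hypothetical demo class, not part of the JDK.
public class SplitDemo {

    // Splits the source's spliterator once and returns the elements covered by each part.
    static List<List<Integer>> splitOnce(List<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        // trySplit() returns a spliterator over a prefix of the elements,
        // or null if this spliterator cannot be split any further.
        Spliterator<Integer> prefix = rest.trySplit();

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        if (prefix != null) {
            prefix.forEachRemaining(first::add);
        }
        rest.forEachRemaining(second::add);

        List<List<Integer>> parts = new ArrayList<>();
        parts.add(first);
        parts.add(second);
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7));
        System.out.println(splitOnce(source)); // [[0, 1, 2, 3], [4, 5, 6, 7]]
    }
}
```

A parallel pipeline repeats this split recursively and hands each part to its own fork/join task.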

As an example of how a parallel computation framework, such as the java.util.stream package, would use Spliterator and ForkJoinPool in a parallel computation, here is one way to implement an associated parallel forEach, which illustrates the primary idiom:

public static void main(String[] args) {
    List<Integer> list = new SplittableRandom()
        .ints(24, 0, 100)
        .boxed().collect(Collectors.toList());

    parallelEach(list, System.out::println);
}

static <T> void parallelEach(Collection<T> c, Consumer<T> action) {
    Spliterator<T> s = c.spliterator();
    long batchSize = s.estimateSize() / (ForkJoinPool.getCommonPoolParallelism() * 8);
    new ParallelEach<>(null, s, action, batchSize).invoke(); // invoke the root task and wait for it to complete
}

The Fork Join Task:

static class ParallelEach<T> extends CountedCompleter<Void> {
    final Spliterator<T> spliterator;
    final Consumer<T> action;
    final long batchSize;

    ParallelEach(ParallelEach<T> parent, Spliterator<T> spliterator,
                 Consumer<T> action, long batchSize) {
        super(parent);
        this.spliterator = spliterator;
        this.action = action;
        this.batchSize = batchSize;
    }

    // The main computation performed by this task
    @Override
    public void compute() {
        Spliterator<T> sub;
        while (spliterator.estimateSize() > batchSize &&
              (sub = spliterator.trySplit()) != null) {
            addToPendingCount(1);
            new ParallelEach<>(this, sub, action, batchSize).fork();
        }
        spliterator.forEachRemaining(action);
        propagateCompletion();
    }
}

Original source.
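
For readers who want to run the idiom above, here is a self-contained sketch with the imports it needs. The wrapper class `ParallelEachDemo` and the `countAndSum()` helper are made up for illustration; they only check that every element is visited exactly once, using thread-safe `LongAdder` accumulators because the action runs on several threads.

```java
import java.util.Collection;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical wrapper class around the parallel forEach idiom shown above.
public class ParallelEachDemo {

    static <T> void parallelEach(Collection<T> c, Consumer<T> action) {
        Spliterator<T> s = c.spliterator();
        long batchSize = s.estimateSize() / (ForkJoinPool.getCommonPoolParallelism() * 8L);
        new ParallelEach<>(null, s, action, batchSize).invoke(); // blocks until all subtasks are done
    }

    static class ParallelEach<T> extends CountedCompleter<Void> {
        final Spliterator<T> spliterator;
        final Consumer<T> action;
        final long batchSize;

        ParallelEach(ParallelEach<T> parent, Spliterator<T> spliterator,
                     Consumer<T> action, long batchSize) {
            super(parent);
            this.spliterator = spliterator;
            this.action = action;
            this.batchSize = batchSize;
        }

        @Override
        public void compute() {
            Spliterator<T> sub;
            // Keep splitting off work while the remaining chunk is larger than one batch.
            while (spliterator.estimateSize() > batchSize
                    && (sub = spliterator.trySplit()) != null) {
                addToPendingCount(1);                                    // one more child to wait for
                new ParallelEach<>(this, sub, action, batchSize).fork(); // run the child asynchronously
            }
            spliterator.forEachRemaining(action); // process what is left in this task
            propagateCompletion();                // signal the parent that this task is finished
        }
    }

    // Visits 1..n in parallel and returns {number of elements visited, sum of elements}.
    static long[] countAndSum(int n) {
        List<Integer> list = IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList());
        LongAdder count = new LongAdder();
        LongAdder sum = new LongAdder();
        parallelEach(list, x -> {
            count.increment();
            sum.add(x);
        });
        return new long[] { count.sum(), sum.sum() };
    }

    public static void main(String[] args) {
        long[] result = countAndSum(1000);
        System.out.println(result[0] + " elements, sum " + result[1]); // 1000 elements, sum 500500
    }
}
```

The order in which elements are visited is not defined here, which is why the check relies on a count and a sum rather than on printed output.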

Also, keep in mind that a parallel computation is not always faster than a sequential one, and the choice is always yours: see "When to use parallel stream".

Oleksandr Pyrohov
  • I could not understand what your code does. Any resources that help to understand the Fork-Join Framework in Java for noobs? It seems to have a less than [nice reputation](https://stackoverflow.com/a/29970709/7134737). I checked **Java 8 In Action** and even that does not include an explanation for `CountedCompleter`. – ng.newbie Apr 20 '18 at 13:34
  • Beginner's Introduction to Java's [ForkJoin Framework](https://homes.cs.washington.edu/~djg/teachingMaterials/spac/grossmanSPAC_forkJoinFramework.html) – Oleksandr Pyrohov Apr 20 '18 at 15:10