
In Java 8 one can have parallel streams use a custom ForkJoinPool rather than the common pool, by running them inside that pool:

forkJoinPool.submit(() -> list.parallelStream().forEach(x -> {...}));
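
A runnable version of the one-liner above might look like the following sketch. It assumes a pool of 4 threads and a list of 10,000 integers (both arbitrary), and records every thread that processes an element so you can check which pool those threads belong to. The class and method names are illustrative; the behaviour it shows relies on the undocumented Fork/Join mechanism discussed in the comments and answers, not on any documented stream API.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomPoolStreamDemo {

    // Runs list.parallelStream().forEach(...) inside `pool` and returns every thread that processed an element.
    static Set<Thread> threadsUsed(ForkJoinPool pool, List<Integer> list) throws Exception {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        pool.submit(() -> list.parallelStream().forEach(x -> seen.add(Thread.currentThread())))
            .get(); // block the caller until the whole stream has been processed
        return seen;
    }

    // True if every recorded thread is a worker of exactly this pool.
    static boolean allInPool(Set<Thread> threads, ForkJoinPool pool) {
        return threads.stream().allMatch(t ->
                t instanceof ForkJoinWorkerThread
                        && ((ForkJoinWorkerThread) t).getPool() == pool);
    }

    public static void main(String[] args) throws Exception {
        List<Integer> list = IntStream.range(0, 10_000).boxed().collect(Collectors.toList());
        ForkJoinPool custom = new ForkJoinPool(4);
        try {
            Set<Thread> threads = threadsUsed(custom, list);
            System.out.println("distinct threads: " + threads.size());
            System.out.println("all in custom pool: " + allInPool(threads, custom));
        } finally {
            custom.shutdown();
        }
    }
}
```

Running it should report that every element was processed by a worker of the custom pool; `ForkJoinWorkerThread.getPool()` is the public accessor for the same pool reference the answers refer to.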

My question is: how does this technically happen?
The stream is not in any way aware that it was submitted to a custom ForkJoinPool and has no direct access to it. So how do the right threads end up processing the stream's tasks?

I tried looking at the source code, but to no avail. My best guess is a ThreadLocal variable that is set at some point during submission and later read by the stream. If so, why would the language developers choose to implement the behaviour this way rather than, say, injecting the pool into the stream as a dependency?

Thanks!

  • Check this question: https://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream. – JMSilla Jul 18 '19 at 07:38
  • It can find out via `Thread.currentThread()`. – Kartik Jul 18 '19 at 07:39
  • Actually, *it doesn’t*. It’s the underlying Fork/Join framework itself that does this, whereas the Stream API designers did not consider this at all. That’s why this is not documented and does not even work properly (in Java 8, at some places, the Stream implementation accesses the default pool parallelism no matter which pool you actually use). – Holger Jul 18 '19 at 09:58
  • @Holger but you are talking about parallelism; the OP seems to be asking about which pool is used – Eugene Jul 18 '19 at 11:08
  • @Eugene no, I’m saying that the Stream implementors intended to always use the common pool, but did not consider that the `ForkJoinTask` will use a different pool under certain circumstances. The hardcoded use of the common pool’s parallelism is an indicator of that. This has been fixed in newer versions; still, the use of a different pool remains undocumented and not officially supported. In fact, the whole use of `ForkJoinPool` is considered an implementation detail. Since the question is about the stream and the pool, the answer is that this is actually an unintended side effect. – Holger Jul 18 '19 at 11:14
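
Holger's point about parallelism can be illustrated with a hypothetical split-size heuristic: one variant always reads the common pool's parallelism, the other asks the pool the current thread actually belongs to. The constant `TASKS_PER_THREAD` and both method names are made up for this sketch; they are not the JDK's stream internals, only a model of the difference he describes.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class ParallelismSource {

    // Hypothetical sizing heuristic: leaf tasks to aim for per unit of parallelism.
    static final int TASKS_PER_THREAD = 4;

    // Always reads the common pool, regardless of where the code runs.
    static int commonPoolBasedTarget() {
        return ForkJoinPool.getCommonPoolParallelism() * TASKS_PER_THREAD;
    }

    // Asks the pool of the current worker thread, falling back to the common pool otherwise.
    static int currentPoolBasedTarget() {
        Thread t = Thread.currentThread();
        int parallelism = (t instanceof ForkJoinWorkerThread)
                ? ((ForkJoinWorkerThread) t).getPool().getParallelism()
                : ForkJoinPool.getCommonPoolParallelism();
        return parallelism * TASKS_PER_THREAD;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool custom = new ForkJoinPool(2);
        try {
            // Evaluate both heuristics on a worker of the custom pool.
            int[] targets = custom.submit(() -> new int[] {
                    commonPoolBasedTarget(), currentPoolBasedTarget() }).get();
            System.out.println("common-pool based: " + targets[0]);  // depends on the machine
            System.out.println("current-pool based: " + targets[1]); // 2 * 4 = 8
        } finally {
            custom.shutdown();
        }
    }
}
```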

2 Answers


From what I've read in the code, the decision is based only on the thread that triggers the computation. Inside the method ForkJoinTask::fork there is literally a check on which thread is calling (this is also stated in its documentation):

(t = Thread.currentThread()) instanceof ForkJoinWorkerThread

So if the calling thread is a ForkJoinWorkerThread (which is what you get when running inside a custom ForkJoinPool), the task is pushed onto that worker's own queue, i.e. into the pool the thread already belongs to; otherwise (any thread that is not a ForkJoinWorkerThread) it uses:

ForkJoinPool.common.externalPush(this); 
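
The branch in `fork()` can be modelled in a few lines using only public API. This is a simplified sketch of the decision, not the JDK implementation (the real method pushes the task onto a queue rather than returning a pool), and `poolThatForkWouldUse` is a hypothetical name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class ForkRouting {

    // Simplified model of the branch in ForkJoinTask.fork(): which pool would receive a forked task?
    static ForkJoinPool poolThatForkWouldUse() {
        Thread t = Thread.currentThread();
        if (t instanceof ForkJoinWorkerThread) {
            // Worker thread: the task goes onto this worker's own queue, i.e. into its pool.
            return ((ForkJoinWorkerThread) t).getPool();
        }
        // Any other thread: the task is pushed into the common pool.
        return ForkJoinPool.commonPool();
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool custom = new ForkJoinPool(2);
        try {
            Callable<ForkJoinPool> probe = ForkRouting::poolThatForkWouldUse;
            ForkJoinPool fromMain = poolThatForkWouldUse();
            ForkJoinPool fromWorker = custom.submit(probe).get();
            System.out.println("main thread -> common pool: " + (fromMain == ForkJoinPool.commonPool()));
            System.out.println("custom worker -> custom pool: " + (fromWorker == custom));
        } finally {
            custom.shutdown();
        }
    }
}
```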

Also interesting: ForkJoinWorkerThread is a public class, so you could start the computation inside an instance of it while still using a different pool, though I have not tried this.
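
Regarding that last point: `ForkJoinWorkerThread` is public, but its constructor is `protected` and takes the owning `ForkJoinPool`, and a pool creates its workers through a `ForkJoinWorkerThreadFactory`. The sketch below uses a hypothetical `TaggedWorker` subclass, plugs a factory for it into a pool, and checks that a parallel stream run inside that pool is processed only by those workers. It does not try the "different pool" scenario; it only shows that each worker's pool is fixed when the thread is constructed.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TaggedWorkerDemo {

    // Hypothetical worker subclass; the protected super constructor requires the owning pool.
    static final class TaggedWorker extends ForkJoinWorkerThread {
        final int id;

        TaggedWorker(ForkJoinPool pool, int id) {
            super(pool); // this thread's pool reference is fixed from here on
            this.id = id;
        }
    }

    // Builds a pool whose factory creates TaggedWorker threads.
    static ForkJoinPool newTaggedPool(int parallelism) {
        AtomicInteger ids = new AtomicInteger();
        ForkJoinPool.ForkJoinWorkerThreadFactory factory =
                pool -> new TaggedWorker(pool, ids.incrementAndGet());
        return new ForkJoinPool(parallelism, factory, null, false);
    }

    // Runs a parallel stream inside `pool` and collects the threads that processed elements.
    static Set<Thread> threadsUsed(ForkJoinPool pool, List<Integer> list) throws Exception {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        pool.submit(() -> list.parallelStream().forEach(x -> seen.add(Thread.currentThread()))).get();
        return seen;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = newTaggedPool(3);
        try {
            List<Integer> list = IntStream.range(0, 10_000).boxed().collect(Collectors.toList());
            for (Thread t : threadsUsed(pool, list)) {
                boolean tagged = t instanceof TaggedWorker;
                boolean samePool = ((ForkJoinWorkerThread) t).getPool() == pool;
                System.out.println(t.getName() + " tagged=" + tagged + " samePool=" + samePool);
            }
        } finally {
            pool.shutdown();
        }
    }
}
```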

Eugene

The java.util.stream.ForEachOps.ForEachOp#evaluateParallel method calls invoke():

@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                 Spliterator<S> spliterator) {
    if (ordered)
        new ForEachOrderedTask<>(helper, spliterator, this).invoke();
    else
        new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
    return null;
}

which in turn calls java.util.concurrent.ForkJoinTask#doInvoke:

private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}
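
Note that `doInvoke` starts with `doExec()`: `invoke()` runs the task in the calling thread before any waiting. So when the terminal operation is called from a worker of the custom pool, the root stream task runs on that worker, and whatever it forks goes into that worker's queue. A small sketch (the `WhoRunsMe` task and the helper name are hypothetical) to confirm that `invoke()` executes `compute()` on the caller, whether that is a plain thread or a pool worker:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class InvokeRunsInCaller {

    // A task that simply reports which thread executed its compute() method.
    static final class WhoRunsMe extends RecursiveTask<Thread> {
        @Override
        protected Thread compute() {
            return Thread.currentThread();
        }
    }

    // Calls invoke() on the current thread and reports whether compute() ran on that same thread.
    static boolean computeRanOnCaller() {
        Thread caller = Thread.currentThread();
        return new WhoRunsMe().invoke() == caller;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool custom = new ForkJoinPool(2);
        try {
            Callable<Boolean> probe = InvokeRunsInCaller::computeRanOnCaller;
            System.out.println("plain thread: " + computeRanOnCaller());         // true
            System.out.println("custom worker: " + custom.submit(probe).get());  // true
        } finally {
            custom.shutdown();
        }
    }
}
```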

As the method shows, it determines the current thread with Thread.currentThread().

If that thread is a ForkJoinWorkerThread, it then reads its pool field, as in (wt = (ForkJoinWorkerThread)t).pool, which is the pool this thread is running in:

public class ForkJoinWorkerThread extends Thread {

    final ForkJoinPool pool;                // the pool this thread works in
    // ...
}
Kartik