
I wrote the program below, in which I submit my work to a custom ForkJoinPool because I do not want the common pool to be used. Even so, the output shows that the common pool is still being used. Why does this happen?

package com.example.javanewfeatures;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ForkJoinPoolExample {

    public static void main(String args[]) throws InterruptedException {

        List<Integer> numbers = buildIntRange();

        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        Thread t1 = new Thread(() -> forkJoinPool.submit(() -> {
            numbers.parallelStream().forEach(n -> {
                try {
                    Thread.sleep(5);
                    System.out.println("Loop 1 : " + Thread.currentThread());
                } catch (InterruptedException e) {

                }
            });
        }).invoke());

        ForkJoinPool forkJoinPool2 = new ForkJoinPool(4);
        Thread t2 = new Thread(() -> forkJoinPool2.submit(() -> {
            numbers.parallelStream().forEach(n -> {
                try {
                    Thread.sleep(5);
                    System.out.println("Loop 2 : " + Thread.currentThread());
                } catch (InterruptedException e) {

                }
            });
        }).invoke());

        t1.start();
        t2.start();
        t1.join();
        t2.join();

    }

    private static List<Integer> buildIntRange() {
        return IntStream.range(0, 10).boxed().collect(Collectors.toUnmodifiableList());
    }

}

1 Answer


...but still I see that common pool is being used even after passing the fork join pool

Creating an instance of ForkJoinPool does give you a pool that is separate from the common pool. You can print the statements below to confirm that the two are different pools.

System.out.printf("Common Pool:%s\n", ForkJoinPool.commonPool());
System.out.printf("Custom Pool:%s\n", new ForkJoinPool(4));

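In your case, however, the common pool is not being used by the tasks you submit. It is being used by the parallel stream, which runs its work on the common pool by default when it is started from a thread that is not a ForkJoinPool worker.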

If you want streams to use a custom pool, refer to this post: Parallel streams in custom pool.
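To see the default behaviour for yourself, here is a minimal sketch that starts a parallel stream from an ordinary (non-pool) thread and records which threads did the work. The class name PlainThreadParallelStream and the helper threadsUsed() are made up for illustration; only the JDK calls are real.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class PlainThreadParallelStream {

    // Hypothetical helper: runs a parallel stream from the calling thread and
    // returns the names of all threads that processed at least one element.
    static Set<String> threadsUsed() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 100)
                 .parallel()
                 .forEach(n -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Caller: " + Thread.currentThread().getName());
        threadsUsed().forEach(name -> System.out.println("Used:   " + name));
    }
}
```

When this is run from main, the names you should expect are the caller itself plus ForkJoinPool.commonPool-worker-N threads. How many workers show up depends on the machine and on the common pool's configured parallelism.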

In your current implementation, this is the behaviour:

  • You submit the task to the pool using ForkJoinPool.submit(). This ensures the task is executed in the custom pool.
  • After that, you call ForkJoinTask.invoke() on the returned task. invoke() starts the same task a second time on the calling thread, here Thread t1. t1 is not a ForkJoinWorkerThread, so the parallel stream running there falls back to the common pool. Refer to the ForkJoinTask.doInvoke() source code below (the exact code differs between JDK releases):
  private int doInvoke() {
      int s; Thread t; ForkJoinWorkerThread wt;
      return (s = doExec()) < 0 ? s :
          ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
          (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
          externalAwaitDone();
  }

  private int externalAwaitDone() {
      int s;
      ForkJoinPool cp = ForkJoinPool.common;
  //...
  }
  • If you look at the output, you will see duplicate results: the execution started by submit() used the custom pool, and the one started by invoke() used the common pool.
    ...
    Loop 1 : Thread[ForkJoinPool-2-worker-0,5,main]
    Loop 1 : Thread[ForkJoinPool.commonPool-worker-3,5,main]
    ...
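The point about invoke() can be checked in isolation. The sketch below is only an illustration: the class name InvokeRunsInCaller and the helper executingThread() are invented here. It wraps a Runnable with ForkJoinTask.adapt(), never hands it to any pool, and calls invoke() from a plain thread, then reports which thread ran the body.

```java
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicReference;

public class InvokeRunsInCaller {

    // Hypothetical helper: calls invoke() on a task that was never submitted
    // to a pool and returns the name of the thread that executed its body.
    static String executingThread() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        ForkJoinTask<?> task =
                ForkJoinTask.adapt(() -> ranOn.set(Thread.currentThread().getName()));
        task.invoke(); // executes the task body on the current thread
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("Caller:   " + Thread.currentThread().getName());
        System.out.println("Task ran: " + executingThread());
    }
}
```

Both lines should show the same thread name. Applied to the question's code, this is why the stream started through invoke() behaves as if it had been launched from t1 rather than from a worker of the custom pool.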
    

To correct your implementation, as described in the post referred to above, make the following change:

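// Requires: import java.util.concurrent.ExecutionException;
Thread t2 = new Thread(() -> {
    try {
        forkJoinPool2.submit(() -> {
            // Note: stream() is sequential, so every element runs on a single pool worker.
            // Use parallelStream() to spread the work across the pool's threads.
            numbers.stream().forEach(n -> {
                try {
                    Thread.sleep(5);
                    System.out.println("Loop 2 : " + Thread.currentThread());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                }
            });
        }).get(); // changed from invoke(): wait for the pool to finish the task
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
});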

Output:

Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
Loop 1 : Thread[ForkJoinPool-1-worker-1,5,main]
Loop 1 : Thread[ForkJoinPool-1-worker-3,5,main]
Loop 1 : Thread[ForkJoinPool-1-worker-2,5,main]
Loop 1 : Thread[ForkJoinPool-1-worker-0,5,main]
Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
Loop 1 : Thread[ForkJoinPool-1-worker-1,5,main]
Loop 1 : Thread[ForkJoinPool-1-worker-3,5,main]
Loop 1 : Thread[ForkJoinPool-1-worker-2,5,main]
Loop 1 : Thread[ForkJoinPool-1-worker-0,5,main]
Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
Loop 1 : Thread[ForkJoinPool-1-worker-1,5,main]
Loop 1 : Thread[ForkJoinPool-1-worker-3,5,main]
Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
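
If you want to verify the fix without reading through console output, the same idea can be written as a small self-contained sketch. CustomPoolParallelStream and threadsUsed() are names invented for this example. Note that running a parallel stream inside the pool it was submitted to is long-standing implementation behaviour of the JDK, not something the Stream API documentation promises.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CustomPoolParallelStream {

    // Hypothetical helper: runs a parallel stream inside a custom pool and
    // returns the names of all threads that processed at least one element.
    static Set<String> threadsUsed(int parallelism) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.submit(() -> {
                IntStream.range(0, 100)
                         .parallel()
                         .forEach(n -> names.add(Thread.currentThread().getName()));
            }).get(); // wait for completion; do not call invoke() here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        threadsUsed(4).forEach(System.out::println);
    }
}
```

With this arrangement the printed names should all look like ForkJoinPool-N-worker-M, with no commonPool entries.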