
I was trying to use parallelStream with a custom ForkJoinPool, where the task performs network calls. When I use the following style:

pool.submit(() -> {
    ioDelays.parallelStream().forEach(n -> {
        induceRandomSleep(n);
    });
}).get();

The time taken is almost 11 times longer than when I loop through the list and submit the tasks one by one, as shown below:

List<ForkJoinTask<Integer>> tasks = new ArrayList<>();
for (final Integer num : ioDelays) {
    // One task per element, so each sleep can occupy its own pool worker
    ForkJoinTask<Integer> task = pool.submit(() -> {
        return induceRandomSleep(num);
    });
    tasks.add(task);
}
final List<Integer> returnVals = new ArrayList<>();
tasks.forEach(task -> {
    try {
        returnVals.add(task.get());
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        // induceRandomSleep threw; print the failure and keep collecting results
        e.printStackTrace();
    }
});
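For comparison, the per-task loop can also be expressed with ForkJoinPool.invokeAll, which submits a whole collection of Callables and waits for all of them to finish. This is a minimal sketch, not the code from the question: sleepFor is a hypothetical stand-in for induceRandomSleep, and the pool size and delays are made up so it finishes quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class InvokeAllSketch {

    // Hypothetical stand-in for induceRandomSleep: sleeps, then returns its argument
    static int sleepFor(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return millis;
    }

    // Submits one Callable per delay, waits for all of them, and sums the results
    static int sumOfDelays(ForkJoinPool pool, List<Integer> delays)
            throws InterruptedException, ExecutionException {
        List<Callable<Integer>> calls = new ArrayList<>();
        for (Integer d : delays) {
            calls.add(() -> sleepFor(d));
        }
        int total = 0;
        for (Future<Integer> f : pool.invokeAll(calls)) {
            total += f.get(); // every task is already done; get() just unwraps it
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(20);
        List<Integer> delays = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            delays.add(10 + i);
        }
        System.out.println("Sum " + sumOfDelays(pool, delays));
        pool.shutdown();
    }
}
```

Because every element becomes its own task, the number of sleeps in flight is bounded by the pool's size rather than by how a stream decides to split the list.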

Is ForkJoinPool.common involved in some way when parallelStream is used? Here is the entire program that simulates both styles above:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class FJTPExperiment {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(200);

        List<Integer> ioDelays = new ArrayList<>();
        for (int i = 0; i < 2000; i++) {
            ioDelays.add((int) (300 * Math.random() + 200));
        }
        int originalCount = 0;
        for (Integer val : ioDelays) {
            originalCount += val;
        }
        System.out.println("Expected " + originalCount);
        System.out.println(Thread.currentThread().getName() + " ::::Number of threads in common pool :" + ForkJoinPool.getCommonPoolParallelism());


        long beginTimestamp = System.currentTimeMillis();
        pool.submit(() -> {
            ioDelays.parallelStream().forEach(n -> {
                induceRandomSleep(n);
            });
        }).get();
        long endTimestamp = System.currentTimeMillis();
        System.out.println("Took " + (endTimestamp - beginTimestamp) + " ms");


        List<ForkJoinTask<Integer>> tasks = new ArrayList<>();
        beginTimestamp = System.currentTimeMillis();
        for (final Integer num : ioDelays) {
            ForkJoinTask<Integer> task = pool.submit(() -> {
                return induceRandomSleep(num);
            });
            tasks.add(task);
        }
        int count = 0;
        final List<Integer> returnVals = new ArrayList<>();
        tasks.forEach(task -> {
            try {
                returnVals.add(task.get());
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                // induceRandomSleep threw; print the failure and keep collecting results
                e.printStackTrace();
            }
        });
        endTimestamp = System.currentTimeMillis();
        for (Integer val : returnVals) {
            count += val;
        }
        System.out.println("Count " + count);
        System.out.println("Took " + (endTimestamp - beginTimestamp) + " ms");
    }


    public static int induceRandomSleep(int sleepInterval) {
        System.out.println(Thread.currentThread().getName() + " ::::sleeping for " + sleepInterval + " ms");
        try {
            Thread.sleep(sleepInterval);
            return sleepInterval;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return sleepInterval;
        }
    }
}
Swapnil
  • Looks like this trick of submitting task to your own ForkJoinPool doesn't work in all cases. Please check https://stackoverflow.com/questions/36947336/why-does-the-parallel-stream-not-use-all-the-threads-of-the-forkjoinpool – Ivan Dec 21 '17 at 18:53
  • Thanks for this, but I was able to verify that the custom thread pool was used: based on the logs, where I also printed the thread name, it was using the custom thread pool in my case – Swapnil Dec 21 '17 at 19:22
  • The main task is submitted to your custom pool, but parallelStream().forEach() uses the default ForkJoinPool.common. You could check this by using `System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "200");`. In that case the time spent in both versions will be close to each other – Ivan Dec 21 '17 at 19:32
  • I agree partly: it is stuck with the common pool's parallelism, but based on the logs it is still not using the common pool. I tried a similar variant, "parallelStream().limit(200).forEach", which behaved the way you described. My worry is that if I set the system-level property, everything gets affected – Swapnil Dec 21 '17 at 20:30
  • From the link provided earlier: using a custom ForkJoinPool for stream processing is not officially supported, and when using forEach, the default pool parallelism is used to determine the stream spliterator behavior. So submitting tasks in a for loop is the only reliable way to use a custom ForkJoinPool – Ivan Dec 21 '17 at 20:33
  • "Not officially supported" is an interesting point that I did not know, thanks for that. Could you give me some pointers to that documentation, since the implementation is in place? – Swapnil Dec 21 '17 at 20:42
  • Check the answer to this question: https://stackoverflow.com/questions/28985704/parallel-stream-from-a-hashset-doesnt-run-in-parallel In the java.util.stream.AbstractTask class, the LEAF_TARGET field determines the amount of splitting that is done, which in turn determines the amount of parallelism that can be achieved. The value of this field is based on ForkJoinPool.getCommonPoolParallelism(), which of course uses the parallelism of the common pool, not whatever pool happens to be running the tasks. – Ivan Dec 21 '17 at 21:12
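The splitting rule described in that comment can be worked through numerically. The sketch below assumes LEAF_TARGET = commonParallelism << 2 and a target leaf size of size / LEAF_TARGET (at least 1), as I read the JDK 8 sources of AbstractTask. The halving loop is a simplified model of how ArrayList's spliterator splits at the midpoint, and the 350 ms average is just the mean of 300 * Math.random() + 200; the class and method names are made up.

```java
public class LeafSizeEstimate {

    // Assumption: mirrors AbstractTask's LEAF_TARGET = commonParallelism << 2
    static int leafTarget(int commonParallelism) {
        return commonParallelism << 2;
    }

    // Assumption: target leaf size = size / LEAF_TARGET, but never below 1
    static long suggestedTargetSize(long size, int commonParallelism) {
        long est = size / leafTarget(commonParallelism);
        return est > 0L ? est : 1L;
    }

    // Simplified model: keep halving the range (rounding up) until a piece is
    // no larger than the target size; returns the size of the largest leaf
    static long leafSizeAfterSplitting(long size, long threshold) {
        long piece = size;
        while (piece > threshold) {
            piece = (piece + 1) / 2;
        }
        return piece;
    }

    public static void main(String[] args) {
        long elements = 2000; // size of ioDelays in the question
        int avgDelayMs = 350; // mean of 300 * Math.random() + 200
        for (int p : new int[] {3, 7, 200}) {
            long threshold = suggestedTargetSize(elements, p);
            long leaf = leafSizeAfterSplitting(elements, threshold);
            // The elements of one leaf are processed sequentially on one thread
            System.out.println("common parallelism " + p + ": leaf size ~" + leaf
                    + ", ~" + leaf * avgDelayMs + " ms per leaf");
        }
    }
}
```

With a common-pool parallelism of 3 (a 4-core machine, which is an assumption since the question doesn't state the core count), leaves end up with about 125 elements, so each leaf sleeps for roughly 125 × 350 ms ≈ 44 s in sequence. The per-task loop needs about 2000 / 200 × 350 ms = 3.5 s, which puts the ratio in the same range as the roughly 11× slowdown reported in the question.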

1 Answer


I eventually found the answer; there were two parts to the question:

1) Only one task is submitted to the ForkJoinPool, so how is it spawning multiple threads?

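Looking at the JDK implementation, it seems that when parallelStream is invoked, it checks whether the current thread is a ForkJoinWorkerThread. If so, the subtasks are pushed to the queue of the custom ForkJoinPool; if not, they are pushed to ForkJoinPool.common. (The check is in ForkJoinTask.fork(), which runs a task in the pool the current task is running in, or in the common pool otherwise.) This was validated through logs as well.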
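One way to check this is to record which pool owns each thread that processes an element. This sketch starts a parallel forEach from inside a task submitted to a custom pool, the same trick as in the question, and labels each thread as a custom-pool worker, a common-pool worker, or a non-ForkJoin ("external") thread. The class and method names are made up for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class WhichPoolRunsTheStream {

    // Runs data.parallelStream().forEach(...) inside a task submitted to `pool`
    // and records, for every processed element, which kind of thread handled it
    static Set<String> poolsSeen(ForkJoinPool pool, List<Integer> data)
            throws InterruptedException, ExecutionException {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        pool.submit(() -> data.parallelStream().forEach(n -> {
            Thread t = Thread.currentThread();
            if (t instanceof ForkJoinWorkerThread) {
                ForkJoinPool owner = ((ForkJoinWorkerThread) t).getPool();
                seen.add(owner == ForkJoinPool.commonPool() ? "common" : "custom");
            } else {
                seen.add("external");
            }
        })).get();
        return seen;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(8);
        List<Integer> data = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
        System.out.println(poolsSeen(pool, data));
        pool.shutdown();
    }
}
```

If you call data.parallelStream().forEach(...) directly from main instead, you would expect to see "external" (the calling thread takes part in the work) and usually "common" as well, which is the default behavior the comments describe.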

2) If it's working, why is it slow?

It was slow because the parallelism is not derived from the parallelism of the custom ForkJoinPool; it is derived from the parallelism of ForkJoinPool.common, which by default is the number of CPU cores minus 1. In the JDK implementation this happens in java.util.stream.AbstractTask, where LEAF_TARGET is derived from ForkJoinPool.getCommonPoolParallelism(). For this to work properly, there would need to be a branch that derives LEAF_TARGET from the parallelism of the custom thread pool.
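The missing branch described here could look roughly like the following. This is purely hypothetical, since the JDK does not do this: it takes the parallelism of the pool hosting the current thread via ForkJoinTask.getPool() (which returns null outside any ForkJoinPool), falls back to the common pool's parallelism otherwise, and keeps the same factor of 4 (parallelism << 2) that LEAF_TARGET uses. The class and method names are made up.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class PoolAwareLeafTarget {

    // Hypothetical pool-aware LEAF_TARGET; NOT what the JDK actually does
    static int poolAwareLeafTarget() {
        ForkJoinPool current = ForkJoinTask.getPool(); // null outside a ForkJoinPool
        int parallelism = (current != null)
                ? current.getParallelism()
                : ForkJoinPool.getCommonPoolParallelism();
        return parallelism << 2; // same "4 leaves per thread" factor as LEAF_TARGET
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(200);
        // Block lambda with a return value, so submit(Callable) is selected
        int inside = pool.submit(() -> {
            return poolAwareLeafTarget();
        }).get();
        int outside = poolAwareLeafTarget();
        System.out.println("inside custom pool: " + inside); // 800
        System.out.println("outside any pool:   " + outside); // 4 x common parallelism
        pool.shutdown();
    }
}
```

Inside a ForkJoinPool(200) this gives 800, so a 2000-element list would be split into leaves of only a couple of elements each, close to the one-task-per-element behavior of the for-loop version.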

Swapnil