2

I have made two separate implementations of parallel reads from database. First implementation is using ExecutorService with newCachedThreadPool() constructor and Futures: I simply make a call that returns a future for each read case and then after I make all the calls I call get() on them. This implementation works OK and is fast enough.

The second implementation is using parallel streams. When I put parallel stream call into the same ExecutorService pool it works almost 5 times slower and it seems that it is not using as many threads as I would hope. When I instead put it into ForkJoinPool pool = new ForkJoinPool(50) then it works as fast as the previous implementation.

My question is:

Why do parallel streams under-utilize threads in newCachedThreadPool version?

Here is the code for the second implementation (I am not posting the first implementation, cause that one works OK anyway):

private static final ExecutorService pool = Executors.newCachedThreadPool();

final List<AbstractMap.SimpleImmutableEntry<String, String>> simpleImmutableEntryStream =
                personIdList.stream().flatMap(
                        personId -> movieIdList.stream().map(
                                movieId -> new AbstractMap.SimpleImmutableEntry<>(personId, movieId))).collect(Collectors.toList());

final Future<Map<String, List<Summary>>> futureMovieSummaryForPerson = pool.submit(() -> {
      final Stream<Summary> summaryStream = simpleImmutableEntryStream.parallelStream().map(
            inputPair -> {
                   return FeedbackDao.find(inputPair.getKey(), inputPair.getValue());
            }).filter(Objects::nonNull);
return summaryStream.collect(Collectors.groupingBy(Summary::getPersonId));
});
Tofig Hasanov
  • 3,303
  • 10
  • 51
  • 81
  • please show your code – Nicolas Filotto Aug 10 '16 at 11:27
  • When I replace the first line of the code with: "ForkJoinPool pool = new ForkJoinPool(50);" then it works fast again. – Tofig Hasanov Aug 10 '16 at 11:44
  • According to your code, you submit only task, right? or I miss something? – Nicolas Filotto Aug 10 '16 at 11:49
  • I submit parallelstream.map, which will create multiple asynchronous tasks, each of which makes a call to the database. If I call parallelstream.map without putting it into the pool, then it will by default use ForkJoinPool with number of threads limited by number of processors, which I didn't want. – Tofig Hasanov Aug 10 '16 at 11:55
  • Are you sure? for me you submit only one task to your pool and this task calls simpleImmutableEntryStream.parallelStream() that will internally use its own ForkJoinPool pool not yours – Nicolas Filotto Aug 10 '16 at 12:03
  • 1
    Check this question: http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream – Tofig Hasanov Aug 10 '16 at 12:24
  • Also I did verify that this actually does create multiple threads (I logged Thread.currentThread().getId()). It's just that the number of threads is surprisingly low. – Tofig Hasanov Aug 10 '16 at 12:32

1 Answers1

2

This is related to how ForkJoinTask.fork is implemented, if the current thread comes from a ForkJoinPool it will use the same pool to submit the new tasks but if not it will use the common pool with the total amount of processors in your local machine and here when you create your pool with Executors.newCachedThreadPool(), the thread created by this pool is not recognized as coming from a ForkJoinPool such that it uses the common pool.

Here is how it is implemented, it should help you to better understand:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

The thread created by the pool Executors.newCachedThreadPool() will not be of type ForkJoinWorkerThread such that it will use the common pool with an under optimized pool size to submit the new tasks.

Nicolas Filotto
  • 43,537
  • 11
  • 94
  • 122