So I know that if you use parallelStream without a custom ForkJoinPool, it will use the default ForkJoinPool, which by default has one fewer thread than you have processors.

So, as stated here (and also in the other answer to that question), in order to have more parallelism, you have to:

submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(doSomething));

So, I did this:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool(1000);

        IntStream stream = IntStream.range(0, 999999);

        final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());

        forkJoinPool.submit(() -> {
            stream.parallel().forEach(n -> {

                System.out.println("Processing n: " + n);
                try {
                    Thread.sleep(500);
                    thNames.add(Thread.currentThread().getName());
                    System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }).get();
    }
}

I made a Set of thread names in order to see how many threads are being created, and also logged the number of active threads in the pool. Neither number grows beyond 16, which means the parallelism here never exceeds 16 (why 16, of all numbers?). If I do not use the forkJoinPool, I get a parallelism of 4, which matches the number of processors I have.

Why does it give me 16 and not 1000?

Pablo Matias Gomez
  • @DavidSchwartz but check the post I referenced. There it says that doing this, it will use all the threads of the parent pool. – Pablo Matias Gomez Apr 29 '16 at 21:17
  • Uh, note that `thNames` itself is not thread-safe and you're trying to reference and modify it from many threads. – Louis Wasserman Apr 29 '16 at 21:19
  • Also, I wonder -- it seems unlikely, but you might try moving the construction of the stream into the lambda you're passing to `.submit`. – Louis Wasserman Apr 29 '16 at 21:20
  • I think there might be an issue with `parallel()` choosing the incorrect `ForkJoinPool`. When you set `System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "1000");`, it correctly uses that many worker threads, from the default pool. – Sotirios Delimanolis Apr 29 '16 at 21:21
  • @LouisWasserman tried moving the construction of the stream and it is the same :/ – Pablo Matias Gomez Apr 29 '16 at 21:24
  • @SotiriosDelimanolis yeah it works with that, but shouldn't it work with just invoking the parallel stream into the forkjoinpool? – Pablo Matias Gomez Apr 29 '16 at 21:24
  • That's my impression, yes. – Sotirios Delimanolis Apr 29 '16 at 21:25
  • @SotiriosDelimanolis I get that it's your impression, but you seem to have done an experiment that seems to demonstrate that it doesn't actually work. – Louis Wasserman Apr 29 '16 at 21:33
  • @LouisWasserman but check the post I linked, the answers that tells to do this have so many votes that it proves that it is not only an impression. It should work doing what I did but I don't know what is going on here. – Pablo Matias Gomez Apr 29 '16 at 21:34
  • @LouisWasserman My experiment was using a different `ForkJoinPool` (from netty), so can't rely on it. I'd like to rely on the highly upvoted [post](http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream/22269778#22269778). – Sotirios Delimanolis Apr 29 '16 at 21:34
  • ForkJoinPool seems to have some magic that creates only as many threads as needed. Parallelism serves only as upper limit. The "compensation" part described in the implementation notes here http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8u40-b25/java/util/concurrent/ForkJoinPool.java#590 is maybe explaining that. – zapl Apr 29 '16 at 21:52
  • I'm very confused. When you set the common pool's parallelism with the system property, it triggers something that allows your pool to also reach that size. If you don't, your pool is maxed out. I can't find what that is from looking at the code right now. It might be the `commonParallelism`. – Sotirios Delimanolis Apr 29 '16 at 22:06
  • I've closed as a duplicate, where an answer is posted by a Java language developer. The `commonParallelism` is used by a component in the `Stream` pipeline which estimates how much work to do in parallel. In my opinion, it shouldn't rely on `commonParallelism`, but on the parallelism of the current `ForkJoinPool`. They say _Again, this is not guaranteed behavior, and it may change in the future. But this technique will probably work for JDK 8 implementations for the foreseeable future._ You're stuck with setting the common parallelism. – Sotirios Delimanolis Apr 30 '16 at 00:09
  • @SotiriosDelimanolis so, the post i've linked is wrong..? I mean, the answer provided there does not really work as expected.. – Pablo Matias Gomez Apr 30 '16 at 00:13
  • So that post only discusses how to execute the tasks in a custom `ForkJoinPool`, which your code _is_ doing (you can verify this with a custom thread factory with custom names for the fork join threads). Your issue here is deeper than that. It relates to how the `Stream#parallel()` ends up using your FJP, limiting the parallelism separately. This part is unspecified. – Sotirios Delimanolis Apr 30 '16 at 00:15
  • You need enough threads to keep all your CPUs busy; more than this and you are likely to add overhead and run slower. – Peter Lawrey Apr 30 '16 at 06:41
  • @PeterLawrey Nonsense. If you only have enough threads to keep your CPUs busy, what happens when a thread has to wait for I/O, for example due to a page fault? – David Schwartz Apr 30 '16 at 23:38
  • @DavidSchwartz when you get a page fault, the OS has to do something at that point. Generally the OS has to do something at many points, and you have other programs running, which is more than enough to keep everything busy during the odd page fault. If you have a high number of page faults, you have effectively moved the bottleneck to your disk subsystem, and no matter what you do in your CPUs, random access to disk, even SSD, is going to be orders of magnitude slower. The only time using more threads than CPUs helps is when you have a large number of network IO requests pending. – Peter Lawrey May 01 '16 at 09:20
  • @PeterLawrey The idea that other tasks will keep the CPU busy only applies if there *are* other tasks that you care about. And that random access to disk, or even SSD, is orders of magnitude slower is precisely the point -- why have an idle core during disk accesses? – David Schwartz May 01 '16 at 18:29
  • @DavidSchwartz you shouldn't but chances are you won't have cores which are idle esp if you have hyperthreading. – Peter Lawrey May 01 '16 at 19:44
  • If you don’t understand that you *must not* manipulate a `HashSet` from multiple threads, you are not ready for using multi-threading at all… – Holger May 02 '16 at 14:31
  • @Holger yes I do understand that and I do know that I could have used a synchronized Set, but this was just an example and that won't make any difference. That's why I also printed the active threads count. Maybe you don't know the answer to this question? – Pablo Matias Gomez May 02 '16 at 14:39
  • You already *have* an answer. There is no need to add another. Still, there is no reason to post broken code, not even if “this was just an example”. Instantiating a thread-safe set instead still is a single line of code. By the way, if “this was just an example”, why aren’t you even able to create a new hash set without a third party library dependency? – Holger May 02 '16 at 14:54
  • @Holger happy now? – Pablo Matias Gomez May 02 '16 at 15:19
  • Well, it’s not what I would use, but yes, correct code makes me happy. Besides that, does the [existing answer](http://stackoverflow.com/a/36949448/2711488) leave something open that should be addressed? – Holger May 02 '16 at 15:23

2 Answers

Update

Originally this answer was an elaborate explanation claiming that the ForkJoinPool applies back-pressure and doesn't even reach the prescribed parallelism level, because there are always idle workers available to process the stream.

That's incorrect.

The actual answer is provided in the original question of which this one was marked a duplicate: using a custom ForkJoinPool for stream processing is not officially supported, and when using forEach, the default (common) pool's parallelism is used to determine how the stream's spliterator is split.
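
To see where the 16 could come from, here is a rough sketch of the splitting heuristic as I understand it from the OpenJDK 8 sources: the stream's leaf target appears to be four times the common pool's parallelism, and a sized source is halved until each chunk is no larger than size / leafTarget. The class `LeafEstimate` and its method names are made up for illustration, the plain halving is a simplification of what the range spliterator does, and newer JDKs may use a different rule.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper: estimates how many leaf chunks a parallel stream is split into.
class LeafEstimate {

    // Models halving a sized source until a chunk is no larger than the threshold.
    static long countLeaves(long size, long threshold) {
        if (size <= threshold || size < 2) {
            return 1;
        }
        long left = size / 2;
        return countLeaves(left, threshold) + countLeaves(size - left, threshold);
    }

    // commonParallelism stands for the value of ForkJoinPool.getCommonPoolParallelism().
    static long estimateLeaves(long size, int commonParallelism) {
        // Assumption (my reading of JDK 8): leaf target = common parallelism * 4.
        long leafTarget = Math.max(1L, (long) commonParallelism << 2);
        long threshold = Math.max(1L, size / leafTarget);
        return countLeaves(size, threshold);
    }

    public static void main(String[] args) {
        // 4 processors give a common pool parallelism of 3.
        System.out.println("common parallelism 3:    " + estimateLeaves(999_999L, 3));
        System.out.println("common parallelism 1000: " + estimateLeaves(999_999L, 1000));
        System.out.println("this JVM:                " + estimateLeaves(999_999L, ForkJoinPool.getCommonPoolParallelism()));
    }
}
```

With 4 processors the common pool parallelism is 3, so the target is 12 chunks; halving 999999 elements only stops at 16 chunks, which would match the 16 threads seen in the question regardless of how large the custom pool is.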

Here's an example showing that when tasks are submitted manually to a custom ForkJoinPool, the pool's active thread count easily reaches its parallelism level (forkJoinPool and thNames are the same variables as in the question):

for (int i = 0; i < 1_000_000; ++i) {
   forkJoinPool.submit(() -> {
      try {
         Thread.sleep(1);
         thNames.add(Thread.currentThread().getName());
         System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism());
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
   });
}

Thanks to Stuart Marks for pointing this out and to Sotirios Delimanolis for arguing that my original answer is wrong :)

Dimitar Dimitrov

It seems to me that when you submit a lambda to the FJP that lambda will use the common pool and not the FJP. Sotirios Delimanolis proved this with his comment, above. What you are submitting is a Task that runs in your FJP.

Try profiling this code to see what threads are actually being used.

You cannot name the threads within the FJP.

edharned
  • Nono. Maybe I worded my comment incorrectly. The task should run in the context of whatever `ForkJoinPool` instance it was submitted on. It does this by looking at the type of the currently running thread. If it's of type `ForkJoinWorkerThread`, it then has access to the `ForkJoinPool` that created it. It can use that to submit the other parallel tasks. You can name the threads of custom `ForkJoinPool` by providing an appropriate thread factory. – Sotirios Delimanolis Apr 29 '16 at 22:23
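
As a rough illustration of the thread-factory suggestion in the comment above, the sketch below builds a ForkJoinPool whose workers carry a recognizable name and records which threads a parallel stream submitted to it actually runs on. `NamedPoolDemo`, `newNamedPool`, `workerNames` and the `custom-fjp-` prefix are hypothetical names; the fact that the stream stays inside the submitting pool is the unspecified behavior discussed above, not a documented guarantee.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class NamedPoolDemo {

    static final String PREFIX = "custom-fjp-"; // hypothetical name prefix

    // Creates a pool whose worker threads are named PREFIX + a running number.
    static ForkJoinPool newNamedPool(int parallelism) {
        AtomicInteger counter = new AtomicInteger();
        ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            ForkJoinWorkerThread t =
                    ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            t.setName(PREFIX + counter.incrementAndGet());
            return t;
        };
        // No uncaught-exception handler, default (LIFO) scheduling mode.
        return new ForkJoinPool(parallelism, factory, null, false);
    }

    // Runs a parallel stream from inside the custom pool and collects the names
    // of all threads that processed at least one element.
    static Set<String> workerNames(int parallelism, int elements) {
        ForkJoinPool pool = newNamedPool(parallelism);
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        try {
            pool.submit(() ->
                    IntStream.range(0, elements).parallel()
                             .forEach(n -> names.add(Thread.currentThread().getName()))
            ).join();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(workerNames(8, 10_000));
    }
}
```

If every printed name starts with the prefix, the stream's tasks ran on the custom pool's workers and not on the common pool, even though the number of chunks is still derived from the common pool's parallelism.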