
By default, the common pool used by parallelStream should have a size of cpu_cores - 1.

In my application, however, the number of common pool threads is always greater than the number of hardware CPU cores.

VisualVM screenshot:

[screenshot of the VisualVM thread view]

I am confused. I have searched extensively but could not find an answer to this.

My config:

Runtime.getRuntime().availableProcessors()=12

java.util.concurrent.ForkJoinPool.common.parallelism=null (default setting)

My code:

final CountDownLatch countDownLatch = new CountDownLatch(tempList.size());
tempList.parallelStream().forEach(om -> {
    countDownLatch.countDown();
    redisReBloomService.add(config.getRedisKey(), om.getChannelNo());
});
countDownLatch.await();

I also tried a custom pool, and that does not work either:

ForkJoinPool forkJoinPool = new ForkJoinPool(3);
// get() belongs on the Future returned by submit(); forEach returns void
forkJoinPool.submit(() -> {
    tempList.parallelStream().forEach(om -> {
        countDownLatch.countDown();
        redisReBloomService.add(config.getRedisKey(), om.getChannelNo());
    });
}).get();

Some info: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html and the related question "Custom thread pool in Java 8 parallel stream".

Armine
MichaelM

1 Answer


The parallelism of a ForkJoinPool is not the maximum number of threads in the pool. It is the target number of active threads. If some threads are blocked, the pool may start new threads to achieve the desired level of parallelism.

From the documentation of ForkJoinPool:

The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked I/O or other unmanaged synchronization. The nested ForkJoinPool.ManagedBlocker interface enables extension of the kinds of synchronization accommodated.
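To see the difference between the parallelism target and the actual number of threads, both values can be read from the pool at runtime. The following is only a minimal sketch: the class name PoolInspection and the helper parallelismOf are made up for illustration, while getParallelism(), getPoolSize() and getActiveThreadCount() are regular ForkJoinPool methods.

```java
import java.util.concurrent.ForkJoinPool;

class PoolInspection {

    /** Creates a pool with the requested parallelism and reports the target it stores. */
    static int parallelismOf(int requested) {
        ForkJoinPool pool = new ForkJoinPool(requested);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool common = ForkJoinPool.commonPool();
        // Target number of active threads, not an upper bound on the thread count
        System.out.println("availableProcessors = " + Runtime.getRuntime().availableProcessors());
        System.out.println("common parallelism  = " + common.getParallelism());
        // Number of worker threads that currently exist; may exceed the parallelism
        System.out.println("common pool size    = " + common.getPoolSize());
        System.out.println("active threads      = " + common.getActiveThreadCount());
        System.out.println("custom parallelism  = " + parallelismOf(3));
    }
}
```

Printing getPoolSize() from inside the application while the stream is running should show the same thread count that VisualVM displays.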

The screenshot shows that new threads are started at exactly the moments when other threads switch to the Monitor state (the pink ones). My guess is that the redisReBloomService.add(…) call uses a ManagedBlocker internally when it has to wait on that monitor, causing the pool to start more worker threads.

Here is a small example using ManagedBlocker that demonstrates behavior similar to what you have observed. When the ManagedBlocker sleeps for 1 second, a new worker thread can often be observed in VisualVM.

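import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ManagedBlocker;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

// Assumption: JUnit 4; with JUnit 5 import org.junit.jupiter.api.Test instead
import org.junit.Test;

public class ForkJoinPoolTest {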

    @Test
    public void testManagedBlocker() throws InterruptedException {
        // wait to be able to connect with VisualVM
        Thread.sleep(10_000);

        IntStream.range(0, 100).parallel().peek(number -> {
                doWork();

                // Run a managed blocker for roughly one in ten elements.
                // Every time it blocks, a new worker thread might be started.
                if (ThreadLocalRandom.current().nextInt(10) == 0) {
                    try {
                        ForkJoinPool.managedBlock(new ManagedBlocker() {
                            @Override
                            public boolean block() throws InterruptedException {
                                Thread.sleep(1_000);
                                return true;
                            }

                            @Override
                            public boolean isReleasable() {
                                return false;
                            }
                        });
                    } catch (InterruptedException ignored) { }
                }
            })
            .sum();
    }

    /** Some CPU bound workload **/
    void doWork() {
        for (int i = 0; i < 1_000_000; i++) {
            Math.random();
        }
    }
}
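The same effect can also be measured without VisualVM by recording the largest pool size seen while the tasks run. This is a rough sketch under a few assumptions: PoolGrowthDemo, SleepBlocker and run are hypothetical names, and running a parallel stream inside a custom pool via submit() relies on undocumented stream behavior (the trick from the linked "Custom thread pool" question). Whether the peak actually exceeds the parallelism depends on the JVM and on timing, so the code only reports the values.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class PoolGrowthDemo {

    /** Blocker that sleeps once and then reports itself as releasable. */
    static final class SleepBlocker implements ForkJoinPool.ManagedBlocker {
        private final long millis;
        private volatile boolean done;

        SleepBlocker(long millis) { this.millis = millis; }

        @Override
        public boolean block() throws InterruptedException {
            Thread.sleep(millis);
            done = true;
            return true; // no further blocking needed
        }

        @Override
        public boolean isReleasable() { return done; }
    }

    /** Returns { parallelism, peak pool size observed by the tasks }. */
    static int[] run(int parallelism, int tasks) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        AtomicInteger peak = new AtomicInteger();
        try {
            pool.submit(() -> IntStream.range(0, tasks).parallel().forEach(i -> {
                try {
                    // Tell the pool this worker is about to block
                    ForkJoinPool.managedBlock(new SleepBlocker(50));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Remember the largest number of worker threads seen so far
                peak.accumulateAndGet(pool.getPoolSize(), Math::max);
            })).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new int[] { pool.getParallelism(), peak.get() };
    }

    public static void main(String[] args) {
        int[] result = run(2, 20);
        System.out.println("parallelism = " + result[0] + ", peak pool size = " + result[1]);
    }
}
```

If the thread count needs a hard upper bound, Java 9 and later offer a ForkJoinPool constructor that takes a maximumPoolSize argument; on Java 8 the pool decides on its own how many compensating threads to create.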

Florian Gutmann