
As far as I know, parallel streams use the default ForkJoinPool.commonPool, which by default has one fewer thread than there are processors. I want to use my own custom thread pool instead.

Like this:

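import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import org.junit.Test;

@Test
public void stream() throws Exception {
    //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    ForkJoinPool pool = new ForkJoinPool(10);
    List<Integer> testList = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
    long start = System.currentTimeMillis();
    // The submitted task returns the Stream itself; collect() is called on the result of get()
    List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
        try {
            // read from database (simulated with a 1-second sleep)
            Thread.sleep(1000);
            System.out.println("task" + item + ":" + Thread.currentThread());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return item * 10;
    })).get().collect(Collectors.toList());
    System.out.println(result);
    System.out.println(System.currentTimeMillis() - start);
}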

And the result: [screenshot of the console output]

My custom ForkJoinPool is never used. If I instead change the default parallelism like this:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

It works well: the whole run takes only about 1 second.

In my application, each task performs heavy I/O (reading data from a database), so I need higher parallelism, but I do not want to change the JVM-wide property.

So what is the right way to specify my own ForkJoinPool?

Or, more generally, how should parallel streams be used in I/O-intensive situations?
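The "one fewer thread than processors" figure is OpenJDK's default rather than something the ForkJoinPool documentation promises. A small sketch to check it on a given JVM, assuming the java.util.concurrent.ForkJoinPool.common.parallelism property is not set on the command line (the class and method names are made up for illustration):

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSize {
    // Parallelism the JVM actually chose for ForkJoinPool.commonPool().
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    // OpenJDK's default when the system property is not set:
    // one fewer than the available processors, but never less than 1.
    static int expectedDefault() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonParallelism());
        System.out.println("matches default rule:    " + (commonParallelism() == expectedDefault()));
    }
}
```

The property is read only once, when the ForkJoinPool class initializes the common pool, so System.setProperty has an effect only if it runs before any code in the JVM has used ForkJoinPool (including creating any pool or running any parallel stream).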

Jerry Zhou

2 Answers


Streams are lazy; all work is done when you commence a terminal operation. In your case, the terminal operation is .collect(Collectors.toList()), which you call in the main thread on the result of get(). Therefore, the actual work is done exactly as if you had constructed the entire stream in the main thread.
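The laziness is easy to observe with a counter: building the pipeline runs none of the map lambdas, and only the terminal operation does. A minimal sketch (class and method names are illustrative only):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyStreamDemo {
    // Returns {map calls before the terminal operation, map calls after it}.
    static int[] countMapCalls(List<Integer> input) {
        AtomicInteger calls = new AtomicInteger();

        // Building the pipeline only records the map step; the lambda does not run yet.
        Stream<Integer> pipeline = input.parallelStream().map(item -> {
            calls.incrementAndGet();
            return item * 10;
        });
        int beforeTerminal = calls.get();

        // collect() is the terminal operation: only now is the map lambda applied,
        // by the thread calling collect() plus any helper threads it uses.
        pipeline.collect(Collectors.toList());
        return new int[] {beforeTerminal, calls.get()};
    }

    public static void main(String[] args) {
        int[] calls = countMapCalls(Arrays.asList(1, 2, 3, 4, 5));
        System.out.println(Arrays.toString(calls)); // [0, 5]
    }
}
```

So whichever thread calls collect() decides where the work starts, regardless of where the stream object was created.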

For your pool to have an effect, you have to move the terminal operation into the submitted task:

ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);

We can also demonstrate the relevance of the terminal operation by constructing the stream in the main thread and only submitting the terminal operation to the pool:

Stream<Integer> stream = testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();

But keep in mind that making a parallel stream use a custom pool by running its terminal operation inside that pool relies on undocumented behavior, which is not guaranteed. The actual answer must be that the Stream API in its current form, with no thread control (and no help for dealing with checked exceptions), is not suitable for parallel I/O operations.
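One commonly used alternative for blocking I/O (a different technique, not a parallel stream) is to submit each read as a CompletableFuture to a dedicated ExecutorService whose size you choose explicitly. A minimal sketch, assuming a hypothetical readFromDatabase method that stands in for the real query:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class IoFanOut {
    // Hypothetical stand-in for the blocking database read from the question.
    static int readFromDatabase(int item) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        }
        return item * 10;
    }

    // Runs one blocking read per item on a dedicated pool of 'threads' threads.
    static List<Integer> loadAll(List<Integer> items, int threads) {
        ExecutorService ioPool = Executors.newFixedThreadPool(threads);
        try {
            // Collect the futures first so every read is submitted before we wait on any.
            List<CompletableFuture<Integer>> futures = items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> readFromDatabase(item), ioPool))
                    .collect(Collectors.toList());
            // Join in list order, so the results line up with the input.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            ioPool.shutdown(); // all reads have completed by now; release the threads
        }
    }

    public static void main(String[] args) {
        List<Integer> items = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
        long start = System.currentTimeMillis();
        System.out.println(loadAll(items, 20));
        System.out.println((System.currentTimeMillis() - start) + " ms");
    }
}
```

The two-step shape matters: if submitting and joining were chained in a single sequential stream, each future would be joined before the next one is created, and the reads would run one after another.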

Holger

I assume you discovered the trick described here, which states:

The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"
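The quoted rule can be checked directly: a plain thread such as main is not "inForkJoinPool()", while a task forked from a worker of a custom pool stays in that pool. A small sketch; WhichPool and the method names are made up for illustration:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkTargetDemo {
    // A trivial task that reports which pool hosts the thread running compute().
    static final class WhichPool extends RecursiveTask<ForkJoinPool> {
        @Override
        protected ForkJoinPool compute() {
            return ForkJoinTask.getPool();
        }
    }

    // Forks WhichPool from inside a task running in 'pool' and returns where it ran.
    static ForkJoinPool poolOfForkedSubtask(ForkJoinPool pool) {
        return pool.submit(() -> new WhichPool().fork().join()).join();
    }

    public static void main(String[] args) {
        ForkJoinPool custom = new ForkJoinPool(4);
        try {
            // The main thread is not a ForkJoin worker, so inForkJoinPool() is false here
            // and a fork() issued from this thread would target the common pool.
            System.out.println("main thread inForkJoinPool(): " + ForkJoinTask.inForkJoinPool());
            // Forked from a worker of 'custom', the subtask stays in 'custom'.
            System.out.println("subtask ran in custom pool: " + (poolOfForkedSubtask(custom) == custom));
        } finally {
            custom.shutdown();
        }
    }
}
```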

In your code, parallelStream() and map(...) are invoked in the custom ForkJoinPool, but the Function passed to map is not executed there.

Remember that Stream#map is an intermediate operation. Its Function is only executed for each element once a terminal operation is chained. In your case, that terminal operation is collect(...). And since collect(Collectors.toList()) is invoked in the main thread, the map's Function is applied to the elements in parallel by the main thread and the commonPool workers, not by your pool.
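To see this, one can record which threads actually execute the map Function, once with collect(...) outside the submitted task (as in the question) and once inside it. A sketch with illustrative names; note that the "inside" result reflects how current JDKs implement parallel streams, not a documented guarantee:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class WhereDoesMapRun {
    // Runs the question's pipeline once and returns the threads that executed map's Function.
    static Set<Thread> mapThreads(ForkJoinPool pool, boolean collectInsidePool) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        List<Integer> input = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
        if (collectInsidePool) {
            // The terminal operation runs inside the task submitted to 'pool'.
            pool.submit(() -> input.parallelStream()
                    .map(item -> { seen.add(Thread.currentThread()); return item * 10; })
                    .collect(Collectors.toList())).join();
        } else {
            // Only the pipeline is built inside 'pool'; collect() runs on the calling thread.
            Stream<Integer> built = pool.submit(() -> input.parallelStream()
                    .map(item -> { seen.add(Thread.currentThread()); return item * 10; })).join();
            built.collect(Collectors.toList());
        }
        return seen;
    }

    // True if 't' is a worker thread owned by 'pool'.
    static boolean belongsTo(Thread t, ForkJoinPool pool) {
        return t instanceof ForkJoinWorkerThread && ((ForkJoinWorkerThread) t).getPool() == pool;
    }

    public static void main(String[] args) {
        ForkJoinPool custom = new ForkJoinPool(10);
        try {
            for (boolean inside : new boolean[] {false, true}) {
                Set<Thread> threads = mapThreads(custom, inside);
                long inCustom = threads.stream().filter(t -> belongsTo(t, custom)).count();
                System.out.println("collect inside pool: " + inside + ", threads used: " + threads.size()
                        + ", of which custom-pool workers: " + inCustom);
            }
        } finally {
            custom.shutdown();
        }
    }
}
```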

You can simply move the collect(...) call inside your submit(...):

List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }
    return item * 10;
}).collect(Collectors.toList())).get();
Savior