By default, parallel Java streams are processed by the common `ForkJoinPool`, which is constructed with default parameters (its parallelism normally defaults to the number of available processors minus one). As has been answered in another question, one can adjust these defaults by running the stream inside a custom pool or by setting the `java.util.concurrent.ForkJoinPool.common.parallelism` system property.
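Both adjustments can be sketched as follows. This is a minimal illustration, not code from the other question; the class `PoolKnobs` and the helper `customPoolParallelism` are hypothetical names, and the property value 20 is arbitrary. Note that, per the `ForkJoinPool` documentation, parallelism is a target level of active threads, not a guarantee of how many threads a particular stream will end up using.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolKnobs {
    /** Hypothetical helper: parallelism of a dedicated pool created for `threads` threads. */
    static int customPoolParallelism(int threads) {
        ForkJoinPool pool = new ForkJoinPool(threads);
        try {
            return pool.getParallelism(); // the pool's target parallelism level
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Only honored if set before the common pool is first initialized;
        // -Djava.util.concurrent.ForkJoinPool.common.parallelism=20 on the
        // command line avoids that ordering problem.
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
        System.out.println("common pool: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("custom pool: " + customPoolParallelism(100)); // custom pool: 100
    }
}
```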
However, I've been unable to increase the number of threads allocated to stream processing with either of these two methods. As an example, consider the program below, which reads a list of IP addresses from the file given as its first argument and prints the resolved host names. Running it on a file with about 13,000 unique IP addresses, Oracle Java Mission Control shows as few as 16 threads, of which only five are `ForkJoinPool` workers. Yet this particular task would benefit from many more threads, because the threads spend most of their time waiting for DNS responses. So my question is: how can I actually increase the number of threads used?
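One plausible explanation for seeing only five `ForkJoinPool` workers is how the stream source splits. Assuming that, as in the JDK 8 sources, `Files.lines` delegates to `BufferedReader.lines()`, which wraps its line iterator with `Spliterators.spliteratorUnknownSize`, each split hands off a batch that grows by 1024 elements, and because the total size is unknown those batches appear not to be subdivided further. The sketch below (hypothetical class `SplitSizes`, with placeholder strings standing in for the file's lines) splits such a spliterator directly and records the size of each chunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;

public class SplitSizes {
    /**
     * Repeatedly split an iterator-backed spliterator of unknown size and
     * record how many elements each split-off chunk holds.
     */
    static List<Long> chunkSizes(int lineCount) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < lineCount; i++) {
            lines.add("10.0.0." + (i % 256)); // placeholder addresses
        }
        // Same kind of source assumed to back BufferedReader.lines()
        Spliterator<String> rest =
            Spliterators.spliteratorUnknownSize(lines.iterator(), Spliterator.ORDERED);
        List<Long> sizes = new ArrayList<>();
        Spliterator<String> chunk;
        while ((chunk = rest.trySplit()) != null) {
            sizes.add(chunk.estimateSize()); // elements in this batch
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(13000)); // [1024, 2048, 3072, 4096, 2760]
    }
}
```

For 13,000 lines this yields only five chunks, which would be consistent with the five workers observed. A source that knows its size, such as the `List` returned by `Files.readAllLines`, splits in halves instead; newer JDKs may also split `Files.lines` differently.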
I've tried the program in three environments; the figures below are the OS-reported thread counts.
- Java SE Runtime Environment build 1.8.0_73-b02 on an 8-core machine running Windows 7: 17 threads
- Java SE Runtime Environment build 1.8.0_66-b17 on a 2-core machine running OS X Darwin 15.2.0: 23 threads
- openjdk version 1.8.0_72 on a 24-core machine running FreeBSD 11.0: 44 threads
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;

/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {

    /** Resolve the passed IP address into a name */
    static String addressName(String ipAddress) {
        try {
            return InetAddress.getByName(ipAddress).getHostName();
        } catch (UnknownHostException e) {
            return ipAddress; // fall back to the input string
        }
    }

    public static void main(String[] args) {
        Path path = Paths.get(args[0]);
        // Dedicated pool; relies on a parallel stream started from one of its
        // tasks running on this pool rather than on the common pool
        ForkJoinPool fjp = new ForkJoinPool(100);
        try {
            fjp.submit(() -> {
                // try-with-resources closes the file opened by Files.lines
                try (Stream<String> lines = Files.lines(path)) {
                    lines.parallel()
                        .map(Resolve100::addressName)
                        .forEach(System.out::println);
                } catch (IOException e) {
                    System.err.println("Failed: " + e);
                }
            }).get();
        } catch (Exception e) {
            System.err.println("Failed: " + e);
        } finally {
            fjp.shutdown();
        }
    }
}
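For comparison, here is a sketch of a different technique swapped in for parallel streams: a fixed-size `ExecutorService` with one task per input, so the number of concurrently blocked lookups is bounded by the pool size rather than by how the stream source splits. The class `ResolveWithExecutor` and the helper `resolveAll` are hypothetical; `String::toUpperCase` stands in for the DNS lookup so the example runs without network access.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ResolveWithExecutor {
    /**
     * Hypothetical helper: applies a (typically blocking) resolver to every
     * input on a fixed pool of the given size and returns results in input order.
     */
    static List<String> resolveAll(List<String> inputs,
                                   Function<String, String> resolver,
                                   int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // One task per input, independent of any stream splitting
            List<Future<String>> futures = new ArrayList<>();
            for (String input : inputs) {
                futures.add(pool.submit(() -> resolver.apply(input)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // waits for this input's task to finish
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while resolving", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Resolver failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in resolver; in Resolve100 this would be Resolve100::addressName
        List<String> inputs = Arrays.asList("a", "b", "c");
        System.out.println(resolveAll(inputs, String::toUpperCase, 100)); // [A, B, C]
    }
}
```

With the program above, the call might look like `resolveAll(Files.readAllLines(path), Resolve100::addressName, 100)`. A plain thread pool fits blocking DNS lookups reasonably well, since each thread simply waits on its own request, whereas `ForkJoinPool` is designed around work-stealing for mostly CPU-bound tasks.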