I cannot achieve good parallelization of stream processing when the stream source is a Reader. Running the code below on a quad-core CPU, I observe 3 cores being used at first, then a sudden drop to just two, then to one. Overall CPU utilization hovers around 50%.
Note the following characteristics of the example:
- there are just 6,000 lines;
- each line takes about 20 ms to process;
- the whole procedure takes about a minute.
In other words, the workload is entirely CPU-bound and I/O is negligible. The example is a sitting duck for automatic parallelization.
```java
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public class Main
{
  // Time spent inside processLine, summed over all worker threads (used as a CPU-time estimate)
  static final AtomicLong totalTime = new AtomicLong();

  public static void main(String[] args) throws IOException {
    final long start = System.nanoTime();
    final Path inputPath = createInput();
    System.out.println("Start processing");
    // Both the writer and the line stream (which holds the input file open) are closed here
    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")));
         Stream<String> lines = Files.lines(inputPath)) {
      lines.parallel().map(Main::processLine)
           .forEach(w::println);
    }
    final double cpuTime = totalTime.get(),
                 realTime = System.nanoTime()-start;
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println(" Cores: " + cores);
    System.out.format(" CPU time: %.2f s%n", cpuTime/SECONDS.toNanos(1));
    System.out.format(" Real time: %.2f s%n", realTime/SECONDS.toNanos(1));
    System.out.format("CPU utilization: %.2f%%%n", 100.0*cpuTime/realTime/cores);
  }

  // Deliberately CPU-heavy work: Math.pow over every pair of characters in the line
  private static String processLine(String line) {
    final long localStart = System.nanoTime();
    double ret = 0;
    for (int i = 0; i < line.length(); i++)
      for (int j = 0; j < line.length(); j++)
        ret += Math.pow(line.charAt(i), line.charAt(j)/32.0);
    final long took = System.nanoTime()-localStart;
    totalTime.getAndAdd(took);
    return NANOSECONDS.toMillis(took) + " " + ret;
  }

  // Writes 6,000 lines, each consisting of one nanoTime value repeated 25 times
  private static Path createInput() throws IOException {
    final Path inputPath = Paths.get("input.txt");
    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
      for (int i = 0; i < 6_000; i++) {
        final String text = String.valueOf(System.nanoTime());
        for (int j = 0; j < 25; j++) w.print(text);
        w.println();
      }
    }
    return inputPath;
  }
}
```
My typical output:
```
Cores: 4
CPU time: 110.23 s
Real time: 53.60 s
CPU utilization: 51.41%
```
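A small probe can make the stream's split policy visible. It assumes that the Reader-backed stream is built, as BufferedReader.lines() is in JDK 8 (and therefore Files.lines() there too), on Spliterators.spliteratorUnknownSize, and it repeatedly calls trySplit() on such a spliterator over 6,000 elements to see how large each chunk handed to a worker would be. SplitProbe and batchSizes are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SplitProbe {
    /** Sizes of the chunks split off an unknown-size, iterator-backed spliterator over `total` elements. */
    static List<Long> batchSizes(int total) {
        // Stand-in for the lines of a Reader: an Iterator whose length the spliterator cannot know
        Iterator<String> lines = IntStream.range(0, total)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList())
                .iterator();
        // Same construction BufferedReader.lines() uses in JDK 8 (assumption stated above)
        Spliterator<String> rest = Spliterators.spliteratorUnknownSize(
                lines, Spliterator.ORDERED | Spliterator.NONNULL);
        List<Long> sizes = new ArrayList<>();
        Spliterator<String> chunk;
        // Keep splitting the remainder until it refuses, recording each chunk's size
        while ((chunk = rest.trySplit()) != null) {
            sizes.add(chunk.estimateSize());
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(6_000));
    }
}
```

With the JDK's iterator-backed spliterator this should print [1024, 2048, 2928]: each split takes 1,024 more elements than the previous one, so 6,000 lines end up in only three chunks. If each such chunk is then processed by a single thread, that would be consistent with the pattern of three busy cores dropping to two and then one as the smaller chunks finish.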
For comparison, if I use a slightly modified variant where I first collect into a list and then process:
```java
// requires: import static java.util.stream.Collectors.toList;
Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
     .forEach(w::println);
```
I get this typical output:
```
Cores: 4
CPU time: 138.43 s
Real time: 35.00 s
CPU utilization: 98.87%
```
What could explain that effect, and how can I work around it to get full utilization?
Note that I originally observed this on a reader of a servlet input stream, so it's not specific to a FileReader.
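One possible workaround, sketched here under stated assumptions rather than as a known fix, is to wrap the stream's spliterator in one that splits off fixed-size batches instead of ever-growing ones, so that even a few thousand expensive lines are spread across all cores. FixedBatchSpliterator and withBatchSize are hypothetical names, and the batch size is a tuning parameter: something small, such as 10 lines (roughly 200 ms of work each in this example), is an assumption, not a measured optimum.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper, not part of the JDK: splits off batches of a fixed size
public class FixedBatchSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> source;
    private final int batchSize;

    public FixedBatchSpliterator(Spliterator<T> source, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
    }

    /** Wraps a stream so it splits into fixed-size batches; closing the result closes `in`. */
    public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {
        return StreamSupport.stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true)
                            .onClose(in::close);
    }

    @Override
    public Spliterator<T> trySplit() {
        Object[] batch = new Object[batchSize];
        Object[] holder = new Object[1];
        int n = 0;
        // Pull up to batchSize elements sequentially from the underlying source
        while (n < batchSize && source.tryAdvance(t -> holder[0] = t)) {
            batch[n++] = holder[0];
        }
        // Source exhausted: nothing left to hand out
        if (n == 0) return null;
        // Hand the batch out as an array-backed spliterator of exactly n elements
        return Spliterators.spliterator(batch, 0, n, characteristics());
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        return source.tryAdvance(action);
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        source.forEachRemaining(action);
    }

    @Override
    public long estimateSize() {
        // Reported as unknown, so the remainder always looks worth splitting again
        return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
        return source.characteristics() & ~(SIZED | SUBSIZED);
    }

    public static void main(String[] args) {
        List<String> lines = IntStream.range(0, 6_000)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
        System.out.println(withBatchSize(lines.stream(), 10).map(String::length).count());
    }
}
```

In the program above this would be used as withBatchSize(Files.lines(inputPath), 10).map(Main::processLine).forEach(w::println), ideally inside try-with-resources so that the onClose hook also closes the underlying file stream.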