The default "parallelStream()" in Java 8 uses the common ForkJoinPool, which may be a latency problem if the common pool's threads are exhausted when a task is submitted. However, in many cases enough CPU power is available and the tasks are short enough that this is not a problem. If we do have some long-running tasks, this will of course need some careful consideration, but for this question let's assume that this is not the problem.
However, filling the ForkJoinPool with I/O tasks that don't actually do any CPU-bound work is a way to introduce a bottleneck even though enough CPU power is available. I understood that. However, that is what we have the ManagedBlocker for. So if we have an I/O task, we should simply allow the ForkJoinPool to manage that within a ManagedBlocker. That sounds incredibly easy. However, to my surprise, ManagedBlocker is a rather complicated API for the simple thing that it is. And after all, I think that this is a common problem. So I just built a simple utility method that makes ManagedBlockers easy to use for the common case:
    import java.util.concurrent.ForkJoinPool;
    import java.util.function.Supplier;

    public class BlockingTasks {

        // Runs the supplier inside a ManagedBlocker and returns its result.
        public static <T> T callInManagedBlock(final Supplier<T> supplier) {
            final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
            try {
                ForkJoinPool.managedBlock(managedBlock);
            } catch (InterruptedException e) {
                throw new Error(e);
            }
            return managedBlock.getResult();
        }

        private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
            private final Supplier<T> supplier;
            private T result;
            private boolean done = false;

            private SupplierManagedBlock(final Supplier<T> supplier) {
                this.supplier = supplier;
            }

            // Executes the (potentially blocking) supplier; returning true means no further blocking is needed.
            @Override
            public boolean block() {
                result = supplier.get();
                done = true;
                return true;
            }

            // Tells the pool whether blocking is still necessary.
            @Override
            public boolean isReleasable() {
                return done;
            }

            public T getResult() {
                return result;
            }
        }
    }
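For comparison, here is a minimal sketch of the same idea that restores the thread's interrupt flag instead of wrapping the InterruptedException in an Error. The class name InterruptAwareBlockingTasks and the choice of IllegalStateException are assumptions made for this illustration only; nothing like this ships with the JDK as far as I know.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

// Hypothetical variant of BlockingTasks; all names here are made up for this sketch.
class InterruptAwareBlockingTasks {

    static <T> T callInManagedBlock(Supplier<T> supplier) {
        SupplierBlocker<T> blocker = new SupplierBlocker<>(supplier);
        try {
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while blocking", e);
        }
        return blocker.result;
    }

    private static final class SupplierBlocker<T> implements ForkJoinPool.ManagedBlocker {
        private final Supplier<T> supplier;
        private T result;
        private boolean done;

        SupplierBlocker(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean block() {
            result = supplier.get();
            done = true;
            return true; // no further blocking required
        }

        @Override
        public boolean isReleasable() {
            return done;
        }
    }

    public static void main(String[] args) {
        // Also works when called from a thread that is not a ForkJoinPool worker.
        System.out.println(callInManagedBlock(() -> "hello"));
    }
}
```

Whether an unchecked exception is the right way to report the interruption depends on the calling code; the sketch only shows that the flag does not have to be lost.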
Now if I want to download the HTML code of a couple of websites in parallel, I could do it like this without the I/O causing any trouble:
    public static void main(String[] args) {
        final List<String> pagesHtml = Stream
                .of("https://google.com", "https://stackoverflow.com", "...")
                .parallel() // without this call the stream would run sequentially
                .map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
                .collect(Collectors.toList());
    }
I am a little bit surprised that there is no class like the BlockingTasks above shipped with Java (or did I not find it?), but it was not that hard to build.
When I google for "java 8 parallel stream", the first four results include articles claiming that, due to the I/O problem, Fork/Join sucks in Java:
- https://dzone.com/articles/think-twice-using-java-8
- http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/ (at least mentions ManagedBlocker, but also says "in a different use case you’d be able to give it a ManagedBlocker instance"; it does not mention why not in this case)
I have altered my search terms somewhat, and while there are a lot of people complaining about how horrible life is, I found nobody talking about a solution like the above. Since I don't feel like Marvin (brain like a planet) and Java 8 has been available for quite a while, I suspect that there is something terribly wrong with what I am proposing up there.
I banged together a small test:
    public static void main(String[] args) {
        System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
        IntStream.range(0, 10).parallel().forEach((x) -> sleep());
        System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
    }

    public static void sleep() {
        try {
            System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new Error(e);
        }
    }
I ran that and got the following result:
    18:41:29.021: Start
    18:41:29.033: Sleeping main
    18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
    18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
    18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
    18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
    18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
    18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
    18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
    18:41:39.034: Sleeping main
    18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
    18:41:49.035: End
So on my 8-CPU computer the ForkJoinPool naturally chose 8 threads, completed the first 8 tasks and then the last two, which means that this took 20 seconds. If there had been other tasks queued, the pool still could not have used the clearly idle CPUs (except for 6 cores in the last 10 seconds).
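The thread count in that log matches how the common pool is sized: by default its parallelism is one less than the number of available processors, because the thread that starts the parallel stream (here main) also takes part in the work. A small sketch to check the numbers on a given machine follows; the class name CommonPoolInfo is made up for the illustration, and the value can be overridden with the system property java.util.concurrent.ForkJoinPool.common.parallelism.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class, only used to print how the common pool is sized.
class CommonPoolInfo {

    static int availableCpus() {
        return Runtime.getRuntime().availableProcessors();
    }

    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    " + availableCpus());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```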
Then I used...
    IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));
...instead of...
    IntStream.range(0, 10).parallel().forEach((x) -> sleep());
...and got the following result:
    18:44:10.93: Start
    18:44:10.945: Sleeping main
    18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
    18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
    18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
    18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
    18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
    18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
    18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
    18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
    18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
    18:44:20.957: End
It looks to me like this works: extra threads were started to compensate for my mock "blocking I/O action" (sleep). Time was cut down to 10 seconds, and I suppose that if I queued more tasks, those could still use the available CPU power.
Is there anything wrong with this solution, or in general with using I/O in streams if the I/O operation is wrapped in a ManagedBlocker?
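To make the effect easier to reproduce without reading timestamps, here is a self-contained sketch that counts the distinct threads used and measures the elapsed time. The class name ManagedBlockDemo, the 200 ms sleep and the task count are arbitrary assumptions, and how many compensating threads the pool actually starts depends on the JVM version and its settings, so no particular numbers are guaranteed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical demo class; names and numbers are chosen for illustration only.
class ManagedBlockDemo {

    // Sleeps inside a ManagedBlocker so the pool may add a compensating thread.
    static void managedSleep(long millis) {
        try {
            ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
                private boolean done;

                @Override
                public boolean block() throws InterruptedException {
                    Thread.sleep(millis);
                    done = true;
                    return true;
                }

                @Override
                public boolean isReleasable() {
                    return done;
                }
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Runs the given number of sleeping tasks in a parallel stream
    // and returns the names of all threads that executed at least one task.
    static Set<String> run(int tasks, long millis) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        IntStream.range(0, tasks).parallel().forEach(i -> {
            threads.add(Thread.currentThread().getName());
            managedSleep(millis);
        });
        return threads;
    }

    public static void main(String[] args) {
        int tasks = 2 * Runtime.getRuntime().availableProcessors();
        long start = System.nanoTime();
        Set<String> threads = run(tasks, 200);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(tasks + " tasks ran on " + threads.size()
                + " threads in " + elapsedMs + " ms");
    }
}
```

If compensation kicks in, the number of threads reported should exceed the common pool's parallelism plus one, and the elapsed time should stay close to a single sleep interval.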