You can wrap the blocking operation in a ForkJoinPool.ManagedBlocker, which lets the pool activate a spare thread while the current one is blocked. Something along these lines:
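    import java.util.concurrent.ForkJoinPool;
    import java.util.function.Supplier;

    static <T> Supplier<T> blocking(Supplier<T> supplier) {
        return () -> {
            // One blocker per call, so repeated or concurrent calls
            // never share or reuse a previous result.
            class Blocker implements ForkJoinPool.ManagedBlocker {
                T result;
                boolean done;

                @Override
                public boolean block() {
                    result = supplier.get();
                    done = true;
                    return true;
                }

                @Override
                public boolean isReleasable() {
                    // A flag rather than result != null, so a null result counts as done too.
                    return done;
                }
            }

            Blocker blocker = new Blocker();
            try {
                ForkJoinPool.managedBlock(blocker);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new RuntimeException(e);
            }
            return blocker.result;
        };
    }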
You can then use it like this, for example:

    Stream.generate(blocking(() -> ...))
          .parallel()
          ...
          .collect(...);
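To make the usage concrete, here is a minimal, self-contained sketch. The names `BlockingSupplierDemo`, `slowCall` and `run` are made up for illustration, and `Thread.sleep` merely stands in for whatever blocking call you actually have. The wrapper keeps its state per call, so it can safely be invoked repeatedly and from several threads.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class BlockingSupplierDemo {

    // Holds the state of a single call; a new instance is created per get().
    static final class SupplierBlocker<T> implements ForkJoinPool.ManagedBlocker {
        private final Supplier<T> supplier;
        private T result;
        private boolean done;

        SupplierBlocker(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean block() {
            result = supplier.get();
            done = true;
            return true; // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            return done;
        }
    }

    static <T> Supplier<T> blocking(Supplier<T> supplier) {
        return () -> {
            SupplierBlocker<T> blocker = new SupplierBlocker<>(supplier);
            try {
                ForkJoinPool.managedBlock(blocker);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return blocker.result;
        };
    }

    // Hypothetical stand-in for a slow, blocking call (I/O, a lock, ...).
    static int slowCall(AtomicInteger counter) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.incrementAndGet();
    }

    // Collects n values produced by the wrapped supplier on a parallel stream.
    static List<Integer> run(int n) {
        AtomicInteger counter = new AtomicInteger();
        Supplier<Integer> managed = blocking(() -> slowCall(counter));
        return Stream.generate(managed)
                     .parallel()
                     .limit(n)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(8));
    }
}
```

Note that a parallel `Stream.generate(...).limit(n)` may call the supplier more than `n` times, so the collected values are distinct but not necessarily `1..n`.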
More information can be found in this blog post:
http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/
jOOλ provides wrappers like the one above for all Java 8 FunctionalInterface types via org.jooq.lambda.Blocking, so you can write:

    Stream.generate(Blocking.supplier(() -> ...))
          .parallel()
          ...
          .collect(...);
Or, for example, when a filter is blocking:

    Stream....
          .parallel()
          .filter(Blocking.predicate(t -> blockingTest(t)))
          .collect(...);
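If you would rather not add a dependency, the same pattern carries over to a predicate. The following is a hand-rolled sketch and not jOOλ's actual implementation; `BlockingPredicateDemo`, `PredicateBlocker`, `run` and the body of `blockingTest` are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BlockingPredicateDemo {

    // Holds the state of a single test(value) call.
    static final class PredicateBlocker<T> implements ForkJoinPool.ManagedBlocker {
        private final Predicate<T> predicate;
        private final T value;
        private boolean result;
        private boolean done;

        PredicateBlocker(Predicate<T> predicate, T value) {
            this.predicate = predicate;
            this.value = value;
        }

        @Override
        public boolean block() {
            result = predicate.test(value);
            done = true;
            return true; // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            return done;
        }
    }

    static <T> Predicate<T> blocking(Predicate<T> predicate) {
        return t -> {
            PredicateBlocker<T> blocker = new PredicateBlocker<>(predicate, t);
            try {
                ForkJoinPool.managedBlock(blocker);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return blocker.result;
        };
    }

    // Hypothetical blocking test: sleeps briefly, then accepts even numbers.
    static boolean blockingTest(Integer n) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return n % 2 == 0;
    }

    // Filters 1..10 in parallel through the wrapped predicate.
    static List<Integer> run() {
        Predicate<Integer> managed = blocking(n -> blockingTest(n));
        return IntStream.rangeClosed(1, 10)
                        .boxed()
                        .parallel()
                        .filter(managed)
                        .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```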
(Disclaimer: I work for the company behind jOOλ.)