I have a list of objects that I want to process, and the Java 8 Stream API looks like the cleanest and most readable way to do it.
But some of the operations I need to perform on these objects involve blocking IO (such as reading from a database), so I’d like to submit them to a thread pool with a couple dozen threads.
At first I thought about doing something along the lines of:
```java
myObjectList
    .stream()
    .filter(wrapPredicate(obj -> threadPoolExecutor.submit(
            () -> longQuery(obj) // returns boolean
        ).get())) // wait for future & unwrap boolean
    .map(filtered -> threadPoolExecutor.submit(
            () -> anotherQuery(filtered) // returns Optional
        ))
    .map(wrapFunction(Future::get))
    .filter(Optional::isPresent)
    .map(Optional::get)
    .collect(toList());
```
The `wrapPredicate` and `wrapFunction` helpers are just for rethrowing checked exceptions.
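The question does not show these helpers. A minimal sketch of what they might look like, assuming they simply rethrow checked exceptions wrapped in a `RuntimeException`; the `CheckedPredicate`/`CheckedFunction` interfaces and the `isEven`/`doubled` methods are hypothetical stand-ins, not part of any library:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class CheckedWrappers {

    // Hypothetical: a Predicate whose test() may throw checked exceptions.
    @FunctionalInterface
    public interface CheckedPredicate<T> {
        boolean test(T t) throws Exception;
    }

    // Hypothetical: a Function whose apply() may throw checked exceptions.
    @FunctionalInterface
    public interface CheckedFunction<T, R> {
        R apply(T t) throws Exception;
    }

    // Adapts a CheckedPredicate to Predicate; checked exceptions are wrapped
    // in a RuntimeException, unchecked ones pass through unchanged.
    public static <T> Predicate<T> wrapPredicate(CheckedPredicate<T> p) {
        return t -> {
            try {
                return p.test(t);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    // Same idea for Function (e.g. Future::get, which throws checked exceptions).
    public static <T, R> Function<T, R> wrapFunction(CheckedFunction<T, R> f) {
        return t -> {
            try {
                return f.apply(t);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    // Hypothetical stand-ins for methods that declare checked exceptions.
    static boolean isEven(Integer x) throws IOException {
        if (x < 0) {
            throw new IOException("negative: " + x);
        }
        return x % 2 == 0;
    }

    static Integer doubled(Integer x) throws IOException {
        return x * 2;
    }

    static List<Integer> evensDoubled(List<Integer> xs) {
        return xs.stream()
                .filter(wrapPredicate(CheckedWrappers::isEven))
                .map(wrapFunction(CheckedWrappers::doubled))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evensDoubled(Arrays.asList(1, 2, 3, 4))); // prints [4, 8]
    }
}
```

Wrapping in a plain `RuntimeException` hides the original exception type from callers; `java.io.UncheckedIOException` or a dedicated unchecked type are alternatives when that matters.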
But, obviously, calls to `Future.get()` block the stream’s thread until the query for a given object finishes, and the stream won’t progress until then. So only one object is processed at a time, and the thread pool is pointless.
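To make the blocking effect concrete, a small sketch (class and method names are hypothetical; `Thread.sleep` stands in for the blocking query) that records how many tasks are ever running at once. With `get()` called inline in a sequential stream, each task finishes before the next one is even submitted, so the count stays at one:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SerializedDemo {

    // Returns the largest number of tasks observed running at the same time.
    static int maxConcurrent(List<Integer> items, boolean submitAllFirst) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        Function<Integer, Future<Integer>> submit = i -> pool.submit(() -> {
            maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
            Thread.sleep(20); // stand-in for a blocking query
            running.decrementAndGet();
            return i;
        });
        Function<Future<Integer>, Integer> await = f -> {
            try {
                return f.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        try {
            Stream<Future<Integer>> futures = items.stream().map(submit);
            if (submitAllFirst) {
                // A terminal operation forces every submit() before any get().
                futures = futures.collect(Collectors.toList()).stream();
            }
            futures.map(await).collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println("get() inline:     " + maxConcurrent(items, false));
        System.out.println("submit all first: " + maxConcurrent(items, true));
    }
}
```

With `submitAllFirst = true` several tasks can overlap, which is the idea behind the workaround in the question; the exact count in that case depends on thread scheduling.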
I could use a parallel stream, but then I’d have to hope that the default `ForkJoinPool` is big enough for this. Or I could just increase `java.util.concurrent.ForkJoinPool.common.parallelism`, but I don’t want to change the whole application’s settings for the sake of one stream. I could also run the stream inside a custom `ForkJoinPool`, but as far as I can tell that does not guarantee that level of parallelism.
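For reference, the custom-`ForkJoinPool` approach usually looks like the sketch below. It relies on current JDK behavior rather than a documented contract: a parallel stream started from inside a `ForkJoinPool` task forks its subtasks into that pool. `slowIsEven` is a hypothetical stand-in for `longQuery`. The pool size only caps the parallelism; the stream itself decides how finely to split the list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class CustomPoolDemo {

    // Hypothetical stand-in for a blocking query such as longQuery(obj).
    static boolean slowIsEven(Integer x) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return x % 2 == 0;
    }

    static List<Integer> evensInCustomPool(List<Integer> items, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The parallel stream is started from a task running in `pool`,
            // so (in practice) its subtasks run there, not in the common pool.
            return pool.submit(() -> items.parallelStream()
                            .filter(CustomPoolDemo::slowIsEven)
                            .collect(Collectors.toList()))
                    .join(); // ForkJoinTask.join() throws no checked exceptions
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(evensInCustomPool(items, 24));
    }
}
```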
So I ended up with something like this, just to guarantee that all the needed tasks are submitted to the thread pool before waiting for any of the futures to finish:
```java
myObjectList
    .stream()
    .map(obj -> Pair.of(obj, threadPoolExecutor.submit(
            () -> longQuery(obj) // returns boolean
        ))
    )
    .collect(toList()).stream() // terminate stream to actually submit tasks to the pool
    .filter(wrapPredicate(p -> p.getRight().get())) // wait & unwrap future after all tasks are submitted
    .map(Pair::getLeft)
    .map(filtered -> threadPoolExecutor.submit(
            () -> anotherQuery(filtered) // returns Optional
        ))
    .collect(toList()).stream() // terminate stream to actually submit tasks to the pool
    .map(wrapFunction(Future::get)) // wait & unwrap futures after all submitted
    .filter(Optional::isPresent)
    .map(Optional::get)
    .collect(toList());
```
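The `collect(toList()).stream()` barrier can be hidden behind a small helper. A sketch, assuming results are wanted in input order; `mapConcurrently` and `squares` are hypothetical names, not JDK methods:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ConcurrentMap {

    // Hypothetical helper: submits fn(item) for every item first, and only
    // then waits for the futures, returning the results in input order.
    static <T, R> List<R> mapConcurrently(List<T> items,
                                          Function<? super T, ? extends R> fn,
                                          ExecutorService executor) {
        // Explicit <R> so each future is a Future<R> rather than a wildcard capture.
        List<Future<R>> futures = items.stream()
                .map(item -> executor.<R>submit(() -> fn.apply(item)))
                .collect(Collectors.toList());
        List<R> results = new ArrayList<>(futures.size());
        for (Future<R> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
        return results;
    }

    // Demo: squares each number on a small pool.
    static List<Integer> squares(List<Integer> xs) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            return mapConcurrently(xs, x -> x * x, pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(Arrays.asList(1, 2, 3)));
    }
}
```

Each barrier in the pipeline then becomes one call, although the wait between the two phases is still there.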
Is there any obviously better way to achieve this?
A more elegant way to tell the stream “execute the pipelined steps so far for every object in the stream, right now” and then keep processing, other than `.collect(toList()).stream()`? A better way to filter on the result of a `Future` than packing it into an Apache Commons `Pair` and filtering on `Pair::getRight` later? Or perhaps a totally different approach to the problem?
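One different approach, sketched under the assumption that both queries for a given object may run back to back on the same pool thread: fuse them into a single task that returns an `Optional`. That removes the `Pair` and one of the two barriers. `longQuery`/`anotherQuery` below are hypothetical stand-ins with made-up logic, and `await`/`run` are illustrative names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class FusedTasks {

    // Hypothetical stand-ins for the question's blocking queries.
    static boolean longQuery(Integer x) {
        return x % 2 == 0;
    }

    static Optional<String> anotherQuery(Integer x) {
        return x > 2 ? Optional.of("item-" + x) : Optional.empty();
    }

    // Unwraps a future, converting checked exceptions to unchecked ones.
    static <T> T await(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static List<String> process(List<Integer> items, ExecutorService pool) {
        // One task per object runs both queries, so no Pair is needed and
        // there is a single "submit everything, then wait" barrier.
        List<Future<Optional<String>>> futures = items.stream()
                .map(obj -> pool.submit(() ->
                        longQuery(obj) ? anotherQuery(obj) : Optional.<String>empty()))
                .collect(Collectors.toList());
        return futures.stream()
                .map(FusedTasks::await)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    static List<String> run(List<Integer> items) {
        ExecutorService pool = Executors.newFixedThreadPool(24);
        try {
            return process(items, pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5, 6)));
    }
}
```

A side effect of fusing: `anotherQuery` for one object can start as soon as its own `longQuery` finishes, instead of waiting for every `longQuery` call to complete.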
…, and I would finally wait on only that one future.
… or `CompletableFuture<…>`. Unfortunately, it returns `CompletableFuture` and does not give access to the results of the underlying futures. But thanks anyway: using `CompletableFuture`s instead of the old `java.util.concurrent.Future` is certainly an improvement.
– silmeth Apr 25 '17 at 10:03
… out of it – ended up requiring a bit more boilerplate due to the `CompletableFuture` API, but it is much more readable and elegant in the end. Thanks.
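Following the `CompletableFuture` suggestion in the comments, here is a sketch of a common idiom for getting results out of `CompletableFuture.allOf`, which itself completes without a value: wait on the combined future, then `join()` each input future, which is already complete at that point. `sequence`, `run`, and the query stand-ins are hypothetical names, not the commenters' code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class CompletableFutureSketch {

    // Hypothetical stand-ins for the question's blocking queries.
    static boolean longQuery(Integer x) {
        return x % 2 == 0;
    }

    static Optional<String> anotherQuery(Integer x) {
        return x > 2 ? Optional.of("item-" + x) : Optional.empty();
    }

    // allOf() carries no results, but once it completes every input future is
    // complete as well, so join() on each one returns without blocking.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    static List<String> process(List<Integer> items, ExecutorService pool) {
        List<CompletableFuture<Optional<String>>> futures = items.stream()
                .map(obj -> CompletableFuture
                        .supplyAsync(() -> longQuery(obj), pool)
                        // *Async with the pool keeps anotherQuery off the caller's thread
                        .thenApplyAsync(ok -> ok ? anotherQuery(obj) : Optional.<String>empty(), pool))
                .collect(Collectors.toList());
        // The only blocking call: wait for the combined future.
        return sequence(futures).join().stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    static List<String> run(List<Integer> items) {
        ExecutorService pool = Executors.newFixedThreadPool(24);
        try {
            return process(items, pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5, 6)));
    }
}
```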