I have a list of objects that I want to process, and the Java 8 Stream API looks like the cleanest and most readable way to do it.

But some of the operations I need to perform on these objects involve blocking IO (like reading from a database), so I’d like to submit those operations to a thread pool with a couple of dozen threads.

At first I thought about doing something along the lines of:

myObjectList
    .stream()
    .filter(wrapPredicate(obj -> threadPoolExecutor.submit(
            () -> longQuery(obj)          // returns boolean
    ).get()))                             // wait for future & unwrap boolean
    .map(filtered -> threadPoolExecutor.submit(
            () -> anotherQuery(filtered)  // returns Optional
    ))
    .map(wrapFunction(Future::get))
    .filter(Optional::isPresent)
    .map(Optional::get)
    .collect(toList());

The wrapPredicate and wrapFunction helpers are only there to rethrow checked exceptions as unchecked ones.
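
The question does not show these helpers. A minimal sketch of what they might look like follows; the `Wrappers` class, the `ThrowingPredicate` and `ThrowingFunction` interfaces, and the `parseStrict` demo method are assumptions made for illustration, not the asker's actual code.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helpers: the original post does not show their implementation.
final class Wrappers {

    /** Like Predicate, but test() may throw a checked exception. */
    @FunctionalInterface
    interface ThrowingPredicate<T> {
        boolean test(T t) throws Exception;
    }

    /** Like Function, but apply() may throw a checked exception. */
    @FunctionalInterface
    interface ThrowingFunction<T, R> {
        R apply(T t) throws Exception;
    }

    /** Adapts a throwing predicate; checked exceptions are wrapped in RuntimeException. */
    static <T> Predicate<T> wrapPredicate(ThrowingPredicate<T> predicate) {
        return t -> {
            try {
                return predicate.test(t);
            } catch (RuntimeException e) {
                throw e;                       // already unchecked, pass through
            } catch (Exception e) {
                throw new RuntimeException(e); // keep the original as the cause
            }
        };
    }

    /** Adapts a throwing function; checked exceptions are wrapped in RuntimeException. */
    static <T, R> Function<T, R> wrapFunction(ThrowingFunction<T, R> function) {
        return t -> {
            try {
                return function.apply(t);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    /** Demo method that declares a checked exception. */
    static Integer parseStrict(String s) throws Exception {
        if (s.isEmpty()) {
            throw new Exception("empty input");
        }
        return Integer.valueOf(s);
    }

    public static void main(String[] args) {
        Function<String, Integer> parse = wrapFunction(Wrappers::parseStrict);
        Predicate<String> nonBlank = wrapPredicate(s -> !s.trim().isEmpty());
        List<Integer> numbers = Stream.of("1", " ", "3")
                .filter(nonBlank)
                .map(parse)
                .collect(Collectors.toList());
        System.out.println(numbers); // prints [1, 3]
    }
}
```

Wrapping in a plain `RuntimeException` is only one option; a dedicated unchecked exception type would make it easier to unwrap the cause at the call site.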

But, obviously, each call to Future.get() blocks the stream’s thread until the query for that object finishes, and the stream doesn’t progress until then. So only one object is processed at a time and the thread pool is pointless.
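
To make the laziness concrete, here is a small sketch that records the order in which tasks are submitted and awaited. The class and method names are illustrative, and the tasks simply return their input rather than running a real query. With `submitAllFirst` set to `false`, each element is submitted and awaited before the next one is touched; setting it to `true` materialises the futures in a list first, so all submissions happen before the first wait.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative demo, not code from the question.
final class LazyStreamDemo {

    /** Returns the order of "submit" and "get" events for a sequential stream. */
    static List<String> eventOrder(List<Integer> input, boolean submitAllFirst) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> events = new ArrayList<>(); // only touched by the calling thread
        try {
            Stream<Future<Integer>> futures = input.stream().map(i -> {
                events.add("submit " + i);
                return pool.submit(() -> i);     // trivial stand-in for a blocking query
            });
            if (submitAllFirst) {
                // Terminal operation: forces every submit to run before any get.
                futures = futures.collect(Collectors.toList()).stream();
            }
            futures.forEach(f -> events.add("get " + getUnchecked(f)));
        } finally {
            pool.shutdown();
        }
        return events;
    }

    private static <T> T getUnchecked(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3);
        System.out.println(eventOrder(input, false)); // submit and get alternate
        System.out.println(eventOrder(input, true));  // all submits, then all gets
    }
}
```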

I could use a parallel stream, but then I’d have to hope that the default ForkJoinPool is large enough. I could increase "java.util.concurrent.ForkJoinPool.common.parallelism", but I don’t want to change the whole application’s settings for the sake of one stream. I could also run the stream in a custom ForkJoinPool, but as far as I can see that does not guarantee the desired level of parallelism either.

So I ended up with something like this, just to guarantee that all the tasks are submitted to the thread pool before waiting for the futures to finish:

myObjectList
    .stream()
    .map(obj -> Pair.of(obj, threadPoolExecutor.submit(
                    () -> longQuery(obj)             // returns boolean
        ))
    )
    .collect(toList()).stream()                      // terminate stream to actually submit tasks to the pool
    .filter(wrapPredicate(p -> p.getRight().get()))  // wait & unwrap future after all tasks are submitted
    .map(Pair::getLeft)
    .map(filtered -> threadPoolExecutor.submit(
            () -> anotherQuery(filtered)             // returns Optional
    ))
    .collect(toList()).stream()                      // terminate stream to actually submit tasks to the pool
    .map(wrapFunction(Future::get))                  // wait & unwrap futures after all submitted
    .filter(Optional::isPresent)
    .map(Optional::get)
    .collect(toList());

Is there any obviously better way to achieve this?

Is there a more elegant way than .collect(toList()).stream() to tell the stream “execute the pipeline steps so far for every object now” and then keep processing? Is there a better way to filter on the result of a Future than packing it into an Apache Commons Pair and filtering on Pair::getRight later? Or perhaps a totally different approach to the problem?

silmeth
  • Maybe it's just me being more of an old-school person than a stream person, but I don't find the outcome of this exercise so delightful. It's kind of hard to read, and getting all the subtle details right would turn it into "let's hope I never have to touch and modify this code", because I would be too afraid to break it. – GhostCat Apr 24 '17 at 15:23
  • IMO it is pretty readable when there are no `.collect(toList()).stream()` in the middle of the method chain – but then it does not quite do what I want it to. I don’t think it’d be more readable and maintainable if done with classic `for` loops here (but maybe? I’ll try and ask colleagues what they think). – silmeth Apr 24 '17 at 15:29
  • Well, for loops and other methods probably. – GhostCat Apr 24 '17 at 15:30
  • Possible duplicate of [Custom thread pool in Java 8 parallel stream](http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream) – Dawid Kunert Apr 24 '17 at 15:31
  • Just to check that I understood everything... You first submit `longQuery` and need to wait for a boolean result, and then this boolean result is the parameter of `anotherQuery`, which returns an `Optional`, then you block again and collect the results to a list. Is this correct? – fps Apr 24 '17 at 16:04
  • Almost, but not really. I submit `longQuery` to get a boolean on which I `filter` the original objects. Then I submit `anotherQuery` with the original objects, but only those for which `longQuery` returned `true`. The rest is correct – I then block to wait for the `Optional`s, which I unpack and collect again into a List. – silmeth Apr 24 '17 at 16:07
  • OK, got it. I wouldn't use the boolean result to filter elements from the stream. Instead, I would call `anotherQuery` if the result is true and do nothing if it is false. I also would never call `Future.get` inside the stream. I would just collect the futures into a list and use either Guava or `CompletableFuture.allOf` to merge, e.g., a `List<CompletableFuture<Optional<T>>>` into a `CompletableFuture<List<Optional<T>>>`, and finally wait on that single future only. – fps Apr 24 '17 at 16:14
  • That would be a really good solution if only `CompletableFuture.allOf()` actually returned a future holding a list or some other collection of the results. Unfortunately it returns `CompletableFuture<Void>` and does not give access to the results of the underlying futures. But thanks anyway; using `CompletableFuture`s instead of the old `java.util.concurrent.Future` is certainly an improvement. – silmeth Apr 25 '17 at 10:03
  • OK, doing it Federico’s way (a list of `CompletableFuture`s, each composed with the next query if `true` to get a `CompletableFuture<Optional<T>>`, and then a single future of the collected results built out of that list) ended up requiring a bit more boilerplate due to the `CompletableFuture` API, but is much more readable and elegant in the end. Thanks. – silmeth Apr 25 '17 at 12:04
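
The asker's final code is not shown in the thread. The following is only a sketch of how the `CompletableFuture` approach described in these comments might look; `longQuery` and `anotherQuery` are hypothetical stand-ins operating on strings, and the class and method names are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Sketch under stated assumptions, not the asker's actual solution.
final class CompletableFutureSketch {

    // Hypothetical stand-ins for the blocking queries in the question.
    static boolean longQuery(String obj) {
        return !obj.startsWith("skip");
    }

    static Optional<String> anotherQuery(String obj) {
        return obj.endsWith("?") ? Optional.empty() : Optional.of(obj.toUpperCase());
    }

    /** Runs longQuery, then anotherQuery only if the first one returned true. */
    static CompletableFuture<Optional<String>> queryAsync(String obj, Executor pool) {
        return CompletableFuture
                .supplyAsync(() -> longQuery(obj), pool)
                .thenCompose(keep -> {
                    if (keep) {
                        return CompletableFuture.supplyAsync(() -> anotherQuery(obj), pool);
                    }
                    return CompletableFuture.completedFuture(Optional.<String>empty());
                });
    }

    static List<String> process(List<String> objects, Executor pool) {
        // Collecting to a list starts every task before anything is awaited.
        List<CompletableFuture<Optional<String>>> futures = objects.stream()
                .map(obj -> queryAsync(obj, pool))
                .collect(Collectors.toList());

        // allOf yields CompletableFuture<Void>, so results are read back from the list.
        CompletableFuture<List<String>> all = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // every future is complete here
                        .filter(Optional::isPresent)
                        .map(Optional::get)
                        .collect(Collectors.toList()));

        return all.join(); // the only blocking wait
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(24);
        try {
            System.out.println(process(Arrays.asList("a", "skip-b", "c?", "d"), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Because no pool thread ever blocks on another future, the second query can safely be submitted to the same executor.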

4 Answers

You can greatly simplify your code by using:

myObjectList.stream()
    .map(obj -> threadPoolExecutor.submit(
                    () -> longQuery(obj)? anotherQuery(obj).orElse(null): null))
    .collect(toList()).stream()
    .map(wrapFunction(Future::get))
    .filter(Objects::nonNull)
    .collect(toList());

Submitting anotherQuery to the same executor at a later time does not improve concurrency, so you can execute it directly after longQuery has returned true. At that point, obj is still in scope, so you can pass it to anotherQuery.

By extracting the value of the Optional and using null to represent a missing result, we get the same representation for every absent result, whether longQuery returned false or anotherQuery returned an empty Optional. So all we have to do after extracting the Future’s result is .filter(Objects::nonNull).

The requirement that you submit the jobs first and collect the Futures before getting the actual results does not change. There is no way around it. All that other convenience methods or frameworks can offer is to hide the temporary storage of these objects.
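
One standard-library example of such a convenience method is `ExecutorService.invokeAll`, which takes a collection of `Callable`s, waits for all of them, and returns the `Future`s in the order of the input. The sketch below applies it to the combined task from this answer; `longQuery` and `anotherQuery` are hypothetical stand-ins on integers, and the class name is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

// Sketch only; the queries are made-up stand-ins for the blocking calls.
final class InvokeAllSketch {

    static boolean longQuery(Integer obj) {
        return obj % 2 == 0;
    }

    static Optional<String> anotherQuery(Integer obj) {
        return obj > 2 ? Optional.of("row-" + obj) : Optional.empty();
    }

    /** Both queries in one task; null stands for "no result", as in the answer. */
    static Callable<String> task(Integer obj) {
        return () -> longQuery(obj) ? anotherQuery(obj).orElse(null) : null;
    }

    static List<String> process(List<Integer> objects, ExecutorService pool) {
        List<Callable<String>> tasks = objects.stream()
                .map(InvokeAllSketch::task)
                .collect(Collectors.toList());
        try {
            List<String> results = new ArrayList<>();
            // invokeAll returns only after every task has completed.
            for (Future<String> future : pool.invokeAll(tasks)) {
                String result = future.get(); // already done, returns immediately
                if (result != null) {
                    results.add(result);
                }
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(process(Arrays.asList(1, 2, 3, 4, 6), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The list of tasks is still temporary storage; `invokeAll` just moves the submit-then-wait loop into the executor.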

Holger

I think the answer to the main question is no: in order to "execute" a stream, you need a terminal operation. But there might be room for improvement.

You can at least get rid of the pair by collecting to a map instead of a list:

stream.collect(toMap(Function.identity(),
                     obj -> threadPoolExecutor.submit(() -> longQuery(obj))))
      .entrySet()
      .stream()
      .filter(wrapPredicate(entry -> entry.getValue().get()))
      .map(Entry::getKey)
      ...

Note that this only works if no two of the processed objects are equal to each other. It makes the code slightly shorter and easier to read, as you don't have to create the pair/entry yourself.
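
The uniqueness caveat comes from `Collectors.toMap` itself: the two-argument form throws `IllegalStateException` when two elements produce equal keys. The sketch below shows that behaviour, plus the four-argument overload with a merge function and a `LinkedHashMap` supplier, which tolerates duplicates and keeps encounter order. `String::length` stands in for the submitted future, and the class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative demo of toMap behaviour; not code from the answer.
final class ToMapCaveats {

    /** Two-argument toMap: reports whether equal keys made the collector throw. */
    static boolean failsOnDuplicates(List<String> objects) {
        try {
            objects.stream().collect(Collectors.toMap(Function.identity(), String::length));
            return false;
        } catch (IllegalStateException duplicateKey) {
            return true;
        }
    }

    /** Four-argument toMap: keeps the first value per key and the encounter order. */
    static Map<String, Integer> orderedAndTolerant(List<String> objects) {
        return objects.stream().collect(Collectors.toMap(
                Function.identity(),
                String::length,
                (first, second) -> first, // merge function for equal keys
                LinkedHashMap::new));     // preserves insertion order
    }

    public static void main(String[] args) {
        List<String> objects = Arrays.asList("ccc", "a", "bb", "a");
        System.out.println(failsOnDuplicates(objects));
        System.out.println(orderedAndTolerant(objects).keySet());
    }
}
```

With futures as values, a merge function that keeps the first value would still have submitted a task for the duplicate, so deduplicating the input beforehand may be the cheaper option.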

Malte Hartwig
  • They are (or at least should be) unique, so collecting to a map is a very good suggestion, I didn’t think about it. Thanks. :) – silmeth Apr 25 '17 at 09:26

You can specify a thread pool for Java 8 parallel streams. You don't have to change application settings. More info: https://stackoverflow.com/a/22269778/7123191.
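
The technique behind that link is to start the parallel stream from inside a task submitted to a dedicated `ForkJoinPool`. A minimal sketch follows; the class name, method name, and squaring workload are illustrative. That the stream then runs in the submitting pool is an implementation detail of current JDKs rather than documented behaviour, and it does not guarantee that every thread of the pool is kept busy.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

// Sketch only; relies on undocumented behaviour of parallel streams.
final class CustomPoolSketch {

    static List<Integer> squareInCustomPool(List<Integer> input, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The terminal operation is invoked from a worker thread of 'pool'.
            Callable<List<Integer>> task = () -> input.parallelStream()
                    .map(i -> i * i)
                    .collect(Collectors.toList());
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squareInCustomPool(Arrays.asList(1, 2, 3), 24));
    }
}
```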

Dawid Kunert
  • As I explained in the description – running a parallel stream in a custom big `ForkJoinPool` does not guarantee the desired level of parallelization, so it is not a solution to this problem. It’d be perfect if I could do just that, but the parallel stream may still run in fewer batches than the number of threads I specified. Compare http://stackoverflow.com/a/29272776/359949. – silmeth Apr 24 '17 at 15:40

You can at least get rid of the pair by collecting to a map instead of a list:

stream.collect(toMap(Function.identity(),
                     obj -> threadPoolExecutor.submit(() -> longQuery(obj))))
      .entrySet()
      .stream()
      .filter(wrapPredicate(entry -> entry.getValue().get()))
      .map(Entry::getKey)
      ...
moni123
  • That looks identical to [this two day old answer](https://stackoverflow.com/a/43604097/2711488). – Holger Apr 27 '17 at 12:13