
I can't find a specific answer for the line of investigation we've been asked to take on.

I see that parallel streams may not perform well when only a small number of threads is available, and that they apparently don't behave well when the DB blocks the next request while it is processing the current one.

However, I find that the overhead of implementing a Task Executor compared with parallel streams is huge. We've implemented a POC that takes care of our concurrency needs with just this one line of code:

```java
List<Map<String, String>> listWithAllMaps = mappedValues.entrySet().parallelStream().map(e -> callPlugins(e))
        .collect(Collectors.toList());
```
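A self-contained version of that one-liner, for experimenting. `callPlugins` and the input map are hypothetical stand-ins: the method simulates a blocking plugin/DB call with `Thread.sleep` and records which thread ran it. This makes visible that, by default, the parallel stream runs on the shared `ForkJoinPool.commonPool()` (plus the calling thread).

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class ParallelStreamPoc {

    // Hypothetical stand-in for the real plugin/DB call: blocks briefly, then returns a result map.
    static Map<String, String> callPlugins(Map.Entry<String, String> e) {
        try {
            Thread.sleep(100); // simulate a blocking read
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        Map<String, String> result = new HashMap<>();
        result.put("plugin", e.getKey());
        result.put("thread", Thread.currentThread().getName());
        return result;
    }

    // The one-liner from the question; by default it runs on ForkJoinPool.commonPool() plus the calling thread.
    static List<Map<String, String>> runAll(Map<String, String> mappedValues) {
        return mappedValues.entrySet().parallelStream()
                .map(e -> callPlugins(e))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> mappedValues = new LinkedHashMap<>();
        for (int i = 0; i < 8; i++) {
            mappedValues.put("plugin-" + i, "query-" + i);
        }
        // The shared pool is typically sized to (available processors - 1) and is used by every parallel stream in the JVM.
        System.out.println("commonPool parallelism: " + ForkJoinPool.commonPool().getParallelism());

        for (Map<String, String> m : runAll(mappedValues)) {
            System.out.println(m.get("plugin") + " ran on " + m.get("thread"));
        }
    }
}
```

Because the stream source is ordered (a `LinkedHashMap` entry set), `Collectors.toList()` keeps the results in input order even though the calls run concurrently.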

Whereas with Task Executor, we'd need to implement the Runnable interface and write some cumbersome code just to make the runnables return the values we're reading from the DB instead of being void. That would cost us several hours, if not days, of coding and produce less maintainable, more bug-prone code.
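For comparison, a minimal sketch of the executor route using only JDK classes: a `Callable` returns a value, so the tasks do not need a custom `Runnable`. `ExecutorService.invokeAll` waits until every task has finished and returns the `Future`s in the same order as the tasks. `callPlugins` and the input map are hypothetical stand-ins; in Spring, an `AsyncTaskExecutor` offers `submit(Callable)` for the same purpose.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableSketch {

    // Hypothetical plugin call: returns a small result map for one entry.
    static Map<String, String> callPlugins(Map.Entry<String, String> e) {
        Map<String, String> result = new LinkedHashMap<>();
        result.put(e.getKey(), e.getValue().toUpperCase());
        return result;
    }

    static List<Map<String, String>> runAll(Map<String, String> mappedValues, ExecutorService pool)
            throws InterruptedException, ExecutionException {
        // One Callable per entry; each Callable returns its own result, no Runnable needed.
        List<Callable<Map<String, String>>> tasks = new ArrayList<>();
        for (Map.Entry<String, String> e : mappedValues.entrySet()) {
            tasks.add(() -> callPlugins(e));
        }
        // invokeAll waits for all tasks and returns their Futures in the same order as the tasks.
        List<Map<String, String>> results = new ArrayList<>();
        for (Future<Map<String, String>> f : pool.invokeAll(tasks)) {
            results.add(f.get()); // already complete, so get() does not wait here
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> mappedValues = new LinkedHashMap<>();
        mappedValues.put("pluginA", "a");
        mappedValues.put("pluginB", "b");

        ExecutorService pool = Executors.newFixedThreadPool(4); // dedicated pool, not the common ForkJoinPool
        try {
            System.out.println(runAll(mappedValues, pool)); // [{pluginA=A}, {pluginB=B}]
        } finally {
            pool.shutdown();
        }
    }
}
```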

However, our CTO is still reluctant to use parallel streams because of unforeseen issues that could come up down the road.

So the question is: in an environment where I need to make several concurrent read-only queries to a database, using a different Java component/REST call for each query, is it preferable in any way to use Task Executor instead of parallel streams? If so, why?

Steven
  • I'm afraid I don't have time to look up latest implementations or anything like that. But when they _first_ came around (maybe still) parallel streams all pulled from the _same_, system level thread pool. This created a lot of contention for anything which chose to use them (if more than one place in the code), and so manually using a specifically configured executor was less disruptive to other parts of the code base. – BeUndead Jul 29 '19 at 10:34
  • Parallel streams use same ForkJoinPool by default, so keep in mind that if you use parallel streams in different places in your app, those streams might fight for threads from ForkJoinPool. – Michał Krzywański Jul 29 '19 at 10:35
  • Why would you need cumbersome code for that? Use the `AsyncTaskExecutor` instead of `TaskExecutor` and use a `Callable` (which is non-void by default!). Or just use the `TaskExecutor` as an `Executor` (it extends that) and use that with `CompletableFuture`. – M. Deinum Jul 29 '19 at 10:40
  • @user2478398, michalk I think that can be taken care of like this: https://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream – Steven Jul 29 '19 at 10:41
  • @Steven: The suggestion on that uses code equally, if not more cumbersome than just implementing `Runnable` (IMO), where it is submitting to its own `ForkJoinPool`. Maybe that's just me though. – BeUndead Jul 29 '19 at 10:43
  • @user2478398 hmmm, maybe we're just not getting the Runnable implementation right then. For us, having to return values from several requests, each in its own thread, and then collect them in a single List of Maps was not quite straightforward. – Steven Jul 29 '19 at 10:47
  • @M.Deinum, we ran into some complications with calling the get() method at the end of the CompletableFuture. In our scenario, where we might need to call up to 20 different plugins, expect a result in the same format from all of them and then collect those results in a List, we couldn't call get() in a loop without blocking on the threads that may still be running. – Steven Jul 29 '19 at 10:50
  • You shouldn't be using `get` as that makes it blocking. Instead use `CompletableFuture.allOf().join()`, which waits for all the results, then iterate over all the `CompletableFuture`s to aggregate the results. Instead of just throwing things together, first try to learn the things you want to throw together. – M. Deinum Jul 29 '19 at 10:51
  • @M.Deinum hmmm, right, thanks for pointing that out, I will test it straight away. However, that's exactly the kind of "cumbersome" code I was talking about, as parallel streams abstract you from these details. – Steven Jul 29 '19 at 10:53
  • The `parallelStream` uses the `ForkJoinPool`; when it is used with the wrong kind of tasks, this can lead to all sorts of issues. – M. Deinum Jul 29 '19 at 10:54
  • Cool. If you have examples in mind that would lead to these errors, and the time to put them in an answer, I could then accept Task Executor as preferable in this scenario, and maybe it would help others in the same situation. – Steven Jul 29 '19 at 10:57
  • The current one shows some code that makes the `Executor` path look "similar" to that of streams, but I also wonder about possible guidelines or rules of thumb (or maybe even *hard* rules?) of when which approach is preferable over the other. So +1 for an interesting question, even though I'm not sure what a "good" answer could look like, and it probably has some "opinion-based" elements... – Marco13 Jul 29 '19 at 12:44
  • @Marco13 When I first posted it, I was sure it wouldn't last long because of "opinion-based" or "unclear" down-votes, but to my surprise it has attracted some highly valued StackOverflow members like yourself. And yes, I would love to find some hard rules for this, as it's frustrating to be able to code something in a single line but be left uncertain as to how it might compromise data integrity. Let's hope some more gurus have a nose around here :) – Steven Jul 30 '19 at 06:17
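A sketch of the aggregation pattern suggested in the comments above: start every call with `CompletableFuture.supplyAsync` on a dedicated executor, wait once with `CompletableFuture.allOf(...).join()`, then collect the already-completed results with `join()`, so no per-future `get()` loop is needed. A fixed `ExecutorService` stands in for the Spring `TaskExecutor` (which extends `java.util.concurrent.Executor`), and `callPlugin` plus the plugin names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AllOfSketch {

    // Hypothetical plugin call; every plugin returns a result in the same format.
    static Map<String, String> callPlugin(String pluginName) {
        return Map.of("plugin", pluginName, "status", "ok");
    }

    static List<Map<String, String>> callAllPlugins(List<String> plugins, ExecutorService executor) {
        // Start all calls without blocking the calling thread.
        List<CompletableFuture<Map<String, String>>> futures = plugins.stream()
                .map(p -> CompletableFuture.supplyAsync(() -> callPlugin(p), executor))
                .collect(Collectors.toList());

        // Wait once for all of them; throws CompletionException if any call failed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // Every future is complete now, so join() returns immediately.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(20); // e.g. one thread per plugin
        try {
            List<Map<String, String>> results = callAllPlugins(List.of("a", "b", "c"), executor);
            System.out.println(results.size()); // 3
        } finally {
            executor.shutdown();
        }
    }
}
```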

1 Answer


Use the TaskExecutor as an Executor for a CompletableFuture.

```java
List<CompletableFuture<Map<String, String>>> futures = mappedValues.entrySet().stream()
        .map(e -> CompletableFuture.supplyAsync(() -> callPlugins(e), taskExecutor)).collect(Collectors.toList());

List<Map<String, String>> listWithAllMaps = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
```

Not sure how this is cumbersome. Yes, it is a bit more code, but it has the advantage that you can easily configure the TaskExecutor: increase the number of threads, the queue size, etc.
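To illustrate the configurability point with plain JDK classes: a `ThreadPoolExecutor` with an explicit thread count and a bounded queue can be passed anywhere an `Executor` is expected, including `supplyAsync`. The pool sizes, queue capacity, thread names and rejection policy below are arbitrary example choices, and `callPlugins` is hypothetical. In Spring, `ThreadPoolTaskExecutor` exposes comparable settings (`setCorePoolSize`, `setMaxPoolSize`, `setQueueCapacity`).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class ConfiguredExecutorSketch {

    // Dedicated pool: 10 threads, at most 100 queued tasks; when saturated, the caller runs the task itself.
    static ThreadPoolExecutor newPluginExecutor() {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory named = r -> new Thread(r, "plugin-worker-" + counter.incrementAndGet());
        return new ThreadPoolExecutor(
                10, 10,                       // core and maximum pool size
                60, TimeUnit.SECONDS,         // keep-alive for threads above the core size
                new ArrayBlockingQueue<>(100),
                named,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Hypothetical plugin call that reports which thread executed it.
    static Map<String, String> callPlugins(Map.Entry<String, String> e) {
        Map<String, String> result = new LinkedHashMap<>();
        result.put(e.getKey(), Thread.currentThread().getName());
        return result;
    }

    static List<Map<String, String>> runAll(Map<String, String> mappedValues, ThreadPoolExecutor taskExecutor) {
        List<CompletableFuture<Map<String, String>>> futures = mappedValues.entrySet().stream()
                .map(e -> CompletableFuture.supplyAsync(() -> callPlugins(e), taskExecutor))
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> mappedValues = new LinkedHashMap<>();
        mappedValues.put("pluginA", "a");
        mappedValues.put("pluginB", "b");

        ThreadPoolExecutor taskExecutor = newPluginExecutor();
        try {
            System.out.println(runAll(mappedValues, taskExecutor));
        } finally {
            taskExecutor.shutdown();
        }
    }
}
```

Since these threads belong only to this pool, slow plugin or DB calls cannot starve other code that relies on the common `ForkJoinPool`.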

DISCLAIMER: Typed it off the top of my head, so some minor things might be off with the code snippet.

M. Deinum
  • You are using `map` to run something with a side effect. That's bad practice. – RealSkeptic Jul 29 '19 at 11:27
  • Depends on how you look at it imho. You create `CompletableFuture` instances, so you are mapping from one type to another. Rewriting it to a for-each loop is also possible, but diminishes the readability imho. – M. Deinum Jul 29 '19 at 11:57
  • No, you're not just mapping into completable futures. You are actually changing the state of the taskExecutor. By the way, there is no `submitAsync` method in `CompletableFuture`; I assume you meant `supplyAsync`. – RealSkeptic Jul 29 '19 at 13:22
  • While this is a great answer and has really pointed us in the right direction for coding a POC with Task Executor, it doesn't give us clues as to why we should choose one approach over the other. I'd love to see examples or scenarios where the line of code in my original question would compromise data integrity but the implementation above would not. Also, to implement this, we've had to create a new class implementing Runnable, override the run method and add an extra line of code to map the results, and it still doesn't work for us after about 5 hours of coding, versus 30 minutes for the streams. – Steven Jul 30 '19 at 06:14
  • I don't see why you need a Runnable. You still hold to that claim, but I don't see why on earth you would need it. The bottom line is that you don't want to use the `ForkJoinPool` for this: it is a limited thread pool meant for short-lived tasks, yours are long-lived, and so they can hang up other parts of the JVM that use the `ForkJoinPool`. – M. Deinum Jul 30 '19 at 06:18
  • In here: `CompletableFuture.supplyAsync(() -> callPlugins(e), taskExecutor))` unless callPlugins is a Runnable method, it won't compile – Steven Jul 30 '19 at 06:22
  • Ok, forget my last comment, I've just removed the Runnable and it works, brain meltdown – Steven Jul 30 '19 at 06:39
  • `supplyAsync` requires a `Supplier`. If `callPlugins` takes an argument and returns something (at least it should), then it compiles and you don't need a `Runnable`. – M. Deinum Jul 30 '19 at 06:39
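To make the compile issue discussed in these comments concrete: `supplyAsync` takes a `Supplier<U>`, whose `get()` has no parameters. A method that takes an entry can still be used by wrapping the call in a lambda that captures the entry; no `Runnable` is involved. `callPlugins` and `runOne` are hypothetical names for illustration.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class SupplierSketch {

    // Hypothetical: takes one argument and returns a value, so it is neither a Supplier nor a Runnable itself.
    static Map<String, String> callPlugins(Map.Entry<String, String> e) {
        return Map.of(e.getKey(), e.getValue());
    }

    static Map<String, String> runOne(Map.Entry<String, String> e) {
        // The lambda takes no parameters and captures e, which makes it a valid Supplier<Map<String, String>>.
        Supplier<Map<String, String>> task = () -> callPlugins(e);
        // Without an explicit Executor, supplyAsync normally runs on ForkJoinPool.commonPool(),
        // the same pool parallel streams use; pass a dedicated Executor as a second argument to avoid that.
        return CompletableFuture.supplyAsync(task).join();
    }

    public static void main(String[] args) {
        Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<>("plugin", "result");
        System.out.println(runOne(entry)); // {plugin=result}
    }
}
```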