
I'm trying to use Java 8's parallelStream() to execute several long-running requests (e.g. web requests) in parallel. Simplified example:

    List<Supplier<Result>> myFunctions = Arrays.asList(() -> doWebRequest(), ...)

    List<Result> results = myFunctions.parallelStream().map(function -> function.get()).collect(...

So if there are two functions that block for 2 and 3 seconds respectively, I'd expect to get the result after 3 seconds. However, it actually takes 5 seconds, i.e. it seems the functions are being executed in sequence rather than in parallel. Am I doing something wrong?

edit: This is an example. The time taken is ~4000 milliseconds when I want it to be ~2000.

    long start = System.currentTimeMillis();

    Map<String, Supplier<String>> input = new HashMap<String, Supplier<String>>();

    input.put("1", () -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    });

    input.put("2", () -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "b";
    });

    Map<String, String> results = input.keySet().parallelStream().collect(Collectors.toConcurrentMap(
            key -> key,
            key -> {
                return input.get(key).get();
            }));

    System.out.println("Time: " + (System.currentTimeMillis() - start));

}

It doesn't make any difference if I iterate over the entrySet() instead of the keySet().

edit: changing the parallel part to the following also does not help:

    Map<String, String> results = input.entrySet().parallelStream().map(entry -> {
        return new ImmutablePair<String, String>(entry.getKey(), entry.getValue().get());
    }).collect(Collectors.toConcurrentMap(Pair::getLeft, Pair::getRight));
Ryan
    What does the javadoc of `parallelStream()` say? – Sotirios Delimanolis Aug 27 '14 at 01:46
  • I've looked and I don't see anything obviously relevant. – Ryan Aug 27 '14 at 01:50
  • Are you running your code on multi-core hardware? – Juned Ahsan Aug 27 '14 at 01:51
  • Yes, it's a quad core. – Ryan Aug 27 '14 at 01:52
  • You cannot assume on the basis of timing whether the execution was sequential or parallel. You can always use the isParallel() API to check how your stream is treated by the JVM. – Juned Ahsan Aug 27 '14 at 01:55
  • 1
    If it's consistently taking exactly the sum of the time of the two lambdas, it's either sequential or not usefully parallel. This all works fine using an Executor service in the obvious way which is what I'll probably end up using unless I can figure this out. But so far I'm not impressed by the usability compared to either older Java parallel systems or Scala. – Ryan Aug 27 '14 at 01:57
  • Please give a reproducible example. Trying the code you have with something that simply `Thread.sleep`s works fine. – Sotirios Delimanolis Aug 27 '14 at 02:09
  • I've attached an example that's closer to my actual code. – Ryan Aug 27 '14 at 02:26
  • This actually has to do with how the internal Spliterator splits over the HashMap's entries. If you substitute `input.keySet().parallelStream()` with `new ArrayList<>(input.keySet()).parallelStream()`, it'll take full advantage of the host parallelism. – tclamb Nov 03 '16 at 21:00

1 Answer


When executing in parallel, there is overhead of decomposing the input set, creating tasks to represent the different portions of the calculation, distributing the actions across threads, waiting for results, combining results, etc. This is over and above the work of actually solving the problem. If a parallel framework were to always decompose problems down to a granularity of one element, for most problems, these overheads would overwhelm the actual computation and parallelism would result in a slower execution. So parallel frameworks have some latitude to decide how finely to decompose the input, and that's what's happening here.

In your case, your input set is simply too small to be decomposed. So the library chooses to execute sequentially.
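As tclamb's comment on the question suggests, one way around this is to give the stream a source whose spliterator splits cheaply, such as an `ArrayList` copy of the key set. A minimal sketch of that workaround (assuming the common pool provides at least one worker thread in addition to the caller, so the two sleeps can overlap):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class ForceSplitSketch {
    public static void main(String[] args) {
        Map<String, Supplier<String>> input = new HashMap<>();
        input.put("1", () -> { sleep(2000); return "a"; });
        input.put("2", () -> { sleep(2000); return "b"; });

        long start = System.currentTimeMillis();
        // Copying the keys into an ArrayList gives the stream a SIZED,
        // cheaply splittable source, so the two tasks can be handed to
        // different threads instead of running back to back.
        Map<String, String> results = new ArrayList<>(input.keySet())
                .parallelStream()
                .collect(Collectors.toConcurrentMap(k -> k, k -> input.get(k).get()));
        long elapsed = System.currentTimeMillis() - start;

        System.out.println(results + " in " + elapsed + " ms");
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the tasks merely sleep rather than burn CPU, the two 2-second waits should overlap and the elapsed time should come out near 2000 ms rather than 4000 ms.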

Try this on your four-core system: compare

    IntStream.range(0, 100_000).sum()

vs

    IntStream.range(0, 100_000).parallel().sum()

Here, you're giving it enough input that it will be confident it can win through parallel execution. If you measure with a responsible measurement methodology (say, the JMH microbenchmark harness), you'll probably see an almost-linear speedup between these two examples.
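A rough way to run the comparison above (a hedged sketch only; `System.nanoTime()` wall-clock timing is illustrative and no substitute for a harness like JMH):

```java
import java.util.stream.IntStream;

public class SumComparison {
    public static void main(String[] args) {
        long t0 = System.nanoTime();
        int seq = IntStream.range(0, 100_000).sum();
        long seqNanos = System.nanoTime() - t0;

        long t1 = System.nanoTime();
        int par = IntStream.range(0, 100_000).parallel().sum();
        long parNanos = System.nanoTime() - t1;

        // Both pipelines compute the same value; only the execution
        // strategy differs. (The true sum, 4,999,950,000, overflows int,
        // but both pipelines wrap identically.)
        System.out.printf("sequential=%d (%d ns), parallel=%d (%d ns)%n",
                seq, seqNanos, par, parNanos);
    }
}
```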

Brian Goetz
  • Is there any way to force it to actually run in parallel, given that I know that I want to in this case? If not I feel like this library is not going to be reliable enough to use much. – Ryan Aug 27 '14 at 02:36
  • 12
    Based on the little information you've given about your actual problem, I think you've misunderstood what the Streams library is actually for. Streams is about *data-parallelism*; data parallel problems are CPU-bound, not IO-bound. It seems that you're simply looking to run a number of mostly unrelated IO-intensive tasks concurrently. Use a plain-old thread pool for that; your first example is an ideal candidate for `ExecutorService.invokeAll()`. – Brian Goetz Aug 27 '14 at 04:48
  • I did end up using ExecutorService. It would be nice if Streams could handle both cases (and I don't see why it shouldn't) since its API is nicer in some ways. – Ryan Aug 27 '14 at 06:52
  • 2
    I’m not sure whether `IntStream.range(0, 100_000).sum()` is a good example. Summing up a constant range can be optimized to a no-op for the sequential execution. And even if that doesn’t happen, summing up `100000` values is way to little work to benefit from parallel execution. – Holger Aug 27 '14 at 09:28
  • 1
    @Holger +1 for the comment and, to add a specific example, `Arrays.stream(intArray).parallel().map(e -> e * 5 ).sum();` for an array of 2^20 random integers, the speedup factor is way below the degree of parallelism (about half of it). – Marko Topolnik Aug 27 '14 at 11:26
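For completeness, Brian Goetz's comment above points at `ExecutorService.invokeAll()` for the IO-bound case. A minimal sketch of that approach applied to the question's example (pool size chosen to match the task count, just for illustration):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllSketch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<Callable<String>> tasks = Arrays.asList(
                () -> { Thread.sleep(2000); return "a"; },
                () -> { Thread.sleep(2000); return "b"; });

        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        long start = System.currentTimeMillis();
        // invokeAll blocks until every task has completed and returns
        // the Futures in the same order as the submitted task list.
        List<Future<String>> futures = pool.invokeAll(tasks);

        List<String> results = new ArrayList<>();
        for (Future<String> f : futures) {
            results.add(f.get()); // already done; get() will not block further
        }
        long elapsed = System.currentTimeMillis() - start;
        pool.shutdown();

        System.out.println(results + " in " + elapsed + " ms");
    }
}
```

With a dedicated thread per task, the two 2-second sleeps overlap and the elapsed time lands near 2000 ms, which is the behaviour the question was after.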