1

A little background, Im trying to use Java 8 Parallel Streams to call multiple API's in an asynchronous manner. I would like each API to be called and then block until all API's are returned. I have run into an interesting situation where if I try to stream a map, as opposed to a list, the API's are no longer being called in new threads.

If I run the following code each service is called in a new thread:

    List<GitUser> result1 = Arrays.asList(service1, service2, service3).parallelStream()
        .map(s->s.getGitUser())
        .collect(Collectors.toList());

However, if I used a map to accomplish the same task, each service is called synchronously:

    Map<String, ParallelStreamPOCService> map = new HashMap<>();
    map.put("1", service1);
    map.put("2", service2);
    map.put("3", service3);

    List<GitUser> result2 = map.entrySet().parallelStream()
            .map(s->s.getValue().getGitUser())
            .collect(Collectors.toList());

Here is the Service implementation:

    public GitUser getGitUser() {
        LOGGER.info("Loading user " + userName);
        String url = String.format("https://api.github.com/users/%s", userName);
        GitUser results = restTemplate.getForObject(url, GitUser.class);
        try {
            TimeUnit.SECONDS.sleep(secondsToSleep);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        LOGGER.error("Finished " + userName);
        return results;
    }
Andy Turner
  • 137,514
  • 11
  • 162
  • 243
Phil Ninan
  • 1,108
  • 1
  • 14
  • 23
  • 1
    Your samples are far too small to be able to draw any definitive conclusions. Try with a thousand elements and see if you observe the same thing. – Joe C Nov 29 '17 at 22:30
  • I was trying to simulate this in a smaller set by telling the thread to sleep for x seconds. If I service1 sleeps for 10 seconds and service2 for 1 second, I would expect service2 to return first. – Phil Ninan Nov 29 '17 at 22:40
  • Note: you mean "manner", not "manor". The former means "way"; the latter means "large country house with land". – Andy Turner Nov 29 '17 at 22:47
  • 1
    `I would expect service2 to return first`, you might be right here, but that would imply fairness - which parallel streams do not guarantee – Eugene Nov 30 '17 at 09:09

2 Answers2

4

As explained in this answer, this is an implementation detail about how the workload is split. HashMap has an internal backing array with a higher capacity then entries (usually). It does the splitting in terms of array elements, knowing that this might create unbalanced splits, because determining how the entries are distributed over the array can be costly.

The simplest solution is to reduce the HashSet’s capacity (the default capacity is sixteen) when you know that there will be only a few elements:

HashMap<Integer,String> map = new HashMap<>();
map.put(0, "foo");
map.put(1, "bar");
map.put(2, "baz");

map.values().parallelStream().forEach(v -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
    System.out.println(v+"\t"+Thread.currentThread());
});
foo Thread[main,5,main]
bar Thread[main,5,main]
baz Thread[main,5,main]
HashMap<Integer,String> map = new HashMap<>(4);
map.put(0, "foo");
map.put(1, "bar");
map.put(2, "baz");

map.values().parallelStream().forEach(v -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
    System.out.println(v+"\t"+Thread.currentThread());
});
foo Thread[ForkJoinPool.commonPool-worker-1,5,main]
baz Thread[main,5,main]
bar Thread[ForkJoinPool.commonPool-worker-1,5,main]

Note that it still doesn’t use one thread per element due to a rounding issue. As said, the HashMap’s Spliterator doesn’t know how the elements are distributed over the array. But it knows that there are three elements in total, so it estimates that there are half of that in each workload after splitting. The half of three is rounded to one, so the Stream implementation assumes that there is no benefit in even attempting to subdivide these workloads further.

There is no simple work-around for this, besides just using parallel streams with more elements. Still, just for education purposes:

HashMap<Integer,String> map = new HashMap<>(4, 1f);
map.put(0, "foo");
map.put(1, "bar");
map.put(2, "baz");
map.put(3, null);

map.values().parallelStream()
   .filter(Objects::nonNull)
   .forEach(v -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
    System.out.println(v+"\t"+Thread.currentThread());
});
bar Thread[ForkJoinPool.commonPool-worker-1,5,main]
baz Thread[main,5,main]
foo Thread[ForkJoinPool.commonPool-worker-2,5,main]

By inserting a fourth element we eliminate the rounding issue. It’s also required to provide a load factor of 1f to prevent the HashMap from increasing the capacity which would bring us back to square one (unless we have at least eight cores).

It’s a kludge, as we know in advance that we will waste a worker thread just for detecting our dummy null entry. But it demonstrates, how the workload splitting works.

Having far more elements in the map eliminates these issues automatically.


Stream are not designed for tasks that do blocking or sleeping. For these kind of tasks, you should use an ExecutorService, which would also allow to use more threads than CPU cores, which is reasonable for tasks that do not use the CPU cores the entire execution time.

ExecutorService es = Executors.newCachedThreadPool();
List<GitUser> result =
    es.invokeAll(
        Stream.of(service1, service2, service3)
              .<Callable<GitUser>>map(s -> s::getGitUser)
              .collect(Collectors.toList())
    ) .stream()
      .map(future -> {
            try { return future.get(); }
            catch (InterruptedException|ExecutionException ex) {
                throw new IllegalStateException(ex);
            }
        })
      .collect(Collectors.toList());
Holger
  • 285,553
  • 42
  • 434
  • 765
  • Can you elaborate on what you mean by "which is reasonable for tasks that do not use the CPU cores the entire execution time." Are you referring to making an HTTP request? – Phil Ninan Nov 30 '17 at 18:23
  • Making an HTTP request is one example of a task that doesn’t use the CPU the entire time, as it implies waiting for incoming data. Generally, all kind of I/O or waiting for an event or another thread falls into this category. – Holger Dec 01 '17 at 08:16
1

Note that the java doc states parallelStream() "Returns a possibly parallel Stream with this collection as its source. It is allowable for this method to return a sequential stream.". it seems like the number of elements within the stream is too small to break down into components for multiple threads to execute, hence your code is being executed sequentially rather than in parallel. Just because you're using parallelStream() doesn't necessarily mean it will be executed in parallel, it all depends on what the library determines is adequate to parallelize or not for best performances.

In addition to the aforementioned as Joe C has stated in the comments, you'll need to increase the number of elements within the stream's source substantially to actually get anywhere near seeing any effect in terms of performance.

Ousmane D.
  • 54,915
  • 8
  • 91
  • 126
  • That is a very valid point from the documentation. However, both implementations have the same number of elements. – Phil Ninan Nov 29 '17 at 22:44
  • 1
    @PhilNinan may you share how you've calculated that the first approach was being executed _asynchronously_ and the second _synchronously_? – Ousmane D. Nov 29 '17 at 22:57
  • I instantiated each service with a time in seconds to sleep: `service1 = new Service(5)` `service2 = new Service(1)` `service3 = new Service(3)` – Phil Ninan Nov 30 '17 at 17:14
  • Output of list: This was the output for the list: `2017-11-30 12:12:41,700 ERROR main com.lbisoft.core.app.components.video.service.ParallelStreamPOCService - Finished user2 2017-11-30 12:12:43,618 ERROR ForkJoinPool.commonPool-worker-2 com.lbisoft.core.app.components.video.service.ParallelStreamPOCService - Finished user3 2017-11-30 12:12:45,628 ERROR ForkJoinPool.commonPool-worker-1 com.lbisoft.core.app.components.video.service.ParallelStreamPOCService - Finished user11 – Phil Ninan Nov 30 '17 at 17:18
  • Output of map: 2017-11-30 12:12:50,713 ERROR ForkJoinPool.commonPool-worker-1 com.lbisoft.core.app.components.video.service.ParallelStreamPOCService - Finished user1 2017-11-30 12:12:51,790 ERROR ForkJoinPool.commonPool-worker-1 com.lbisoft.core.app.components.video.service.ParallelStreamPOCService - Finished user2 2017-11-30 12:12:54,947 ERROR ForkJoinPool.commonPool-worker-1 com.lbisoft.core.app.components.video.service.ParallelStreamPOCService - Finished user3 – Phil Ninan Nov 30 '17 at 17:19