As explained in this answer, this is an implementation detail of how the workload is split. A HashMap has an internal backing array whose capacity is (usually) higher than the number of entries. It does the splitting in terms of array elements, knowing that this might create unbalanced splits, because determining how the entries are distributed over the array could itself be costly.
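You can observe such a split directly by calling trySplit by hand. The following is a minimal sketch; the actual distribution depends on the keys' hash codes and the JDK's HashMap spliterator internals, so the output may vary:

import java.util.HashMap;
import java.util.Spliterator;

HashMap<Integer,String> map = new HashMap<>(); // default capacity 16
map.put(0, "foo");
map.put(1, "bar");
map.put(2, "baz");

// the spliterator covers the backing array; trySplit halves that index range
Spliterator<String> suffix = map.values().spliterator();
Spliterator<String> prefix = suffix.trySplit();
prefix.forEachRemaining(v -> System.out.println("first half:  " + v));
suffix.forEachRemaining(v -> System.out.println("second half: " + v));

Since the keys 0, 1, and 2 hash to the first three buckets of the sixteen-slot array, all three values typically land in one half while the other half is empty: a perfectly unbalanced split.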
The simplest solution is to reduce the HashMap’s capacity (the default capacity is sixteen) when you know that there will be only a few elements:
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

HashMap<Integer,String> map = new HashMap<>(); // default capacity 16
map.put(0, "foo");
map.put(1, "bar");
map.put(2, "baz");
map.values().parallelStream().forEach(v -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200)); // simulate blocking work
    System.out.println(v + "\t" + Thread.currentThread());
});
foo Thread[main,5,main]
bar Thread[main,5,main]
baz Thread[main,5,main]
HashMap<Integer,String> map = new HashMap<>(4); // capacity 4 instead of the default 16
map.put(0, "foo");
map.put(1, "bar");
map.put(2, "baz");
map.values().parallelStream().forEach(v -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
    System.out.println(v + "\t" + Thread.currentThread());
});
foo Thread[ForkJoinPool.commonPool-worker-1,5,main]
baz Thread[main,5,main]
bar Thread[ForkJoinPool.commonPool-worker-1,5,main]
Note that it still doesn’t use one thread per element, due to a rounding issue. As said, the HashMap’s Spliterator doesn’t know how the elements are distributed over the array, but it knows that there are three elements in total, so it estimates that each workload after splitting holds half of that. Half of three is rounded down to one, so the Stream implementation assumes that there is no benefit in attempting to subdivide these workloads further.
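The estimates are easy to inspect, reusing the three-element map from above (the concrete numbers reflect current JDK behavior and are not guaranteed):

Spliterator<String> suffix = map.values().spliterator();
System.out.println(suffix.estimateSize()); // 3: exact before the first split
Spliterator<String> prefix = suffix.trySplit();
System.out.println(prefix.estimateSize()); // 1: half of three, rounded down
System.out.println(suffix.estimateSize()); // 1

With an estimated size of one on both sides, the Stream implementation sees no point in splitting again.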
There is no simple work-around for this, besides just using parallel streams with more elements. Still, purely for educational purposes:
import java.util.Objects;

HashMap<Integer,String> map = new HashMap<>(4, 1f); // load factor 1: four entries fit without a resize
map.put(0, "foo");
map.put(1, "bar");
map.put(2, "baz");
map.put(3, null); // dummy entry to avoid the rounding issue
map.values().parallelStream()
   .filter(Objects::nonNull) // skip the dummy entry
   .forEach(v -> {
       LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
       System.out.println(v + "\t" + Thread.currentThread());
   });
bar Thread[ForkJoinPool.commonPool-worker-1,5,main]
baz Thread[main,5,main]
foo Thread[ForkJoinPool.commonPool-worker-2,5,main]
By inserting a fourth element, we eliminate the rounding issue. It’s also necessary to provide a load factor of 1f to prevent the HashMap from increasing its capacity, which would bring us back to square one (unless we have at least eight cores).
It’s a kludge, as we know in advance that we will waste a worker thread just on detecting our dummy null entry. But it demonstrates how the workload splitting works.
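For the curious, the load factor arithmetic behind this: per the HashMap documentation, the table is resized (roughly doubled) once the number of entries exceeds capacity × load factor. A quick sketch:

int capacity = 4;
System.out.println((int)(capacity * 0.75f)); // 3: with the default load factor,
                                             // the fourth put would trigger a resize to 8
System.out.println((int)(capacity * 1f));    // 4: with load factor 1f, all four entries fit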
Having far more elements in the map eliminates these issues automatically.
Streams are not designed for tasks that block or sleep. For this kind of task, you should use an ExecutorService instead, which also allows using more threads than there are CPU cores, which is reasonable for tasks that do not occupy a CPU core for their entire execution time.
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

ExecutorService es = Executors.newCachedThreadPool();
// invokeAll blocks until all tasks have completed (and throws InterruptedException)
List<GitUser> result = es.invokeAll(
        Stream.of(service1, service2, service3)
              .<Callable<GitUser>>map(s -> s::getGitUser)
              .collect(Collectors.toList()))
    .stream()
    .map(future -> {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        }
    })
    .collect(Collectors.toList());
es.shutdown();
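A cached thread pool creates new threads on demand and reuses idle ones, so all three blocking calls run concurrently regardless of the number of CPU cores; just remember to shut the pool down afterwards, as in the last line above.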