
When I run the following code, only 2 of the 8 available threads are used. Can anyone explain why this is the case? How can I change the code so that it takes advantage of all 8 threads?

Tree.java:

package il.co.roy;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class Tree<T>
{
    private final T data;
    private final Set<Tree<T>> subTrees;

    public Tree(T data, Set<Tree<T>> subTrees)
    {
        this.data = data;
        this.subTrees = subTrees;
    }

    public Tree(T data)
    {
        this(data, new HashSet<>());
    }

    public Tree()
    {
        this(null);
    }

    public T getData()
    {
        return data;
    }

    public Set<Tree<T>> getSubTrees()
    {
        return subTrees;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Tree<?> tree = (Tree<?>) o;
        return Objects.equals(data, tree.data) &&
                Objects.equals(subTrees, tree.subTrees);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(data, subTrees);
    }

    @Override
    public String toString()
    {
        return "Tree{" +
                "data=" + data +
                ", subTrees=" + subTrees +
                '}';
    }

    public void sendCommandAll()
    {
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
        subTrees.parallelStream()
//              .map(Tree::sendCommandAll)
                .forEach(Tree::sendCommandAll);
//              .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
    }
}

(It doesn't matter if I use forEach or reduce).

Main.java:

package il.co.roy;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main
{
    public static void main(String... args)
    {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());


        final Tree<Integer> root = new Tree<>(null,
                Set.of(new Tree<>(1,
                        IntStream.range(2, 7)
                                        .boxed()
                                        .map(Tree::new)
                                        .collect(Collectors.toSet()))));

        root.sendCommandAll();

//      IntStream.generate(() -> 1)
//              .parallel()
//              .forEach(i ->
//              {
//                  System.out.println(Thread.currentThread().getName());
//                  try
//                  {
//                      Thread.sleep(5000);
//                  } catch (InterruptedException e)
//                  {
//                      e.printStackTrace();
//                  }
//              });
    }
}

In the main method I create a tree with the following structure:

root (data is `null`)
  |- 1
     |- 2
     |- 3
     |- 4
     |- 5
     |- 6

The `sendCommandAll` method processes every sub-tree (in parallel) only after its parent has finished being processed. But the result is as follows:

Processors: 8
[main] sending command to 1
[main] tree with data 1 got true
[main] sending command to 6
[ForkJoinPool.commonPool-worker-2] sending command to 5
[main] tree with data 6 got true
[ForkJoinPool.commonPool-worker-2] tree with data 5 got true
[ForkJoinPool.commonPool-worker-2] sending command to 4
[ForkJoinPool.commonPool-worker-2] tree with data 4 got true
[ForkJoinPool.commonPool-worker-2] sending command to 3
[ForkJoinPool.commonPool-worker-2] tree with data 3 got true
[ForkJoinPool.commonPool-worker-2] sending command to 2
[ForkJoinPool.commonPool-worker-2] tree with data 2 got true

(For the record, when I run the commented-out code in Main.java, the JVM uses all 7 (+ 1) threads available in the common pool.)

How can I improve my code?

Roy Ash
  • Check this question: [Parallel stream from a hashset doesn't run in parallel](https://stackoverflow.com/questions/28985704/parallel-stream-from-a-hashset-doesnt-run-in-parallel) – Alex Sveshnikov Oct 13 '21 at 12:32
  • I don't think I encountered the same problem: first of all I use JDK 17, even when I use custom `ForkJoinPool`, with parallelism of 20, only 2 threads are active – Roy Ash Oct 13 '21 at 13:33
  • As the answer says - it's not guaranteed to work. Also, if I rewrite your code to use List instead of Set I see more threads used from the pool. – Alex Sveshnikov Oct 13 '21 at 14:05
  • As explained in the second half of [this answer](https://stackoverflow.com/a/44802784/2711488), `HashMap`s (and in turn `HashSet`s) with a small number of elements compared to their (default) capacity may distribute their work badly, depending on the hash code distribution. You can work around this using `new ArrayList<>(subTrees).parallelStream()` but there are other flaws in your approach, like doing the work/wait before even starting to traverse the children. You should separate the iteration logic from the actual action. – Holger Oct 13 '21 at 14:10
  • Thank you @Holger, that indeed solved my problem. Can you post your comment as an official answer to get the credit and the points? :-) – Roy Ash Oct 13 '21 at 14:55

1 Answer


As explained in the second half of [this answer](https://stackoverflow.com/a/44802784/2711488), the thread utilization when processing `HashMap`s or `HashSet`s depends on the distribution of the elements within the backing array, which in turn depends on the hash codes. Especially with a small number of elements compared to the (default) capacity, this may result in bad work splitting.

A simple workaround is to use `new ArrayList<>(subTrees).parallelStream()` instead of `subTrees.parallelStream()`.
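To make the splitting difference visible without any threads, here is a minimal sketch that splits a spliterator once and counts how many elements end up on each side. The class `SplitDemo` and its helpers `firstSplitSizes` and `sampleSet` are hypothetical names made up for this illustration, and it uses plain `Integer` elements rather than the `Tree` nodes from the question, so the buckets involved differ from the original case.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class, not part of the question's code.
final class SplitDemo {

    /** Splits the spliterator once and counts the elements on each side of the split. */
    static <E> long[] firstSplitSizes(Spliterator<E> rest) {
        Spliterator<E> prefix = rest.trySplit(); // null if the source refuses to split
        long[] sizes = new long[2];
        if (prefix != null) {
            prefix.forEachRemaining(e -> sizes[0]++);
        }
        rest.forEachRemaining(e -> sizes[1]++);
        return sizes;
    }

    /** A small HashSet with default capacity, holding the integers 2 to 6. */
    static Set<Integer> sampleSet() {
        return IntStream.range(2, 7)
                .boxed()
                .collect(Collectors.toCollection(HashSet::new));
    }

    public static void main(String[] args) {
        Set<Integer> set = sampleSet();
        long[] fromSet = firstSplitSizes(set.spliterator());
        // Copying into an ArrayList packs the elements into indices 0..4.
        long[] fromList = firstSplitSizes(new ArrayList<>(set).spliterator());
        System.out.println("HashSet split:   " + fromSet[0] + " + " + fromSet[1]);
        System.out.println("ArrayList split: " + fromList[0] + " + " + fromList[1]);
    }
}
```

With OpenJDK's `HashSet` I would expect the first line to show all five elements on one side of the split, since the integers 2 to 6 all land in the lower half of the 16-slot table, while the list splits into two non-empty parts. The exact numbers are an implementation detail, so treat them as an observation rather than a guarantee.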

But note that your method performs the actual work of the current node (simulated with a sleep in the example) before processing the children, which also reduces the potential parallelism.

You may use

// Requires java.util.ArrayList and java.util.List to be imported in Tree.java.
public void sendCommandAll() {
    if(subTrees.isEmpty()) {
        actualSendCommand();
        return;
    }
    List<Tree<T>> tmp = new ArrayList<>(subTrees.size() + 1);
    tmp.addAll(subTrees);
    tmp.add(this);
    tmp.parallelStream().forEach(t -> {
        if(t != this) t.sendCommandAll(); else t.actualSendCommand();
    });
}

private void actualSendCommand() {
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] sending command to " + data);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] tree with data " + data + " got " + true);
}

This allows the current node to be processed concurrently with its children.
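If the parent must finish before its children start, the traversal can still be kept separate from the action by passing the action in as a parameter. The following is only a sketch under that assumption: `CommandNode`, `visitParentFirst`, `sampleTree` and `visitAll` are hypothetical names, and the structure mirrors the question's tree (a `null` root, one child `1`, leaves `2` to `6`).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for the question's Tree class.
final class CommandNode<T> {
    private final T data;
    private final Set<CommandNode<T>> children;

    CommandNode(T data, Set<CommandNode<T>> children) {
        this.data = data;
        this.children = children;
    }

    CommandNode(T data) {
        this(data, new HashSet<>());
    }

    /** Runs the action on this node first, then visits the children in parallel. */
    void visitParentFirst(Consumer<? super T> action) {
        if (data != null) {
            action.accept(data);
        }
        // Copy to a list so the parallel stream splits by index, not by hash bucket.
        new ArrayList<>(children).parallelStream()
                .forEach(child -> child.visitParentFirst(action));
    }

    /** Builds the tree from the question: null root -> 1 -> {2, 3, 4, 5, 6}. */
    static CommandNode<Integer> sampleTree() {
        Set<CommandNode<Integer>> leaves = new HashSet<>();
        for (int i = 2; i < 7; i++) {
            leaves.add(new CommandNode<>(i));
        }
        Set<CommandNode<Integer>> top = new HashSet<>();
        top.add(new CommandNode<>(1, leaves));
        return new CommandNode<>(null, top);
    }

    /** Visits the whole tree and returns the data values in the order they were recorded. */
    static List<Integer> visitAll(CommandNode<Integer> root) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        root.visitParentFirst(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        sampleTree().visitParentFirst(d ->
                System.out.println("[" + Thread.currentThread().getName() + "] visiting " + d));
    }
}
```

The action passed to `visitParentFirst` may run on several threads at once, so it has to be thread-safe, which is why `visitAll` collects into a synchronized list.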

Holger
  • I know the code isn’t perfect; it was created only to demonstrate this weird bug, and the tree is meant to enforce some limitations on the parallelism, i.e. continue to the sub-tree only when the parent has finished its work – Roy Ash Oct 13 '21 at 15:44
  • Then, the simple solution `new ArrayList<>(subTrees).parallelStream()` will do. I’ll keep the other solution for future readers who might not have this constraint… – Holger Oct 13 '21 at 15:48
  • Can you explain why `HashMap` (and `HashSet`) have this limitation in the first place? Do `TreeMap` (and `TreeSet`) also have this problem? Is there a way to solve the problem without copying the collection into a `List`? Why is it related to the hash code distribution in the first place? – Roy Ash Oct 13 '21 at 15:52
  • Did you read the linked answer? The challenge is to split the work without spending too much time in analyzing the situation. I debugged your case and the elements were indeed all clustered in one region of the array, so splitting the array into equal sized ranges leads to several all-empty ranges. The `TreeMap` shouldn’t have this issue as the nodes are already (almost) balanced, which allows recursively passing one half to another worker thread, however, it’s not perfectly balanced which may still lead to lesser parallelism for small sets. – Holger Oct 13 '21 at 16:15
  • @RoyAsh It's not a bug, and it's not specific to hash-based collections. Parallel processing is a tradeoff; splitting and joining has costs, which we hope to overcome by throwing more CPUs at the problem. In general, splitting down to one element is not optimal (and, using parallelism on small data sets is not optimal either.) The splitting heuristics are tuned for effective parallelism on large data sets with largely CPU-bound computations. – Brian Goetz Oct 15 '21 at 18:34
  • @BrianGoetz So how do you explain the difference between hash-code-based collections and linear collections like `ArrayList`? (I know it’s not a bug; Holger's explanation made sense.) – Roy Ash Oct 15 '21 at 18:37
  • @RoyAsh Try LinkedList, it's even worse. It's a complicated function of the splitting heuristics and the collection topology and the quality of the Spliterator implementation. – Brian Goetz Oct 15 '21 at 19:08
  • @BrianGoetz in fact, it does split down until the estimated size of each chunk of work is one. The problem of this specific example is that all elements are clustered in one chunk at this point and the others are empty. – Holger Oct 16 '21 at 08:14
  • @RoyAsh `ArrayList` knows that all elements are stored in its backing array from index zero to four. It’s easy to distribute them to workers. But `HashSet` has a backing array of length sixteen by default and the five elements are stored *somewhere* in that array, depending on their hash codes. Then, it gets the task “select (roughly) half of them for another worker” *without actually iterating*. Mind that the algorithm must be suitable to array lengths of 100 millions too. So it takes half of that array, hoping to catch closely half of the elements. Then, each worker repeats this recursively. – Holger Oct 16 '21 at 08:24
  • And why doesn't it work on a Stream of Streams (with `flatMap`)? Because of that, I need to collect the inner stream to a `List` and then do `flatMap(Collection::parallelStream)` on the outer `Stream` – Roy Ash Oct 22 '21 at 07:57
  • See https://stackoverflow.com/a/45039630/2711488 – Holger Oct 22 '21 at 08:00