
When I reduce the Set-backed stream, the identity element appears on the right side of elements in the result:

import java.util.Collection;
import java.util.List;
import java.util.Set;

public class StreamReduce {
    public static void main(String[] args) {
        // Collection<String> l = List.of("A", "B"); // swap in to compare the List behavior
        Collection<String> l = Set.of("A", "B");
        var r = l.parallelStream().reduce("I", (a, b) -> a + b);
        System.out.println(r);
    }
}

output:

IBIAI

When I use the same code with the List-backed stream, I get the expected output: the identity is always on the left side of the elements.
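
For example, a typical run with the List line swapped in prints (the exact split may vary between runs):

IAIB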

How can the identity end up on the right side for the Set stream? And why is there a difference from the List stream?

Barat Sahdzijeu
  • `"I"` is not an *identity* for string concatenation. See [the documentation](https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/stream/Stream.html#reduce(T,java.util.function.BinaryOperator)): “*The `identity` value must be an identity for the accumulator function. This means that for all `t`, `accumulator.apply(identity, t)` is equal to `t`.*” Or [this answer](https://stackoverflow.com/a/32867283/2711488). – Holger Jun 21 '23 at 19:27
  • OK, I see your point on naming. But the question was whether there is an explanation for the different behavior of Set and List. – Barat Sahdzijeu Jun 21 '23 at 19:53
  • Since the identity is also the result of evaluating an empty set, it’s legal to substitute the result of (A, B) with the result of combining the results of (A, B) and (). This can be the result of imbalanced workload splitting. The second half of [this answer](https://stackoverflow.com/a/44802784/2711488) explains the scenario for `HashMap`; if the immutable set returned by `Set.of(…)` does array-based hashing, it’s the same issue. – Holger Jun 22 '23 at 08:47
  • Building on the linked answer, these empty parts of the array ranges, containing no elements, are still processed, as the implementation has to determine that there are no elements. Or, in a case like this, it doesn’t need to determine that at all, as the algorithm will simply end up at the identity element. [This Q&A](https://stackoverflow.com/q/34381805/2711488) may help in understanding it. Unlike `reduce` and `collect`, in the case of `forEach` the empty parts are skipped, which can create the impression of there being no parallel processing at all; that is what the other Q&A was about. – Holger Jun 22 '23 at 09:02

1 Answer


Note that the behavior of

Set.of("A", "B").parallelStream().reduce("I", String::concat)

is not different from

Stream.concat(Stream.of("A", "B"), Stream.of()).parallel()
    .reduce("I", String::concat)

or

Stream.of("A", "B", "none").parallel()
    .filter(s -> !s.equals("none")).reduce("I", String::concat)

except for the unpredictability of the order of A and B, which is irrelevant here.
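
The posted output IBIAI decomposes exactly this way: the two single-element chunks are each reduced with the identity on the left ("I" + "B" and "I" + "A"), and an empty chunk contributes the bare identity when the partial results are combined. A minimal way to watch this happen (the tracing is just for illustration; the interleaving and chunk order vary between runs):

import java.util.Set;

public class ReduceTrace {
    public static void main(String[] args) {
        // The three-argument reduce distinguishes leaf accumulations from combines:
        String r = Set.of("A", "B").parallelStream().reduce(
            "I",
            (a, b) -> { System.out.println("accumulate: " + a + " + " + b); return a + b; },
            (a, b) -> { System.out.println("combine:    " + a + " + " + b); return a + b; });
        System.out.println(r);
    }
}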

Since the specified identity argument must be a valid result for processing an empty sequence of elements, in addition to fulfilling the f(identity, element) = element requirement explained in this answer, it follows that f(element, identity) = element must hold as well, to ensure that concatenating a sequence with an empty sequence yields the same (or a compatible) result.
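
For contrast, the empty string is a true identity for string concatenation, satisfying both equations, so no stray text can leak into the result no matter how the workload is split (a minimal sketch):

import java.util.Set;

public class ProperIdentity {
    public static void main(String[] args) {
        // "" satisfies both "" + s equals s and s + "" equals s,
        // so empty chunks leave no trace in the result:
        String r = Set.of("A", "B").parallelStream().reduce("", String::concat);
        System.out.println(r); // "AB" or "BA", depending on the set's iteration order
    }
}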


Why does that happen in practice, technically?

The Stream implementation has to make trade-offs to provide reasonable performance in arbitrary scenarios, and one of them is not investing too much effort into balanced workload splitting when the source structure does not genuinely support it.

As explained in this answer, hash-based collections tend to have larger backing arrays than their number of elements, to reduce the risk of collisions, and they do not know in advance how the elements are distributed over the array when a stream is created. Therefore, the workload is split based on the array indices, similar to a random access list, but the resulting ranges may contain different numbers of elements, or even no elements at all.
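
This splitting can be observed on a HashSet directly; the spliterator halves the backing array’s index range and reports only size estimates, since the actual distribution of the elements is unknown (a sketch of OpenJDK’s observable behavior; the exact numbers are implementation details):

import java.util.HashSet;
import java.util.Set;
import java.util.Spliterator;

public class HashSplit {
    public static void main(String[] args) {
        Set<String> set = new HashSet<>(Set.of("A", "B"));
        Spliterator<String> right = set.spliterator();
        Spliterator<String> left = right.trySplit(); // halves the array index range
        // Each half estimates one element but may actually contain zero or two:
        System.out.println(left.estimateSize() + " / " + right.estimateSize());
        System.out.println(left.hasCharacteristics(Spliterator.SIZED)); // false: exact count unknown
    }
}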

This may result in the scenario of all actual elements being processed by a single thread, as discussed in that answer, but it can also lead to threads processing empty spans, which has no impact on the correctness of the result as long as the identity chosen for the reduction is a true identity.

This processing is similar to the filter scenario above.


However, when you’re using OpenJDK or a Java implementation based on it (or one implemented the same way), this is not the case for Set.of("A", "B"). This factory method returns a special implementation for a set of two elements, which does not use an array.

But, as surprising as it may sound, these new immutable collection implementations have no dedicated stream support. Instead of providing a Spliterator implementation that could split perfectly into two spliterators of one element each, this set inherits the default implementation based on an Iterator. To be able to do parallel processing at all, the iterator-based spliterator has to transfer the elements into an array first. In the case of just two elements, this happens on the first split operation. Afterwards, there is an array-based spliterator, which supports perfectly balanced splitting (for the subsequent split operations), and an empty spliterator.

So, in this case, the processing is rather similar to the Stream.concat(…) example above.
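
On OpenJDK, this first split can be observed directly on the spliterator (a sketch; the batch copying is an implementation detail of the default iterator-based spliterator):

import java.util.Set;
import java.util.Spliterator;

public class SetOfSplit {
    public static void main(String[] args) {
        Spliterator<String> rest = Set.of("A", "B").spliterator();
        // The first split copies all remaining elements into an array batch:
        Spliterator<String> batch = rest.trySplit();
        System.out.println(batch.estimateSize()); // 2: both elements are in the batch
        System.out.println(rest.estimateSize());  // 0: an empty spliterator remains
    }
}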

This does not happen in the case of List.of("A", "B"). This implementation also has no dedicated stream support, but the default implementation for List checks whether the list is a random access list and provides an index-based implementation that can perform balanced splits without copying elements into an array.
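
The corresponding experiment with the list shows the balanced, index-based split (again, a sketch of observable behavior):

import java.util.List;
import java.util.Spliterator;

public class ListOfSplit {
    public static void main(String[] args) {
        Spliterator<String> right = List.of("A", "B").spliterator();
        // The random access default implementation halves the index range
        // without copying any elements:
        Spliterator<String> left = right.trySplit();
        System.out.println(left.estimateSize());  // 1
        System.out.println(right.estimateSize()); // 1
    }
}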


This Q&A shows how you can use the behavior of this Stream implementation to visualize the way the workload has been split.
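
In the same spirit (a hedged sketch, not the linked answer’s exact code), tagging each element with the thread that accumulated it hints at how the chunks were distributed:

import java.util.Set;

public class VisualizeSplit {
    public static void main(String[] args) {
        // The accumulator records the thread that processed each element;
        // the output varies between runs and machines:
        String r = Set.of("A", "B").parallelStream().reduce(
            "",
            (a, b) -> a + b + "(" + Thread.currentThread().getName() + ") ",
            String::concat);
        System.out.println(r);
    }
}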

Holger