
I'm continuing my Java 8 learning.

I have found an interesting behavior.

Let's look at a code sample:

// identity value and accumulator and combiner
Integer summaryAge = Person.getPersons().stream()
        //.parallel()  //will return surprising result
        .reduce(1,
                (intermediateResult, p) -> intermediateResult + p.age,
                (ir1, ir2) -> ir1 + ir2);
System.out.println(summaryAge);

and model class:

public class Person {

    String name;

    Integer age;
    ///...

    public static Collection<Person> getPersons() {
        List<Person> persons = new ArrayList<>();
        persons.add(new Person("Vasya", 12));
        persons.add(new Person("Petya", 32));
        persons.add(new Person("Serj", 10));
        persons.add(new Person("Onotole", 18));
        return persons;
    }
}

12 + 32 + 10 + 18 = 72. For a sequential stream, this code always returns 73, which is 72 + 1, but for a parallel stream it always returns 76, which is 72 + 4*1 (4 being the number of stream elements).

When I saw this result, I found it strange that the parallel and sequential streams return different results.

Am I breaking the contract somewhere?

P.S.

For me, 73 is the expected result, but 76 is not.

Jason Law
gstackoverflow

7 Answers


The identity value is a value such that x op identity = x. This concept is not unique to Java Streams; see, for example, the identity element article on Wikipedia.

It lists some examples of identity elements; some of them can be expressed directly in Java code, e.g.

  • reduce("", String::concat)
  • reduce(true, (a,b) -> a&&b)
  • reduce(false, (a,b) -> a||b)
  • reduce(Collections.emptySet(), (a,b)->{ Set<X> s=new HashSet<>(a); s.addAll(b); return s; })
  • reduce(Double.POSITIVE_INFINITY, Math::min)
  • reduce(Double.NEGATIVE_INFINITY, Math::max)

It should be clear that the expression x + y == x for arbitrary x can only be fulfilled when y == 0, thus 0 is the identity element for addition. Similarly, 1 is the identity element for multiplication.
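
To make this concrete, here is a minimal sketch (using the ages from the question as plain integers) showing that with the proper additive identity 0, the sequential and parallel reductions agree:

import java.util.Arrays;
import java.util.List;

public class AdditiveIdentityDemo {
    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(12, 32, 10, 18);

        // 0 is the identity for addition: x + 0 == x for every x
        int sequential = ages.stream().reduce(0, Integer::sum);
        int parallel   = ages.parallelStream().reduce(0, Integer::sum);

        System.out.println(sequential); // 72
        System.out.println(parallel);   // 72, identical because the identity contract holds
    }
}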

More complex examples (a runnable sketch follows this list) are

  • Reducing a stream of predicates

    reduce(x->true, Predicate::and)
    reduce(x->false, Predicate::or)
    
  • Reducing a stream of functions

    reduce(Function.identity(), Function::andThen)
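
As a small runnable sketch of the predicate case (the list of checks below is made up purely for illustration), x -> true acts as the identity for Predicate::and:

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class PredicateReduceDemo {
    public static void main(String[] args) {
        // Hypothetical checks, just to have something to combine
        List<Predicate<Integer>> checks = Arrays.<Predicate<Integer>>asList(
                n -> n > 0,
                n -> n % 2 == 0,
                n -> n < 100);

        // x -> true is the identity: (x -> true).and(p) behaves exactly like p
        Predicate<Integer> all = checks.stream().reduce(x -> true, Predicate::and);

        System.out.println(all.test(42)); // true: positive, even, and below 100
        System.out.println(all.test(-4)); // false: not positive
    }
}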
    
Holger

@Holger's answer explains very well what the identity is for different functions, but it doesn't explain why we need an identity and why you get different results between parallel and sequential streams.

Your problem can be reduced to summing a list of elements when you already know how to sum two elements.

So let's take the list L = {12, 32, 10, 18} and the summing function (a, b) -> a + b.

As you learned at school, you would do:

(12,32) -> 12 + 32 -> 44
(44,10) -> 44 + 10 -> 54
(54,18) -> 54 + 18 -> 72

Now imagine our list becomes L = {12}. How do we sum this list? This is where the identity (x op identity = x) comes in.

(0,12) -> 12

So now you can understand why you get +1 on your sum if you use 1 instead of 0: you initialize the reduction with the wrong value.

(1,12) -> 1 + 12 -> 13
(13,32) -> 13 + 32 -> 45
(45,10) -> 45 + 10 -> 55
(55,18) -> 55 + 18 -> 73
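
For illustration, here is a minimal runnable sketch (using the ages from the question as plain integers) that logs each accumulator call and reproduces the trace above:

import java.util.Arrays;
import java.util.List;

public class SequentialTrace {
    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(12, 32, 10, 18);

        // Sequentially fold with the wrong identity (1) and print every step
        int sum = ages.stream().reduce(1, (acc, x) -> {
            int next = acc + x;
            System.out.println("(" + acc + "," + x + ") -> " + next);
            return next;
        });

        System.out.println("result: " + sum); // 73 = 72 + 1
    }
}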

So now, how can we improve speed? By parallelizing things.

What if we split our list, give the resulting sublists to 4 different threads (assuming a 4-core CPU), and then combine the results? This gives us L1 = {12}, L2 = {32}, L3 = {10}, L4 = {18}.

So with identity = 1

  • thread1: (1,12) -> 1+12 -> 13
  • thread2: (1,32) -> 1+32 -> 33
  • thread3: (1,10) -> 1+10 -> 11
  • thread4: (1,18) -> 1+18 -> 19

and then combine: 13 + 33 + 11 + 19 = 76. This explains why the error is propagated 4 times.

In this case, parallel can be less efficient.

But this result depends on your machine and on the input list: Java won't create 1000 threads for 1000 elements, and the error will propagate more slowly as the input grows.

Try running this code, which sums one thousand 1s with identity 1; the result is quite close to 1000:

import java.util.stream.IntStream;

public class StreamReduce {

    public static void main(String[] args) {
        int sum = IntStream.range(0, 1000).map(i -> 1).parallel().reduce(1, (r, e) -> r + e);
        System.out.println("sum: " + sum);
    }
}

Now you should understand why you get different results between parallel and sequential execution if you break the identity contract.

See the Oracle documentation for the proper way to write your sum.



Aniruddh Parihar
user43968

Yes, you are breaking the contract of the combiner function. The identity, which is the first argument of reduce, must satisfy combiner(identity, u) == u. Quoting the Javadoc of Stream.reduce:

The identity value must be an identity for the combiner function. This means that for all u, combiner(identity, u) is equal to u.

However, your combiner function performs an addition and 1 is not the identity element for addition; 0 is.

  • Change the identity to 0 and you will have no surprise: the result will be 72 for both options.

  • For your own amusement, change your combiner function to perform a multiplication (keeping the identity at 1) and you will notice the same result for both options as well; see the sketch below.
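
Here is a minimal sketch of that multiplication experiment (using the ages from the question as plain integers):

import java.util.Arrays;
import java.util.List;

public class MultiplicativeIdentityDemo {
    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(12, 32, 10, 18);

        // 1 is the identity for multiplication, so the contract holds here
        int sequential = ages.stream().reduce(1, (a, b) -> a * b);
        int parallel   = ages.parallelStream().reduce(1, (a, b) -> a * b);

        System.out.println(sequential); // 69120
        System.out.println(parallel);   // 69120, same result
    }
}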

Let's build an example where the identity is neither 0 nor 1. Given your own domain class, consider:

System.out.println(Person.getPersons().stream()
                    .reduce("", 
                            (acc, p) -> acc.length() > p.name.length() ? acc : p.name,
                            (n1, n2) -> n1.length() > n2.length() ? n1 : n2));

This will reduce the stream of Person to the longest person name. The empty string works as the identity here because it is never strictly longer than a name, so combining "" with any name yields that name.

Tunaki

The Javadoc for Stream.reduce specifically states that

The identity value must be an identity for the combiner function

1 is not an identity value for the addition operator, which is why you get unexpected results. If you used 0 (which is the addition operator's identity value), then you'd get the same result from sequential and parallel streams.

Ian Roberts

Your question really has two parts: why you are getting 76 using parallel when you get 73 using sequential, and what the identity is, as far as addition and multiplication go, for reduce.

Answering the latter will help in answering the first part. The identity is a mathematical concept; I'll try to keep it in simple terms for the non-math geeks. The identity is the value that, combined with any other value under the operation, leaves that value unchanged.

The additive identity is 0: if a is any number, the identity property states that a plus its identity returns a (basically, a + 0 = a). The multiplicative identity is 1: b multiplied by its identity always returns b (b * 1 = b).

The Java reduce method uses the identity a little more loosely, giving us the ability to perform the addition or multiplication with an additional starting value if we choose to. If you take your example and change the identity to 0, you will get 72.

    Integer summaryAge = Person.getPersons().stream()
            .reduce(0, (intermediateResult, p) -> intermediateResult + p.age,
                    (ir1, ir2) -> ir1 + ir2);
    System.out.println(summaryAge);

This simply sums the ages together and returns that value. Change the identity to 100 and you'll get 172. But why does your result become 76 when you run in parallel, and why would my example return 472? It's because when you use a stream, the results are considered as a set instead of as individual elements. Per the JavaDocs on streams:

Streams facilitate parallel execution by reframing the computation as a pipeline of aggregate operations, rather than as imperative operations on each individual element.

Why does the treatment as a set matter? Using the standard stream (not parallel or parallelStream), your example carries the sum along as a single number, hence you get 73 (and changing the identity to 100, I would get 172). But why do you get 76 using parallel, or 472 in my example? Because Java now splits the set into smaller (single-element) chunks, adds the identity (which you stated as 1) when summing each chunk, and then sums those partial results, each of which has performed the same operation.

If your intention is to add 1 to the result, it's safer to follow Tagir's suggestion and add 1 after the stream returns.

chris m

In addition to the excellent answers posted before, it should be mentioned that if you want to start the summation with something other than zero, you can simply move the initial addend out of the stream operation:

Integer summaryAge = Person.getPersons().stream()
        //.parallel()  //will return no surprising result
        .reduce(0, (intermediateResult, p) -> intermediateResult + p.age,
                    (ir1, ir2) -> ir1 + ir2)+1;

The same is possible for other reduction operations. For example, if you want to calculate the product starting with 2, instead of the incorrect .reduce(2, (a, b) -> a*b) you can write .reduce(1, (a, b) -> a*b)*2. Just find the real identity for your operation, move the "false identity" outside, and you will get the correct result in both the sequential and the parallel case.
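
For illustration, a minimal sketch of that scaled product (the values 3, 4, 5 are made up for the example):

import java.util.Arrays;
import java.util.List;

public class ScaledProduct {
    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(3, 4, 5);

        // Wrong: 2 is not the identity for multiplication, so a parallel run
        // may multiply the 2 in once per chunk instead of once overall
        int wrong = values.parallelStream().reduce(2, (a, b) -> a * b);

        // Right: reduce with the real identity (1) and apply the extra factor afterwards
        int right = values.parallelStream().reduce(1, (a, b) -> a * b) * 2;

        System.out.println(wrong); // not guaranteed to be 120 (depends on how the stream splits)
        System.out.println(right); // always 120 (3 * 4 * 5 * 2)
    }
}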

Finally, note that there's a more efficient way to solve your problem:

Integer summaryAge = Person.getPersons().stream()
        //.parallel()  //will return no surprising result
        .collect(Collectors.summingInt(p -> p.age))+1;

or alternatively

Integer summaryAge = Person.getPersons().stream()
        //.parallel()  //will return no surprising result
        .mapToInt(p -> p.age).sum()+1;

Here the summation is performed without boxing at every intermediate step, so it can be much faster.

Tagir Valeev

I have a slightly different perspective here. Although @user43968's answer gives a plausible justification for why the identity is needed for parallelism, is it really necessary? I believe not, because the associativity of the binary operator is by itself enough to let us parallelize the reduce job.

Given an expression A op B op C op D, associativity guarantees that its evaluation is equivalent to (A op B) op (C op D), so we can evaluate the sub-expressions (A op B) and (C op D) in parallel and combine the results afterward without changing the final result. For example, with the addition operation, an initial value of 10, and L = [1, 2, 3], we want to compute 10 + 1 + 2 + 3 = 16. We should be able to compute 10 + 1 = 11 and 2 + 3 = 5 in parallel and do 11 + 5 = 16 at the end.

The only reason I can think of for why Java requires the initial value to be an identity is that the language developers wanted to keep the implementation simple and all parallelized sub-jobs symmetric. Otherwise, they would have had to differentiate the first sub-job, which takes the initial value as input, from the other sub-jobs, which don't. As it is, they just distribute the initial value equally to each sub-job, each of which is a "reduce" of its own.

However, this is more of an implementation limitation, which in my opinion should not be surfaced to language users. My gut feeling tells me there must exist a simple implementation that does not require the initial value to be an identity.

czheo
    The identity value is indeed not strictly needed for parallelization, which is best indicated by the fact that there is a `reduce(…)` method not taking an identity element at all. There is no need to differentiate between the first job and the others, though, as there is no need to add an identity value at all. But by using the identity value as the starting value, a worker thread does not need to track whether it has seen any elements at all (mind that filtering happens in parallel too). You can replace `reduce(value, op)` with `reduce(op).orElse(value)`, and then `value` doesn't need to be an identity element – Holger May 20 '22 at 09:09
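
For illustration, a minimal sketch of the identity-free form Holger describes, using the ages from the question (the fallback -1 is an arbitrary non-identity value used only for an empty stream):

import java.util.Arrays;
import java.util.List;

public class ReduceWithoutIdentity {
    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(12, 32, 10, 18);

        // The single-argument reduce needs no identity and returns an Optional;
        // orElse supplies a fallback only for an empty stream, so it does not
        // have to be an identity element.
        int sum = ages.parallelStream()
                .reduce(Integer::sum)
                .orElse(-1);

        System.out.println(sum); // 72, the same for sequential and parallel
    }
}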