Consider the following example. Integer is immutable, while Foo (shown at the end) is mutable. Create a list of each:
List<Foo> foos = IntStream.range(1, 1001).mapToObj(Foo::new)
                          .collect(Collectors.toList());
List<Integer> ints = IntStream.range(1, 1001).boxed()
                              .collect(Collectors.toList());
Now reduce each stream to a single result.
Foo foo = foos.stream().reduce(new Foo(0), Foo::merge);
Integer integer = ints.stream().reduce(Integer.valueOf(0), (a, b) -> a + b);
System.out.println(foo);
System.out.println(integer);
Prints
500500
500500
Both are correct.
Now reduce them again, this time across multiple threads via parallel streams, combining the partial results from the different threads with the third argument to reduce (the combiner).
Foo foo = foos.parallelStream().reduce(new Foo(0), Foo::merge, Foo::merge);
Integer integer = ints.parallelStream().reduce(Integer.valueOf(0), (a, b) -> a + b, (a, b) -> a + b);
System.out.println(foo);
System.out.println(integer);
Prints
570026
500500
Oops! The Foo result is wrong (and will typically vary from run to run). The problem is that merge mutates its receiver, so multiple threads update the same Foo instances, including the shared identity new Foo(0), concurrently and without any synchronization. reduce assumes the identity value is never modified and that the accumulator and combiner do not mutate their arguments; this merge breaks both assumptions.
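Even ignoring the data race, the mutating merge corrupts the shared identity. Here is a minimal single-threaded sketch (the values 1 to 4 are purely illustrative) that imitates how a parallel reduce seeds every chunk with the same identity instance:

Foo identity = new Foo(0);
// "chunk 1" reduces {1, 2}; merge mutates its receiver, so identity.foo is now 1
Foo left = identity.merge(new Foo(1)).merge(new Foo(2));
// "chunk 2" reduces {3, 4}, but it starts from the already-mutated identity
Foo right = identity.merge(new Foo(3)).merge(new Foo(4));
// combine the two partial results
System.out.println(left.merge(right)); // prints 11 instead of the expected 10

With a merge that does not mutate its receiver, the same sequence prints 10.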
If you modify the Foo class's merge method so that it no longer mutates the receiver, all is well:
Foo merge(Foo other) {
    return new Foo(this.foo + other.foo);
}
So Foo can still be mutable (for example via setters), but you must not mutate it inside a reduction operation: always return a new instance instead of modifying the current one. For reference, here is the original Foo class used in the examples above:
class Foo {
    int foo;

    Foo(int f) {
        foo = f;
    }

    Foo merge(Foo other) {
        foo += other.foo;
        return new Foo(foo);
    }

    public String toString() {
        return foo + "";
    }
}
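To confirm the fix, here is a self-contained sketch (the FixedFoo and Main names are mine, not from the original code); with the non-mutating merge, the sequential and parallel reductions both print 500500:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FixedFoo {
    final int foo;                   // immutable field: no shared mutable state

    FixedFoo(int f) { foo = f; }

    FixedFoo merge(FixedFoo other) { // returns a new instance, never mutates
        return new FixedFoo(this.foo + other.foo);
    }

    @Override
    public String toString() { return Integer.toString(foo); }
}

public class Main {
    public static void main(String[] args) {
        List<FixedFoo> foos = IntStream.range(1, 1001).mapToObj(FixedFoo::new)
                                       .collect(Collectors.toList());

        FixedFoo sequential = foos.stream()
                                  .reduce(new FixedFoo(0), FixedFoo::merge);
        FixedFoo parallel = foos.parallelStream()
                                .reduce(new FixedFoo(0), FixedFoo::merge, FixedFoo::merge);

        System.out.println(sequential); // 500500
        System.out.println(parallel);   // 500500, regardless of how the work is split
    }
}

If you genuinely need mutable accumulation, collect(supplier, accumulator, combiner) is the stream operation designed for that, not reduce.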