I have some test code like this:

List<Integer> list = new ArrayList<>(1000000);

for(int i=0;i<1000000;i++){
    list.add(i);
}

List<String> values = new ArrayList<>(1000000);

list.stream().forEach(
    i->values.add(new Date().toString())
);

System.out.println(values.size()); 

Running this, I get the correct output: 1000000.

However, if I change stream() to parallelStream(), like this:

list.parallelStream().forEach(
    i->values.add(new Date().toString())
);

I get a different, seemingly random output on each run, e.g. 920821.

What's wrong?

Tunaki
Yang Rui

3 Answers

An ArrayList is not synchronized, so the result of concurrently adding elements to it is undefined. From the Javadoc of forEach:

For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever thread the library chooses.

In your second example, you end up with multiple threads calling add on the array list at the same time, and the ArrayList documentation says:

Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally.

The wrong solution

If you change the use of an ArrayList to a Vector, you'll get the correct result, because this list implementation is synchronized. Its Javadoc says:

Unlike the new collection implementations, Vector is synchronized.

However, do not use it! Because every call to add is synchronized, the parallel version might even end up slower than the sequential one.

The correct approach

The Stream API provides the mutable reduction paradigm, through the collect method, precisely to avoid this situation. The following

List<String> values = list.stream().map(i -> "foo").collect(Collectors.toList());

will always provide the correct result, whether run in parallel or not. The Stream pipeline internally handles the concurrency and guarantees that it is safe to use a non-concurrent collector in a collect operation of a parallel stream. Collectors.toList() is a built-in collector accumulating the elements of a Stream into a list.
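As a minimal sketch, here is that approach applied to the code from the question. The class name CollectDemo and the helper method timestamps are made up for illustration; only parallelStream, map and Collectors.toList() come from the answer itself.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class; the names are illustrative only.
class CollectDemo {

    // Builds one timestamp string per input element using a parallel stream.
    // collect() accumulates each chunk of the stream into its own intermediate
    // list and merges those lists afterwards, so the lambda never touches a
    // shared mutable list.
    static List<String> timestamps(List<Integer> input) {
        return input.parallelStream()
                    .map(i -> new Date().toString())
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(1_000_000);
        for (int i = 0; i < 1_000_000; i++) {
            list.add(i);
        }
        System.out.println(timestamps(list).size()); // prints 1000000
    }
}
```

Because the source list is ordered, the resulting list also keeps the encounter order, which the forEach variant does not promise.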

Tunaki
  • And while using `Vector` will ensure that the number of elements is correct, it will not guarantee that the resulting elements are in the right order when used with `forEach`. On the other hand, fixing that by using `forEachOrdered` would make it work with an `ArrayList` again, but still with worse performance than a sequential stream in most cases… – Holger Sep 12 '16 at 15:21
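To illustrate the forEachOrdered variant mentioned in this comment, here is a small sketch. The class ForEachOrderedDemo and the method copyInOrder are hypothetical names. It relies on the documented guarantee of forEachOrdered that performing the action for one element happens-before performing it for the next, which is why a plain ArrayList is acceptable here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo class; the names are illustrative only.
class ForEachOrderedDemo {

    // Fills a plain ArrayList from a parallel stream. forEachOrdered processes
    // the elements one at a time in encounter order, so the unsynchronized
    // list is never modified by two threads at once.
    static List<Integer> copyInOrder(int n) {
        List<Integer> values = new ArrayList<>(n);
        IntStream.range(0, n)
                 .parallel()
                 .boxed()
                 .forEachOrdered(values::add);
        return values;
    }

    public static void main(String[] args) {
        List<Integer> values = copyInOrder(1_000_000);
        System.out.println(values.size()); // prints 1000000
    }
}
```

As the comment points out, this is correct but gains little from running in parallel, since the adds themselves are serialized.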

Using a Consumer, you have to worry about thread safety. A simpler solution is to let the Stream API accumulate the results.

List<String> values = IntStream.range(0, 1_000_000).parallel()
                               .mapToObj(i -> new Date().toString())
                               .collect(Collectors.toList());

A key reason to avoid using a thread-safe collection like Vector is that it requires each thread to obtain a shared lock, which is a bottleneck: you spend time obtaining and releasing the lock, and only one thread at a time can access the collection. You can easily end up with a solution that is slower than using a single thread.
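A rough way to see the lock bottleneck for yourself is to time both variants. The sketch below is not a rigorous benchmark (no warm-up, a single run), the class and method names are hypothetical, and timings differ between machines, so no expected numbers are given.

```java
import java.util.Date;
import java.util.List;
import java.util.Vector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class; the names are illustrative only.
class LockContentionSketch {

    // Every add() call has to take the Vector's single lock.
    static List<String> withVector(int n) {
        List<String> values = new Vector<>(n);
        IntStream.range(0, n).parallel()
                 .forEach(i -> values.add(new Date().toString()));
        return values;
    }

    // No shared lock: partial results are accumulated separately and merged.
    static List<String> withCollect(int n) {
        return IntStream.range(0, n).parallel()
                        .mapToObj(i -> new Date().toString())
                        .collect(Collectors.toList());
    }

    // Runs the task once and returns the elapsed wall-clock time in milliseconds.
    static long millis(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        int n = 1_000_000;
        System.out.println("Vector + forEach: " + millis(() -> withVector(n)) + " ms");
        System.out.println("collect:          " + millis(() -> withCollect(n)) + " ms");
    }
}
```

Both methods return a list of the right size; only the time taken to get there is expected to differ.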

Peter Lawrey

values.add(String) is not thread safe. When you invoke this method from different threads without synchronization, there is no guarantee that it will work as expected.

To fix that you can:

  • Use a thread-safe collection such as Vector or CopyOnWriteArrayList.
  • Explicitly synchronize your code. For example, put synchronized (this) { values.add(new Date().toString()); } into your code. Note that i -> stays outside the synchronized block.
  • Or, in this case, map the elements to get a new stream, as in @PeterLawrey's answer: IntStream.range(0, 1_000_000).parallel().mapToObj(i -> new Date().toString()).collect(Collectors.toList());
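The explicit synchronization option could look roughly like this. Code like the question's often lives in a static main method, where this is not available, so this sketch locks on the list itself instead. That choice, like the class and method names, is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// Hypothetical demo class; the names are illustrative only.
class SynchronizedAddDemo {

    // Adds one timestamp per input element from a parallel stream, guarding
    // the unsynchronized ArrayList with an explicit lock.
    static List<String> fill(List<Integer> input) {
        List<String> values = new ArrayList<>(input.size());
        input.parallelStream().forEach(i -> {
            // Only one thread at a time may run this block for the same list.
            synchronized (values) {
                values.add(new Date().toString());
            }
        });
        return values;
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(1_000_000);
        for (int i = 0; i < 1_000_000; i++) {
            list.add(i);
        }
        System.out.println(fill(list).size()); // prints 1000000
    }
}
```

The count is now correct, but all threads queue on the same lock, so this is unlikely to be faster than the sequential stream.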
talex
  • Just for the record, `CopyOnWriteArrayList` will not work; it is just not built for such large collections. – vsnyc Sep 13 '16 at 22:30
  • I meant to state that it will take a very long time for the operation to complete with 1,000,000 entries. – vsnyc Sep 13 '16 at 22:37