17

The following code is not putting all the elements in the destination list after finishing parallel process. Is there any reason for this?

public static void main(String[] args) {

    List<Integer> source =new ArrayList<>();
    List<Integer> destination = new ArrayList<>();
    IntStream.range(0, 10000).forEach(element ->{
        source.add(element);
    });

    //System.out.println(source.size());

    source.parallelStream().forEach(c->{
        destination.add(c);
    });

    System.out.println("Source Size:"+source.size());
    System.out.println("destination size:"+destination.size());
}

Output: Source Size:10000 destination size:4343

Stefan Zobel
  • 3,182
  • 7
  • 28
  • 38
Prabhu Kvn
  • 167
  • 1
  • 1
  • 4
  • 2
    This is an interesting read--> [Why is shared mutability bad?](https://stackoverflow.com/questions/44219903/why-is-shared-mutability-bad/44220017#44220017) – Ousmane D. Feb 06 '20 at 13:10
  • This has clear explanation https://stackoverflow.com/questions/3589308/arraylist-and-multithreading-in-java – Ryuzaki L Feb 06 '20 at 13:38

4 Answers4

11

Because ArrayList is not a thread-safe collection. Using a thread-safe collection like CopyOnWriteArrayList would make it correct but not necessarily efficient.

Using a Collector instead would be much simpler and correct. e.g.

source.parallelStream().collect(Collectors.toList())
Naman
  • 27,789
  • 26
  • 218
  • 353
Sleiman Jneidi
  • 22,907
  • 14
  • 56
  • 77
5

The forEach operation of the parallel stream is adding elements to an un-synchronized Collection (an ArrayList) from multiple threads. Therefore, the operation is not thread safe, and has unexpected results.

Using forEachOrdered() instead of forEach() will ensure all the elements of the source List are added to the destination List.

However, as mentioned in the other answer, using collect(Collectors.toList()) is the correct way to produce an output List from a Stream.

Eran
  • 387,369
  • 54
  • 702
  • 768
  • 3
    `forEachOrdered` only guarantees that elements are processed in the same encounter order. It doesn't guarantee thread safety – fps Feb 06 '20 at 13:05
  • 2
    @FedericoPeraltaSchaffner well, I tried it, and it worked. And the Javadoc says that "Performing the action for one element happens-before performing the action for subsequent elements", so in the scenario described in this question, the current element of the Stream will be added to the output List before any subsequent elements are added. – Eran Feb 06 '20 at 13:09
  • 3
    @FedericoPeraltaSchaffner you can’t perform actions in-order and parallel at the same time. While elements may be provided by arbitrary calling threads, they must use a synchronization mechanism. The phrase “*Performing the action for one element happens-before performing the action for subsequent elements*” implies that you *can* use it for filling a data structure like an `ArrayList`. So while not recommended, it works. Using `ThreadLocal` or such alike wouldn’t work… – Holger Feb 06 '20 at 18:09
  • @Holger Thanks for the clarification – fps Feb 06 '20 at 19:22
0

Because you should use concurrent version of the destination list. You have several options to choose, i.e.:

CopyOnWriteArrayList<Integer> destination  = new CopyOnWriteArrayList<>();

It is expensive to modify this list because it makes a copy of the underlying array with every modification but fast when reading the list.

Collections.synchronizedList(destination);

Every method is synchronized so it will be thread safe

Planck Constant
  • 1,406
  • 1
  • 17
  • 19
0

Because you are creating more than one thread and your collection is not thread safe, you may have to deal with issues like ArrayIndexOutofbound Exception or your data will not be stored properly. Create a thread safe list and then you can add values.

The below code will solve your issue.

List<Integer> destination = Collections.synchronizedList(new ArrayList<>());

Full code:

public static void main(String args[]) {
        List<Integer> source =new ArrayList<>();
        List<Integer> destination = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, 10000).forEach(element ->{
            source.add(element);
        });

        //System.out.println(source.size());

        source.parallelStream().forEach(c->{
            destination.add(c);
        });

        System.out.println("Source Size:"+source.size());
        System.out.println("destination size:"+destination.size());
    }