
There are two test cases which use parallelStream():

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 20000; i++) {
  src.add(i);
}
List<String> strings = new ArrayList<>();
       
src.parallelStream()
    .filter(integer -> (integer % 2) == 0)
    .forEach(integer -> strings.add(integer + ""));
    
System.out.println("=size=>" + strings.size());
=size=>9332

The second test case, without the filter:
List<Integer> src = new ArrayList<>();
for (int i = 0; i < 20000; i++) {
  src.add(i);
}
List<String> strings = new ArrayList<>();

src.parallelStream().forEach(integer -> strings.add(integer + ""));

System.out.println("=size=>" + strings.size());
=size=>17908

Why do I always lose data when using parallelStream()? What did I do wrong?

  • What I think is happening is that you are not accessing the list in a synchronized fashion, which leaves the list in an inconsistent state. What if you try to use a thread-safe collection? I believe there is something called SynchronizedList? – npinti Sep 16 '20 at 07:07
  • I sometimes get a `ArrayIndexOutOfBoundsException` running this code. Probably a synchronisation issue. – Sweeper Sep 16 '20 at 07:15
  • I completely agree with @npinti, just change the strings variable assignment from ArrayList to Vector and try again. – raven1981 Sep 16 '20 at 07:23
  • 1
    Please use `Collections.synchronizedList()` instead of `Vector`. See: https://stackoverflow.com/questions/1386275/why-is-java-vector-and-stack-class-considered-obsolete-or-deprecated – jmizv Sep 16 '20 at 07:30
  • I would also recommend not using parallelStream() for this goal. Here is a link with an explanation: https://dzone.com/articles/think-twice-using-java-8 . I also saw a talk by a member of the Stream API development team where he explains the functionality and features of streams in Java, but the video is in Russian. I'll try to find a version with English subtitles or a recording from another conference. – NikMashei Sep 16 '20 at 07:40

4 Answers


ArrayList isn't thread-safe. You need to do

List<String> strings = Collections.synchronizedList(new ArrayList<>());

or

List<String> strings = new Vector<>();

to ensure all updates are synchronized, or switch to

List<String> strings = src.parallelStream()
    .filter(integer -> (integer % 2) == 0)
    .map(integer -> integer + "")
    .collect(Collectors.toList());

and leave the list building to the Streams framework. Note that it's undefined whether the list returned by collect is modifiable, so if that is a requirement, you may need to modify your approach.
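
If a modifiable list is a requirement, one option (a sketch added here for illustration, not part of the original answer) is to name the target collection explicitly with Collectors.toCollection, which yields a concrete, modifiable ArrayList:

List<String> strings = src.parallelStream()
    .filter(integer -> (integer % 2) == 0)
    .map(integer -> integer + "")
    .collect(Collectors.toCollection(ArrayList::new)); // result is a plain ArrayList you can modify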

In terms of performance, Stream.collect is likely to be much faster than using Stream.forEach to add to a synchronized collection, since the Streams framework can handle collection of values in each thread separately without synchronization and combine the results at the end in a thread safe fashion.
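
As a rough illustration of that split (my own sketch, not part of the original answer; the variable name toArrayList and the java.util.stream.Collector import are assumptions), Collectors.toList() behaves much like this hand-rolled collector: each worker thread fills its own ArrayList without locking, and the partial lists are merged once at the end:

Collector<String, ArrayList<String>, ArrayList<String>> toArrayList = Collector.of(
    ArrayList::new,     // each worker thread starts with its own empty list
    ArrayList::add,     // accumulation into the worker's own list, no synchronization needed
    (left, right) -> {  // partial results are combined after the workers finish
        left.addAll(right);
        return left;
    });

List<String> strings = src.parallelStream()
    .filter(integer -> (integer % 2) == 0)
    .map(integer -> integer + "")
    .collect(toArrayList);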

markusk
  • Or you could also use CopyOnWriteArrayList for your purpose. There is useful documentation: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CopyOnWriteArrayList.html – NikMashei Sep 16 '20 at 07:28
  • @Zogger `CopyOnWriteArrayList` makes a new copy of the entire array on each update. It's suitable for situations where there are many reads and few writes, but not for use cases like these where there will be a large number of updates. – markusk Sep 16 '20 at 07:30
  • You're definitely right. Actually, I don't know what the author of the question is doing with the data; I was just suggesting an alternative way to process a list of values concurrently :) – NikMashei Sep 16 '20 at 07:35

ArrayList isn't thread-safe. While one thread sees a list with 30 elements, another might still see 29 and overwrite the 30th position (losing one element).

Another issue can arise when the array backing the list needs to be resized: a new, larger array (for ArrayList roughly 1.5 times the old capacity) is created and the elements of the original array are copied into it. The thread doing the resizing might not see elements that other threads added in the meantime, or several threads resize at the same time and only one copy wins.

When using multiple threads you need to either synchronize access to the list yourself OR use a thread-safe list (by wrapping it with Collections.synchronizedList or by using a CopyOnWriteArrayList, to mention two possible solutions). Even better would be to use the collect method on the stream to put everything into a list.
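
To show that this is a property of ArrayList itself rather than of parallel streams, here is a stand-alone sketch (my own illustration; the class name LostUpdateDemo is made up) in which two plain threads add to the same ArrayList:

import java.util.ArrayList;
import java.util.List;

public class LostUpdateDemo {
    public static void main(String[] args) throws InterruptedException {
        // Swap in Collections.synchronizedList(new ArrayList<>()) and the count becomes stable.
        List<String> unsafe = new ArrayList<>();

        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) {
                unsafe.add(Integer.toString(i)); // unsynchronized size update + array write
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // Expected 20000; this typically prints less, and the adds can even fail with an
        // ArrayIndexOutOfBoundsException during a concurrent resize.
        System.out.println("=size=>" + unsafe.size());
    }
}

With the synchronized wrapper every add takes the same lock, so the count comes out at 20000 every time.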

M. Deinum

parallelStream() with forEach is a risky combination if not used carefully. Take a look at the points below to avoid bugs:

  1. If you have a pre-existing list object to which you want to add more objects from a parallelStream pipeline, wrap that pre-existing list with Collections.synchronizedList before looping over the parallel stream (see the sketch after this list).

  2. If you have to create a new list, you can use a Vector, initialized outside the loop; or

  3. If you have to create a new list, simply use parallelStream and collect the output at the end.
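
As a sketch of option 1 (my own example with made-up names existing and syncView, assuming the usual java.util and java.util.stream imports), the pre-existing list is wrapped once and the wrapper is what the parallel pipeline writes to:

List<String> existing = new ArrayList<>();   // pre-existing list that already holds data
existing.add("seed");

List<String> syncView = Collections.synchronizedList(existing);

IntStream.range(0, 20000)
    .parallel()
    .filter(i -> i % 2 == 0)
    .forEach(i -> syncView.add(Integer.toString(i)));

System.out.println("=size=>" + syncView.size()); // 10000 even numbers + 1 seed element = 10001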

NoisyBoy

You lose the benefits of using streams (and parallel streams) when you mutate shared state. As a general rule, avoid mutation when using streams; Venkat Subramaniam explains why. Instead, use collectors, and try to get as much as possible done within the stream chain itself. For example:

System.out.println(
                IntStream.range(0, 200000)
                        .filter(i -> i % 2 == 0)
                        .mapToObj(String::valueOf)
                        .collect(Collectors.toList()).size()
        );

You can turn that into a parallel stream by adding .parallel().
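
For instance (a sketch assuming the same java.util.stream.IntStream and java.util.stream.Collectors imports as in the snippet above), the parallel variant only differs by one extra call in the chain:

System.out.println(
                IntStream.range(0, 200000)
                        .parallel()               // same pipeline, now running in parallel
                        .filter(i -> i % 2 == 0)
                        .mapToObj(String::valueOf)
                        .collect(Collectors.toList()).size()
        );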

asdofindia
  • While the information here is useful, it doesn't answer the question, which is "Why do I always lose data when using parallelStream?" – markusk Sep 16 '20 at 08:28
  • I think it answers the question "What did I do wrong?" – asdofindia Sep 16 '20 at 08:35
  • There are valid use cases for both `parallelStream` and `forEach`, and situations where you can't avoid mutation. Responding "don't do that" doesn't explain why it didn't work in this case. – markusk Sep 16 '20 at 08:53