Summary
The problem is that ArrayList is by design not safe for modification by multiple threads concurrently, but the parallel stream is writing to the list from multiple threads. A good solution is to switch to an idiomatic stream implementation:
List msgList = deviceMessageInfoList.parallelStream() // Declare generic type, e.g. List<Map<String, Object>>
.filter(Objects::nonNull)
.map(m -> (DeviceTripMessageInfo) m.getMessageVO())
.filter(Objects::nonNull)
.map(DeviceTripMessageInfo::getValueMap)
.filter(Objects::nonNull)
.toList();
Issue: concurrent modification
The ArrayList Javadocs explain the concurrent modification issue:
Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally. (A structural modification is any operation that adds or deletes one or more elements, or explicitly resizes the backing array; merely setting the value of an element is not a structural modification.) This is typically accomplished by synchronizing on some object that naturally encapsulates the list. If no such object exists, the list should be "wrapped" using the Collections.synchronizedList method. This is best done at creation time, to prevent accidental unsynchronized access to the list.
Note that the exception you're seeing is not the only incorrect behavior you might encounter. In my own tests of your code against large lists, the code sometimes completed without exception, but the resulting list contained only some of the elements from the source list.
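The silent data loss described above can be observed with a small experiment. The following is a minimal sketch, not the code from the question: the class name UnsafeParallelAddDemo, the method unsafeAdd, and the element count are all hypothetical, chosen only for illustration. The outcome varies from run to run and from machine to machine, so no particular output is implied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class UnsafeParallelAddDemo {

    /**
     * Adds n integers to a plain ArrayList from a parallel stream.
     * Returns the final list size, or -1 if the unsynchronized writes threw.
     */
    static int unsafeAdd(int n) {
        List<Integer> results = new ArrayList<>();
        try {
            // Unsafe on purpose: several worker threads call add() on the same ArrayList.
            IntStream.range(0, n).parallel().forEach(i -> results.add(i));
        } catch (RuntimeException e) {
            // Often an ArrayIndexOutOfBoundsException caused by a racing resize.
            return -1;
        }
        return results.size();
    }

    public static void main(String[] args) {
        int n = 100_000;
        for (int run = 0; run < 5; run++) {
            int size = unsafeAdd(n);
            System.out.println(size < 0
                    ? "run " + run + ": exception thrown"
                    : "run " + run + ": " + size + " of " + n + " elements kept");
        }
    }
}
```

A run that reports fewer than n elements corresponds to the case where the code completes without an exception but drops part of the input.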
Switching from a parallel stream to a sequential stream would likely fix the issue in practice, but that behavior depends on the stream implementation and is not guaranteed by the API. Such an approach is therefore inadvisable, as it could break in future versions of the library. Per the forEach Javadocs:
For any given element, the action may be performed at whatever time and in whatever thread the library chooses. If the action accesses shared state, it is responsible for providing the required synchronization.
Issue: not idiomatic
Aside from the correctness issue, this approach is also not particularly idiomatic: stream code should generally avoid side effects. The stream documentation explicitly discourages them:
Side-effects in behavioral parameters to stream operations are, in general, discouraged, as they can often lead to unwitting violations of the statelessness requirement, as well as other thread-safety hazards.
[...]
Many computations where one might be tempted to use side-effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators.
Of particular note, the documentation goes on to describe the exact scenario posted in this question as an inappropriate use of side-effects in a stream:
As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism.
Aside: traditional non-stream solution
As an aside, this points to a solution available in traditional non-stream code. I will discuss it briefly, since it's helpful to understand traditional solutions to the issue of concurrent list modification. Traditionally, one might replace the ArrayList with either a wrapped synchronized version using Collections.synchronizedList or an inherently concurrent collection type such as ConcurrentLinkedQueue. Since these approaches are designed for concurrent insertion, they solve the parallel insert issue, though possibly with additional synchronization contention overhead.
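A minimal sketch of the two traditional options, Collections.synchronizedList and ConcurrentLinkedQueue, is shown here for comparison. The class and method names (ConcurrentInsertDemo, fillSynchronizedList, fillConcurrentQueue) are hypothetical and the integer data is a placeholder for the real messages. Both variants keep every element, but neither preserves the order of the source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

public class ConcurrentInsertDemo {

    /** Fills a synchronized wrapper around an ArrayList from a parallel stream. */
    static List<Integer> fillSynchronizedList(int n) {
        // Every add() takes the wrapper's lock, so writes are safe but may contend.
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).parallel().forEach(i -> results.add(i));
        return results;
    }

    /** Fills a lock-free concurrent queue from a parallel stream. */
    static Queue<Integer> fillConcurrentQueue(int n) {
        Queue<Integer> results = new ConcurrentLinkedQueue<>();
        IntStream.range(0, n).parallel().forEach(i -> results.add(i));
        return results;
    }

    public static void main(String[] args) {
        int n = 100_000;
        System.out.println("synchronizedList size: " + fillSynchronizedList(n).size());
        System.out.println("ConcurrentLinkedQueue size: " + fillConcurrentQueue(n).size());
    }
}
```

These variants still rely on side effects inside forEach, so they fix the thread-safety problem without addressing the idiom concern raised earlier.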
Stream solution
The stream documentation continues on with a replacement for the inappropriate use of side effects:
Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:
List<String> results =
stream.filter(s -> pattern.matcher(s).matches())
.toList(); // No side-effects!
Applying this approach to your code, you get:
List msgList = deviceMessageInfoList.parallelStream() // Declare generic type, e.g. List<Map<String, Object>>
.filter(Objects::nonNull)
.map(m -> (DeviceTripMessageInfo) m.getMessageVO())
.filter(Objects::nonNull)
.map(DeviceTripMessageInfo::getValueMap)
.filter(Objects::nonNull)
.toList();
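For readers who want to try the pipeline in isolation, here is a self-contained sketch. The question does not show the real classes, so the two nested classes below are hypothetical stand-ins, and the element type Map<String, Object> is an assumption taken from the comment in the snippet above. The sketch uses collect(Collectors.toList()) instead of Stream.toList() only so that it also compiles on Java versions before 16; on Java 16 and later the two are interchangeable here, except that Stream.toList() returns an unmodifiable list.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class MsgListDemo {

    // Hypothetical stand-in for the question's trip message payload type.
    static class DeviceTripMessageInfo {
        private final Map<String, Object> valueMap;
        DeviceTripMessageInfo(Map<String, Object> valueMap) { this.valueMap = valueMap; }
        Map<String, Object> getValueMap() { return valueMap; }
    }

    // Hypothetical stand-in for the elements of deviceMessageInfoList.
    static class DeviceMessageInfo {
        private final Object messageVO;
        DeviceMessageInfo(Object messageVO) { this.messageVO = messageVO; }
        Object getMessageVO() { return messageVO; }
    }

    /** Collects all non-null value maps without any shared mutable state. */
    static List<Map<String, Object>> collectValueMaps(List<DeviceMessageInfo> deviceMessageInfoList) {
        return deviceMessageInfoList.parallelStream()
                .filter(Objects::nonNull)                           // skip null list entries
                .map(m -> (DeviceTripMessageInfo) m.getMessageVO())
                .filter(Objects::nonNull)                           // skip entries without a payload
                .map(DeviceTripMessageInfo::getValueMap)
                .filter(Objects::nonNull)                           // skip payloads without a map
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Object> first = Collections.singletonMap("speed", 42);
        Map<String, Object> second = Collections.singletonMap("speed", 57);
        List<DeviceMessageInfo> input = Arrays.asList(
                new DeviceMessageInfo(new DeviceTripMessageInfo(first)),
                null,
                new DeviceMessageInfo(null),
                new DeviceMessageInfo(new DeviceTripMessageInfo(null)),
                new DeviceMessageInfo(new DeviceTripMessageInfo(second)));
        // The collector preserves the encounter order of the source list.
        System.out.println(collectValueMaps(input)); // prints [{speed=42}, {speed=57}]
    }
}
```

Because the collector assembles the result itself, no list is shared between worker threads, and the output order matches the source order even when the stream runs in parallel.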