
I occasionally get an ArrayIndexOutOfBoundsException when running the following code. Any leads? The size of the list is always roughly 29-30 elements.

logger.info("devicetripmessageinfo size :{}", deviceMessageInfoList.size());
deviceMessageInfoList.parallelStream().forEach(msg -> {
    if (msg != null && msg.getMessageVO() != null) {
        DeviceTripMessageInfo currentDevTripMsgInfo =
                (DeviceTripMessageInfo) msg.getMessageVO();
        if (currentDevTripMsgInfo.getValueMap() != null) {
            mapsList.add(currentDevTripMsgInfo.getValueMap());
        }
    }
});
java.lang.ArrayIndexOutOfBoundsException: null
        at java.base/jdk.internal.reflect.GeneratedConstructorAccessor26.newInstance(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:603)
        at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
        at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
        at com.*.*.*.*.worker.*.process(*.java:96)
        at com.*.jms.consumer.JMSWorker.processList(JMSWorker.java:279)
        at com.*.jms.consumer.JMSWorker.process(JMSWorker.java:244)
        at com.*.jms.consumer.JMSWorker.processMessages(JMSWorker.java:200)
        at com.*.jms.consumer.JMSWorker.run(JMSWorker.java:136)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ArrayIndexOutOfBoundsException: null
M. Justin
codemania23
  • Can you add the full stack trace? – lugiorgi Oct 14 '20 at 12:33
  • I was just adding it; it's there now. – codemania23 Oct 14 '20 at 12:39
  • What is `mapsList`? What matters is the actual type, not just the declared type. In other words: is that collection/Map synchronized? If it is not, then your question might be a duplicate of https://stackoverflow.com/questions/31014333/random-arrayindexoutofboundsexception-using-stream-to-order-map-elements-by-val – Tom Oct 14 '20 at 12:42
  • Agree with @Tom, we need to know the type of *mapsList* here. – Omar Abdel Bari Oct 14 '20 at 12:53
  • @OmarAbdelBari and Tom, it's an ArrayList. – codemania23 Oct 14 '20 at 12:57
  • Then try a synchronized list instead and check what happens. – Tom Oct 14 '20 at 13:01
  • Okay, I'll make the change and monitor our production setup, as this happens only occasionally. Will keep you posted. But that sounds like the culprit. – codemania23 Oct 14 '20 at 13:02
  • @Tom even if that were a thread-safe container, you would have no idea about the order in which the elements are put in... – Eugene Oct 14 '20 at 19:31

2 Answers


Summary

The problem is that ArrayList is by design not safe for modification by multiple threads concurrently, but the parallel stream is writing to the list from multiple threads. A good solution is to switch to an idiomatic stream implementation:

List<Map<String, Object>> msgList = deviceMessageInfoList.parallelStream() // Adjust the generic type to whatever getValueMap() returns
                .filter(Objects::nonNull)
                .map(m -> (DeviceTripMessageInfo) m.getMessageVO())
                .filter(Objects::nonNull)
                .map(DeviceTripMessageInfo::getValueMap)
                .filter(Objects::nonNull)
                .toList(); // Java 16+; on older versions use .collect(Collectors.toList())

Issue: concurrent modification

The ArrayList Javadocs explain the concurrent modification issue:

Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally. (A structural modification is any operation that adds or deletes one or more elements, or explicitly resizes the backing array; merely setting the value of an element is not a structural modification.) This is typically accomplished by synchronizing on some object that naturally encapsulates the list. If no such object exists, the list should be "wrapped" using the Collections.synchronizedList method. This is best done at creation time, to prevent accidental unsynchronized access to the list

Note that the exception you're seeing is not the only incorrect behavior you might encounter. In my own tests of your code against large lists, the code sometimes completed without exception, but the resulting list contained only some of the elements from the source list.

Switching from a parallel stream to a sequential stream would likely fix the issue in practice. However, that relies on the stream implementation and is not guaranteed by the API. Such an approach is therefore inadvisable, as it could break in future versions of the library. Per the forEach Javadocs:

For any given element, the action may be performed at whatever time and in whatever thread the library chooses. If the action accesses shared state, it is responsible for providing the required synchronization.

Issue: not idiomatic

Aside from the correctness issue, another issue with this approach is that it's not particularly idiomatic to use side effects within stream code. The stream documentation explicitly discourages them.

Side-effects in behavioral parameters to stream operations are, in general, discouraged, as they can often lead to unwitting violations of the statelessness requirement, as well as other thread-safety hazards.

[...]

Many computations where one might be tempted to use side-effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators.
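To make "reduction instead of mutable accumulators" concrete, here is a minimal sketch using the three-argument Stream.collect overload (a mutable reduction). Each subtask accumulates into its own ArrayList, and the partial lists are merged at the end, so no list is ever shared between threads. The class name, method name and sample words are illustrative only and are not taken from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class MutableReductionSketch {

    // Collects matching strings with a mutable reduction: every subtask gets a
    // fresh ArrayList, and the partial lists are merged pairwise afterwards,
    // so no single ArrayList is mutated by more than one thread.
    static List<String> matching(List<String> source, Pattern pattern) {
        return source.parallelStream()
                .filter(s -> pattern.matcher(s).matches())
                .collect(ArrayList::new,     // supplier: a new container per subtask
                         ArrayList::add,     // accumulator: add one element to that container
                         ArrayList::addAll); // combiner: append the right partial list to the left one
    }

    public static void main(String[] args) {
        List<String> words = List.of("alpha", "beta", "apple", "gamma", "avocado");
        System.out.println(matching(words, Pattern.compile("a.*")));
    }
}
```

Because the source list is ordered and the combiner appends the right-hand partial result to the left-hand one, the result keeps encounter order even in parallel.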

Of particular note, the documentation goes on to describe the exact scenario posted in this question as an inappropriate use of side-effects in a stream:

As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.

     ArrayList<String> results = new ArrayList<>();
     stream.filter(s -> pattern.matcher(s).matches())
           .forEach(s -> results.add(s));  // Unnecessary use of side-effects!

This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism.

Aside: traditional non-stream solution

As an aside, this points to a solution one might use in traditional non-stream code. I will discuss it briefly, because it helps to understand the traditional solutions to concurrent list modification. Traditionally, one might replace the ArrayList with one of two alternatives. The first is a synchronized wrapper created with Collections.synchronizedList. The second is an inherently concurrent collection type such as ConcurrentLinkedQueue. Both are designed for concurrent insertion, so they solve the parallel insert issue, though possibly at the cost of additional synchronization contention.
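A minimal sketch of those two traditional fixes follows. It uses a hypothetical integer workload in place of the question's messages. It still relies on forEach side effects, which the stream documentation discourages, but no elements are lost because every add() is thread-safe. Insertion order is still whatever order the worker threads happen to run in.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

public class ThreadSafeSinks {

    // Option 1: wrap the ArrayList at creation time so every add() is synchronized.
    static List<Integer> viaSynchronizedList(int n) {
        List<Integer> sink = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).parallel().forEach(sink::add);
        return sink;
    }

    // Option 2: use a lock-free collection designed for concurrent insertion.
    static Queue<Integer> viaConcurrentQueue(int n) {
        Queue<Integer> sink = new ConcurrentLinkedQueue<>();
        IntStream.range(0, n).parallel().forEach(sink::add);
        return sink;
    }

    public static void main(String[] args) {
        // Both sinks receive all n elements; their order is not guaranteed.
        System.out.println(viaSynchronizedList(10_000).size());
        System.out.println(viaConcurrentQueue(10_000).size());
    }
}
```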

Stream solution

The stream documentation continues on with a replacement for the inappropriate use of side effects:

Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:

     List<String> results =
         stream.filter(s -> pattern.matcher(s).matches())
               .toList();  // No side-effects!

Applying this approach to your code, you get:

List<Map<String, Object>> msgList = deviceMessageInfoList.parallelStream() // Adjust the generic type to whatever getValueMap() returns
                .filter(Objects::nonNull)
                .map(m -> (DeviceTripMessageInfo) m.getMessageVO())
                .filter(Objects::nonNull)
                .map(DeviceTripMessageInfo::getValueMap)
                .filter(Objects::nonNull)
                .toList(); // Java 16+; on older versions use .collect(Collectors.toList())
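The pipeline can be tried in isolation with the self-contained sketch below. DeviceMessage and the shape of DeviceTripMessageInfo are hypothetical stand-ins, because the question does not show those classes. The value map is assumed to be a Map<String, Object>. The sketch uses collect(Collectors.toList()) so that it also compiles on releases before Java 16.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class ValueMapCollector {

    // Hypothetical stand-in for the value object the question casts to.
    static class DeviceTripMessageInfo {
        private final Map<String, Object> valueMap;

        DeviceTripMessageInfo(Map<String, Object> valueMap) {
            this.valueMap = valueMap;
        }

        Map<String, Object> getValueMap() {
            return valueMap;
        }
    }

    // Hypothetical stand-in for the elements of deviceMessageInfoList.
    static class DeviceMessage {
        private final Object messageVO;

        DeviceMessage(Object messageVO) {
            this.messageVO = messageVO;
        }

        Object getMessageVO() {
            return messageVO;
        }
    }

    // The stream framework builds the result list itself,
    // so no shared ArrayList is mutated from worker threads.
    static List<Map<String, Object>> collectValueMaps(List<DeviceMessage> messages) {
        return messages.parallelStream()
                .filter(Objects::nonNull)                            // skip null messages
                .map(m -> (DeviceTripMessageInfo) m.getMessageVO())  // unwrap the value object
                .filter(Objects::nonNull)                            // skip null value objects
                .map(DeviceTripMessageInfo::getValueMap)
                .filter(Objects::nonNull)                            // skip null value maps
                .collect(Collectors.toList());                       // or .toList() on Java 16+
    }

    public static void main(String[] args) {
        Map<String, Object> speed = new HashMap<>();
        speed.put("speed", 42);

        List<DeviceMessage> messages = Arrays.asList(
                new DeviceMessage(new DeviceTripMessageInfo(speed)),
                null,                                                // null message
                new DeviceMessage(null),                             // null value object
                new DeviceMessage(new DeviceTripMessageInfo(null))); // null value map

        System.out.println(collectValueMaps(messages)); // prints [{speed=42}]
    }
}
```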
M. Justin

Even if you change that to a synchronized (or, more precisely, a thread-safe) List, your current approach still gives you no guarantee about the order in which the elements are added. The documentation, by the way, clearly discourages this kind of use of forEach; just look up the section on side-effects.
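The ordering point can be shown with a minimal sketch; the class and method names are illustrative. A thread-safe list filled from forEach receives every element, but in no guaranteed order. In contrast, collect on an ordered source such as a List preserves encounter order even when the stream runs in parallel.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class EncounterOrderDemo {

    // Thread-safe sink filled via forEach: nothing is lost, but the order
    // depends on which worker thread reaches add() first.
    static List<Integer> viaForEach(List<Integer> source) {
        List<Integer> sink = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream().forEach(sink::add);
        return sink;
    }

    // collect() on an ordered source keeps encounter order, even in parallel.
    static List<Integer> viaCollect(List<Integer> source) {
        return source.parallelStream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.range(0, 1_000).boxed().collect(Collectors.toList());
        // May print false: forEach gives no ordering guarantee.
        System.out.println("forEach kept order: " + viaForEach(source).equals(source));
        // Always true for an ordered source.
        System.out.println("collect kept order: " + viaCollect(source).equals(source));
    }
}
```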

This entire thing can be done in a far better (and easier to read) way:

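 List<Map<String, Object>> mapsList = deviceMessageInfoList // element type assumed; use whatever getValueMap() returns
      .stream()
      .parallel()
      .filter(Objects::nonNull)                   // skip null messages
      .map(x -> x.getMessageVO())
      .filter(Objects::nonNull)                   // skip null value objects
      .map(x -> (DeviceTripMessageInfo) x)        // cast the value object extracted above
      .map(DeviceTripMessageInfo::getValueMap)
      .filter(Objects::nonNull)                   // skip null value maps
      .collect(Collectors.toList());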
Eugene
  • Eugene, thanks for the answer, but I am not worried about the order of elements. Thanks for linking me to the documentation, though; I'll give it a read. – codemania23 Oct 16 '20 at 02:33
  • @codemania23 go ahead. I did not compile this code btw, but I've tried to be accurate. – Eugene Oct 16 '20 at 02:48