
I have a scenario where I have to maintain a Map which can be populated by multiple threads, each modifying their respective List (unique identifier/key being the thread name), and when the list size for a thread exceeds a fixed batch size, we have to persist the records to the database.

Aggregator class

private volatile ConcurrentHashMap<String, List<T>> instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReentrantLock lock;

public void addAll(List<T> entityList, String threadName) {
    try {
        lock.lock();
        List<T> instrumentList = instrumentMap.get(threadName);
        if(instrumentList == null) {
            instrumentList = new ArrayList<T>(batchSize);
            instrumentMap.put(threadName, instrumentList);
        }

        if(instrumentList.size() >= batchSize -1){
            instrumentList.addAll(entityList);
            recordSaver.persist(instrumentList); 
            instrumentList.clear();
        } else {
            instrumentList.addAll(entityList);  
        }
    } finally {
        lock.unlock();
    }

}

There is one more separate thread running every 2 minutes (using the same lock) to persist all the records in the Map (to make sure we have something persisted every 2 minutes and the map size does not get too big):

if(//Some condition) {
    Thread.sleep(//2 minutes);
    aggregator.getLock().lock();
    List<T> instrumentList = instrumentMap.values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
    if(instrumentList.size() > 0) {
        saver.persist(instrumentList);
        instrumentMap.values().parallelStream().forEach(x -> x.clear());
        aggregator.getLock().unlock();
    }
}

This solution works fine in almost every scenario that we tested, except that sometimes we see some of the records go missing, i.e. they are not persisted at all, although they were added to the Map just fine.

My questions are:

  1. What is the problem with this code?
  2. Is ConcurrentHashMap not the best solution here?
  3. Does the List that is used with the ConcurrentHashMap have an issue?
  4. Should I use the compute method of ConcurrentHashMap here (I think there is no need, as the ReentrantLock is already doing the same job)?
Amit
    Not sure about the missing records, but if all access to `instrumentMap` is guarded by the `lock` then there's no benefit to using `ConcurrentMap`. – Slaw Feb 13 '19 at 06:19
  • @Slaw I agree. I haven't written this and am not looking to change it until I understand the problem with the code. Thanks for your answer – Amit Feb 13 '19 at 06:22
  • General rule: between lock() and unlock() there should be a while(condition) to check the shared data (the map); instead you are using if(condition). Please check the locking guidelines for the producer-consumer problem. – JavaUser Feb 13 '19 at 07:22
  • Well, I'm not able to see the problem in the code shown. While that doesn't mean the problem isn't there, a proper [mcve] demonstrating the issue would help. One thing to check for is any unguarded access taking place. Does `recordSaver.persist` ever pass the list to another thread down the line in a non-blocking fashion? I ask because you pass the `List` itself, not a copy, which means non-synchronized access could be happening somewhere. In contrast, your save-every-two-minutes thread calls `saver.persist` with a "copy" containing all the flattened values in the map. – Slaw Feb 13 '19 at 07:29
  • @Slaw your point on non-blocking access - I will check for sure – Amit Feb 13 '19 at 07:49
  • @Amit once you figure out the problem, do post an answer. – nits.kk Feb 13 '19 at 09:19
  • I will for sure @nits-kk but that will take 15-20 days as this issue occurs rarely. – Amit Feb 13 '19 at 09:28
  • @Slaw can you please add your observation as an answer so that I can credit you for it if it works :) – Amit Feb 13 '19 at 09:29
  • Is there even a need to store the instruments in a map? Persistence is done with instrument-lists, the "threadName" key appears to be unused. – vanOekel Feb 17 '19 at 10:57
  • Just to be really sure - please log both `aggregator.getLock()` and `lock` just before you acquire them, and `aggregator.getLock()` also when releasing :) If the hash codes (like the last part of ReentrantLock@4682fer34 or whatever) are exactly the same, at least we are sure we are synchronising correctly. – almondandapricot Feb 18 '19 at 18:54
  • @vanOekel Agreed, it's badly written code, not the perfect way of doing it. – Amit Feb 19 '19 at 05:46

3 Answers


The answer provided by @Slaw in the comments did the trick. We were letting the instrumentList instance escape in a non-synchronized way, i.e. access/operations were happening on the list without any synchronization. Passing a copy to the downstream methods fixed it.

The following lines of code are where the issue was happening:

recordSaver.persist(instrumentList);
instrumentList.clear();

Here we were allowing the instrumentList instance to escape in a non-synchronized way: it is passed to another class (recordSaver.persist) to be acted on, but we also clear the list on the very next line (in the Aggregator class), and all of this happens without synchronization. The state of the list inside recordSaver cannot be predicted... a really stupid mistake.

We fixed the issue by passing a cloned copy of instrumentList to the recordSaver.persist(...) method. This way instrumentList.clear() has no effect on the list available in recordSaver for further operations.
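
As a minimal sketch of the change inside addAll (same names as in the question; the defensive copy is the only difference):

    if(instrumentList.size() >= batchSize -1){
        instrumentList.addAll(entityList);
        // hand recordSaver its own copy, so clearing instrumentList on the
        // next line no longer races with whatever persist() does with the list
        recordSaver.persist(new ArrayList<T>(instrumentList));
        instrumentList.clear();
    } else {
        instrumentList.addAll(entityList);
    }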

Amit

I see that you are using ConcurrentHashMap's parallelStream within a lock. I am not knowledgeable about Java 8+ stream support, but a quick search shows that:

  1. ConcurrentHashMap is a complex data structure that used to have concurrency bugs in the past
  2. Parallel streams must abide by complex and poorly documented usage restrictions
  3. You are modifying your data within a parallel stream

Based on that information (and my gut-driven concurrency bugs detector™), I wager a guess that removing the call to parallelStream might improve the robustness of your code, as sketched below. In addition, as mentioned by @Slaw, you should use an ordinary HashMap in place of ConcurrentHashMap if all instrumentMap usage is already guarded by the lock.
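
For illustration, a sketch of the periodic flush without the parallel stream, assuming all access really is guarded by the same lock (the identifiers mirror the question's code):

    aggregator.getLock().lock();
    try {
        // flatten all per-thread lists into one batch
        List<T> instrumentList = new ArrayList<>();
        for (List<T> perThreadList : instrumentMap.values()) {
            instrumentList.addAll(perThreadList);
        }
        if (!instrumentList.isEmpty()) {
            saver.persist(instrumentList);
            // a plain sequential clear is enough while holding the lock
            for (List<T> perThreadList : instrumentMap.values()) {
                perThreadList.clear();
            }
        }
    } finally {
        aggregator.getLock().unlock();
    }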

Of course, since you don't post the code of recordSaver, it is possible that it too has bugs (and not necessarily concurrency-related ones). In particular, you should make sure that the code that reads records from persistent storage (the one you are using to detect the loss of records) is safe, correct, and properly synchronized with the rest of your system (preferably by using a robust, industry-standard SQL database).

user1643723
  • Removing the call to parallelStream might increase robustness but is not related to the bug we are facing (the bug is never observed at the point where we are using it). A HashMap with a lock or ConcurrentHashMap.compute, either one of them is sufficient, and I agree the way it's used here is overkill... but that's not affecting the records anyway. @Slaw's suggestion was the only loophole we missed, i.e. allowing the reference to escape to a method (recordSaver.persist) that is not synchronized and clearing the list just after doing this... meaning records can go out of scope – Amit Feb 19 '19 at 05:52

It looks like this was an attempt at optimization where it was not needed. In that case, less is more and simpler is better. In the code below, only two concepts for concurrency are used: synchronized to ensure a shared list is properly updated and final to ensure all threads see the same value.

import java.util.ArrayList;
import java.util.List;

public class Aggregator<T> implements Runnable {

    private final List<T> instruments = new ArrayList<>();

    private final RecordSaver recordSaver;
    private final int batchSize;


    public Aggregator(RecordSaver recordSaver, int batchSize) {
        super();
        this.recordSaver = recordSaver;
        this.batchSize = batchSize;
    }

    public synchronized void addAll(List<T> moreInstruments) {

        instruments.addAll(moreInstruments);
        if (instruments.size() >= batchSize) {
            storeInstruments();
        }
    }

    public synchronized void storeInstruments() {

        if (instruments.size() > 0) {
            // in case recordSaver works async
            // recordSaver.persist(new ArrayList<T>(instruments));
            // else just:
            recordSaver.persist(instruments);
            instruments.clear();
        }
    }


    @Override
    public void run() {

        // keep flushing periodically; tune the sleep to the desired flush
        // interval (the question flushes every 2 minutes)
        while (true) {
            try { Thread.sleep(1L); } catch (Exception ignored) {
                break;
            }
            storeInstruments();
        }
    }


    class RecordSaver {
        void persist(List<?> l) {}
    }

}
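
A possible usage sketch, assuming RecordSaver is promoted to a top-level class with a real persist implementation (the nested stub above exists only so the example compiles); the batch size and instrument values are illustrative:

    RecordSaver recordSaver = new RecordSaver();
    Aggregator<String> aggregator = new Aggregator<>(recordSaver, 1000);

    // one background thread periodically flushes whatever has accumulated
    Thread flusher = new Thread(aggregator, "aggregator-flusher");
    flusher.setDaemon(true);
    flusher.start();

    // worker threads simply call addAll; all synchronization is inside Aggregator
    aggregator.addAll(java.util.Arrays.asList("inst-1", "inst-2"));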
vanOekel