I want to implement a thread-safe Map of Queues.

I intend to start with an empty Map. If the key does not exist, I want to create a new Map entry with a new Queue. If the key does exist, I want to add to the existing Queue. My proposed implementation is as follows:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class StackOverFlowExample {

    private final Map<String, ConcurrentLinkedQueue<String>> map = new ConcurrentHashMap<>();

    public void addElementToQueue(String key, String value){
        if (map.containsKey(key)){
            map.get(key).add(value);
        }
        else{
            ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
            queue.add(value);
            map.put(key, queue);
        }           
    }    
}

My concern is that when multiple threads attempt to add a new value to the Map for the same key, the first will put a new Map entry with a new Queue, and the second will wait and then put another new Queue for that key rather than adding to the existing Queue. My knowledge of concurrency and the concurrency API is slim at best. Perhaps the concurrent collections already prevent this? Advice would be much appreciated.

whoan
Robert Bain

3 Answers

This pattern has probably been posted many times on SO (efficiently adding to a concurrent map):

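Queue<String> q = map.get(key);
if(q == null) {
  // No queue yet: create one and try to publish it atomically.
  q = new ConcurrentLinkedQueue<String>();
  Queue<String> curQ = map.putIfAbsent(key, q);
  // Non-null means another thread inserted a queue first; use theirs.
  if(curQ != null) {
    q = curQ;
  }
}
q.add(value);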

Note that since Java 8, this can be replaced with computeIfAbsent().
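As a minimal sketch of that Java 8 variant (the class name `ComputeIfAbsentExample` and the `snapshot` helper are hypothetical, added only for illustration), the get / putIfAbsent sequence collapses into a single `computeIfAbsent()` call. On a `ConcurrentHashMap` that call is atomic, so concurrent callers for the same key should all end up with the same queue:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Hypothetical class name, used only for this sketch.
class ComputeIfAbsentExample {

    private final ConcurrentMap<String, Queue<String>> map = new ConcurrentHashMap<>();

    public void addElementToQueue(String key, String value) {
        // Creates the queue only if the key has no mapping yet, then
        // returns whichever queue is mapped so the value can be appended.
        map.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(value);
    }

    // Illustrative helper (not part of the original answer): copies the
    // current contents of the queue for a key, or an empty list if absent.
    public List<String> snapshot(String key) {
        Queue<String> q = map.get(key);
        return q == null ? new ArrayList<>() : new ArrayList<>(q);
    }
}
```

Calling `addElementToQueue("a", "1")` and then `addElementToQueue("a", "2")` leaves one queue under `"a"` holding both values in insertion order.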

jtahlborn
  • The `if(curQ != null) { q = curQ; }` case will never be executed. – Shine Feb 22 '22 at 02:16
  • @Shine - the idea here is that `map` is being concurrently modified, so the value for the given `key` might have changed between the initial `get()` and the `putIfAbsent()` call – jtahlborn Feb 22 '22 at 16:23

So your fear is that thread A and thread B will do the following:

Thread A:
    lock ConcurrentHashMap
    look for Queue "x" (not found)
    unlock ConcurrentHashMap
    create Queue "x"
    lock ConcurrentHashMap
    insert Queue "x"
    unlock ConcurrentHashMap

Thread B:
    lock ConcurrentHashMap (while thread A is in 'create Queue "x"')
    look for Queue "x" (not found)
    unlock ConcurrentHashMap (thread A then gets the lock)
    create Queue "x" v2
    lock ConcurrentHashMap
    insert Queue "x" v2 (overwriting the old entry)
    unlock ConcurrentHashMap

That is in fact a real issue, but one that is easily resolved by making addElementToQueue a synchronized method. Then only one thread can be inside addElementToQueue at any given time, which closes the synchronization hole between the first 'unlock' and the second 'lock'.

Thus

public synchronized void addElementToQueue(String key, String value){

should resolve your lost-queue problem.
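For reference, a rough sketch of what the whole class might look like with that change (the class name `SynchronizedExample` and the `snapshot` helper are hypothetical, added only for illustration). It assumes every write to the map goes through this one method; any other code path that puts into the map without holding the same lock would reopen the gap:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical class name, used only for this sketch.
class SynchronizedExample {

    private final Map<String, ConcurrentLinkedQueue<String>> map = new ConcurrentHashMap<>();

    // synchronized: the lookup, the optional creation, and the add run as
    // one critical section per instance, so no other thread can insert a
    // queue between the check and the put.
    public synchronized void addElementToQueue(String key, String value) {
        ConcurrentLinkedQueue<String> queue = map.get(key);
        if (queue == null) {
            queue = new ConcurrentLinkedQueue<>();
            map.put(key, queue);
        }
        queue.add(value);
    }

    // Illustrative helper (not part of the original answer): copies the
    // current contents of the queue for a key, or an empty list if absent.
    public List<String> snapshot(String key) {
        ConcurrentLinkedQueue<String> q = map.get(key);
        return q == null ? new ArrayList<>() : new ArrayList<>(q);
    }
}
```

The trade-off is that all keys share one lock, so threads adding to unrelated keys also wait on each other.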

eric.green
  • That'd make sense if ConcurrentHashMap used locking on retrieval. It doesn't. – spudone Oct 21 '14 at 22:56
  • Ah right, ConcurrentHashMap "merely" uses data structures that are multi-reader resilient. Locking is only used by writers for the brief amount of time needed to add or delete an element from the map. I keep forgetting that implementation detail because it really doesn't matter from a programmatic point of view. The problem still occurs at the same place (the simultaneous "does X exist in hashmap?" at the same time as "create a new hashmap") and the solution is the same -- synchronize the addElementToQueue method. – eric.green Oct 21 '14 at 23:33
  • Using the synchronized block, he doesn't really need the concurrent data structures, unless there's more to the code. – spudone Oct 21 '14 at 23:41
  • As this particular StackOverflow entry explains, that's bad advice if he has multiple threads: http://stackoverflow.com/questions/1003026/hashmap-concurrency-issue – eric.green Oct 23 '14 at 00:04
  • The question in that link specifically refers to "no locking" which is not what I said. – spudone Oct 27 '14 at 17:10

If Java 8 is an option:

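// Requires: import java.util.Arrays;
public void addElementToQueue(String key, String value) {
    // If the key is absent, the new single-element queue is stored as-is;
    // otherwise its contents are appended to the existing queue.
    map.merge(key, new ConcurrentLinkedQueue<>(Arrays.asList(value)), (oldValue, coming) -> {
        oldValue.addAll(coming);
        return oldValue;
    });
}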
Bax