2

I'm trying to write a thread-safe Map[K, Set[V]] implementation in Java.

  1. If a unique key is added to the map, a new Set should be created (and added to)
  2. If a non unique key is added to the map, the existing Set should be added to.
  3. If a value is removed from a Set causing the Set to be empty, the entry should be removed from the map to avoid memory leaks.
  4. I'd like to solve this without needing to synchronize the whole thing

I have included a failing test case below. Please let me know if you have a solution.

package org.deleteme;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import junit.framework.Assert;

import org.junit.Test;

public class ConcurrentSetMapTest {
    public static class ConcurrentSetMap<K, V> {
        private final ConcurrentMap<K, Set<V>> map = new ConcurrentHashMap<K, Set<V>>();

        public void add(K key, V value) {
            Set<V> set = map.get(key);
            if (set != null) {
                set.add(value);
            } else {
                Set<V> candidateSet = createConcurrentSet(value);
                set = map.putIfAbsent(key, candidateSet);
                if (set != null) {
                    // candidate set not accepted, use existing
                    set.add(value);
                }
            }
        }

        public void remove(K key, V value) {
            Set<V> set = map.get(key);
            if (set != null) {
                boolean removed = set.remove(value);
                if (removed && set.isEmpty()) {
                    // this is not thread-safe and causes the test to fail
                    map.remove(key, set);
                }
            }
        }

        public boolean contains(K key, V value) {
            Set<V> set = map.get(key);
            if (set == null) {
                return false;
            }
            return set.contains(value);
        }

        protected Set<V> createConcurrentSet(V element) {
            Set<V> set = Collections.newSetFromMap(new ConcurrentHashMap<V, Boolean>());
            set.add(element);
            return set;
        }
    }

    @Test
    public void testThreadSafe() throws InterruptedException, ExecutionException {
        ConcurrentSetMap<String, String> setMap = new ConcurrentSetMap<String, String>();
        ExecutorService executors = Executors.newFixedThreadPool(4);
        List<Future<?>> futures = new ArrayList<Future<?>>();

        futures.add(executors.submit(new TestWorker(setMap, "key1")));
        futures.add(executors.submit(new TestWorker(setMap, "key1")));
        futures.add(executors.submit(new TestWorker(setMap, "key2")));
        futures.add(executors.submit(new TestWorker(setMap, "key2")));

        for (Future<?> future : futures) {
            future.get();
        }
    }

    public static class TestWorker implements Runnable {
        ConcurrentSetMap<String, String> setMap;
        String key;

        public TestWorker(ConcurrentSetMap<String, String> setMap, String key) {
            super();
            this.setMap = setMap;
            this.key = key;
        }

        public void run() {
            int sampleSize = 100000;
            for (int i = 0; i < sampleSize; ++ i) {
                // avoid value clashes with other threads
                String value = Thread.currentThread().getName() + i;

                Assert.assertFalse("Should not exist before add", setMap.contains(key, value));
                setMap.add(key, value);
                Assert.assertTrue("Should exist after add", setMap.contains(key, value));
                setMap.remove(key, value);
                Assert.assertFalse("Should not exist after remove", setMap.contains(key, value));
            }
        }
    }
}
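A sketch of one interleaving that could explain the failure, replayed on a single thread so that it is deterministic. The class name `RemoveRaceReplay` and the key/value literals are hypothetical, and this is only one reading of the race: the remover sees an empty set, another thread adds to that same set, and the remover then drops the mapping together with the new value.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class: replays the steps of two threads in a fixed order.
class RemoveRaceReplay {

    // Returns whether "b" is still visible after the interleaving.
    static boolean replay() {
        ConcurrentMap<String, Set<String>> map = new ConcurrentHashMap<String, Set<String>>();
        Set<String> shared = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        shared.add("a");
        map.put("key", shared);

        // Thread A, remove("key", "a"): removes the value and sees an empty set.
        Set<String> seenByA = map.get("key");
        boolean removed = seenByA.remove("a");
        boolean emptyForA = removed && seenByA.isEmpty();

        // Thread B, add("key", "b"): runs completely before A continues.
        Set<String> seenByB = map.get("key");
        if (seenByB != null) {
            seenByB.add("b"); // same Set instance that A is about to discard
        }

        // Thread A resumes: the mapping still points at the same instance,
        // so the conditional remove succeeds even though the set is no longer empty.
        if (emptyForA) {
            map.remove("key", seenByA);
        }

        Set<String> current = map.get("key");
        return current != null && current.contains("b");
    }

    public static void main(String[] args) {
        System.out.println("b visible after add: " + replay()); // prints false
    }
}
```

In the real test this would surface as the "Should exist after add" assertion failing for the thread that played the role of B.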
lance-java
  • Several of your operations need to be atomic to prevent thread interleaving. You can't do what you want without some synchronization and locking in your code. – assylias Aug 30 '12 at 11:28
  • "I'd like to solve this without needing to synchronize the whole thing" You will need to lock the whole object for add, remove and contains to be thread safe. – Yoztastic Aug 30 '12 at 11:34

5 Answers

4

Don't write such a map, use somebody else's. I would use one of Guava's SetMultimap implementations such as HashMultimap and synchronize it using Multimaps.synchronizedSetMultimap.

nd.
  • I agree, it's quite optimistic to try to obtain the same performance in such a generic concurrent data structure as something developed by Google. – jolivier Aug 30 '12 at 11:48
  • I work on Guava -- doing this without synchronizing the whole thing is going to be an _extraordinarily_ difficult problem, though we've looked into it in the past. I would expect the research involved to take _months._ – Louis Wasserman Aug 30 '12 at 13:37
  • Whilst I'm a big fan of Guava, I'd prefer to keep my application as dependency free as possible. I'm writing an open-source library and I'd rather not force my users to have a Guava dependency. – lance-java Aug 30 '12 at 14:12
  • @uklance If Guava's Apache 2.0 license is compatible with yours, you could just simply take one of their SetMultimap implementations and integrate it into your software. SO has a chart of license compatibility if you need one http://stackoverflow.com/a/1978524/112964 – nd. Sep 03 '12 at 17:11
  • Given that there are now two solutions to the problem that don't require synchronized read access, I won't be considering Multimaps.synchronizedSetMultimap. Thanks for the suggestion though – lance-java Sep 04 '12 at 09:00
2

Here is a fully concurrent and thread-safe implementation:

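import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConcurrentSetMap<K,V> {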

  private final ConcurrentMap<K, Set<V>> _map = new ConcurrentHashMap<K, Set<V>>();

  public void add(K key, V value) {
    Set<V> curSet = _map.get(key);
    while(true) {

      if((curSet != null) && curSet.contains(value)) {
        break;
      }

      Set<V> newSet = new HashSet<V>();
      newSet.add(value);

      if(curSet == null) {

        curSet = _map.putIfAbsent(key, newSet);
        if(curSet != null) {
          continue;
        }

      } else {

        newSet.addAll(curSet);
        if(!_map.replace(key, curSet, newSet)) {
          curSet = _map.get(key);
          continue;
        }
      }

      break;
    }
  }

  public void remove(K key, V value) {
    Set<V> curSet = _map.get(key);

    while(true) {
      if((curSet == null) || !curSet.contains(value))  {
        break;
      }

      if(curSet.size() == 1) {

        if(!_map.remove(key, curSet)) {
          curSet = _map.get(key);
          continue;
        }

      } else {

        Set<V> newSet = new HashSet<V>();
        newSet.addAll(curSet);
        newSet.remove(value);
        if(!_map.replace(key, curSet, newSet)) {
          curSet = _map.get(key);
          continue;
        }
      }

      break;
    }
  }

  public boolean contains(K key, V value) {
    Set<V> set = _map.get(key);
    return set != null && set.contains(value);
  }
}

Comparing timing to @PeterLawrey's answer (on my box), his takes 2.9 seconds, this takes 1.4 seconds.

jtahlborn
  • I will need to study your solution in order to fully understand it and verify that it's thread-safe. Let the head-scratching begin ;) – lance-java Aug 31 '12 at 10:07
  • @uklance - verifying that it is thread-safe is fairly trivial. it uses ConcurrentHashMap and never modifies a Set which is available to other threads. verifying that it is correctly _atomic_ on the other hand is definitely a bit more challenging. :) – jtahlborn Aug 31 '12 at 11:58
  • @uklance - the basic pattern is this: copy the current value set into a new set and modify that, then use the atomic ops of CHM to replace the old set with the new set. if the map update fails, someone else replaced the value set before me, so start over again from the top. – jtahlborn Aug 31 '12 at 12:06
  • I suspected that's what you were doing. This code looks very similar to Doug Lea's code (ConcurrentHashMap) which is why I am quite interested in your solution. – lance-java Aug 31 '12 at 12:57
  • @jtahlborn I'm wondering if you could achieve the same using `AtomicReference<Set<V>>` instead? Seems like a simpler option. I *think* those existed in 2012 when you posted this answer :) – kaqqao Sep 25 '18 at 11:11
  • @kaqqao yes, AtomicRef is part of the same utils as ConcurrentMap. but, i have no idea what you would use the AtomicRef to do in this case? – jtahlborn Sep 25 '18 at 20:43
  • @jtahlborn I was thinking of using `AtomicReference<Set<V>>.getAndUpdate` when adding an element, instead of retrying in a loop, but I've now realized it could lead to lost updates if stuff happens between getting the current set and calling `getAndUpdate`. I _think_. – kaqqao Sep 26 '18 at 13:02
  • @kaqqao yes, and you would not really ever be able to remove a reference from the map because you wouldn't know if someone was in the midst of modifying that reference. it's very tricky logic. – jtahlborn Sep 26 '18 at 14:49
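The copy-then-swap pattern jtahlborn describes in the comments, reduced to a single shared set so that the retry loop is easier to see. This is only an illustration under stated assumptions: the class `CopyOnWriteCell` is hypothetical, it uses `AtomicReference.compareAndSet` in place of the map's `putIfAbsent`/`replace`/`remove`, and it does not attempt the harder part discussed above, which is removing an empty entry from a map.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class: one shared set that is never mutated after publication.
class CopyOnWriteCell<V> {
    private final AtomicReference<Set<V>> ref =
            new AtomicReference<Set<V>>(Collections.<V>emptySet());

    public void add(V value) {
        while (true) {
            Set<V> cur = ref.get();
            if (cur.contains(value)) {
                return; // nothing to do
            }
            // Copy the published set and modify only the private copy.
            Set<V> next = new HashSet<V>(cur);
            next.add(value);
            // Swap in the copy only if nobody replaced 'cur' in the meantime;
            // otherwise start over from the top.
            if (ref.compareAndSet(cur, Collections.unmodifiableSet(next))) {
                return;
            }
        }
    }

    public void remove(V value) {
        while (true) {
            Set<V> cur = ref.get();
            if (!cur.contains(value)) {
                return;
            }
            Set<V> next = new HashSet<V>(cur);
            next.remove(value);
            if (ref.compareAndSet(cur, Collections.unmodifiableSet(next))) {
                return;
            }
        }
    }

    public boolean contains(V value) {
        return ref.get().contains(value);
    }

    public int size() {
        return ref.get().size();
    }
}
```

Readers never block, but every write copies the whole set, so the trade-off favours workloads with small sets or few writes.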
2

I've managed to solve my problem :)

I failed to mention in my initial post that I require fast reads from the collection and I'm not too concerned about write speed. For this reason, I have come up with a solution that synchronizes write access but does not require synchronized read access. The code below now passes my test case.

Thanks to all for your suggestions.

public static class ConcurrentSetMap<K, V> {
    private final ConcurrentMap<K, Set<V>> map = new ConcurrentHashMap<K, Set<V>>();

    public synchronized void add(K key, V value) {
        Set<V> set = map.get(key);
        if (set != null) {
            set.add(value);
        } else {
            map.put(key, createConcurrentSet(value));
        }
    }

    public synchronized void remove(K key, V value) {
        Set<V> set = map.get(key);
        if (set != null) {
            set.remove(value);
            if (set.isEmpty()) {
                map.remove(key);
            }
        }
    }

    public boolean contains(K key, V value) {
        return get(key).contains(value);
    }

    public Set<V> get(K key) {
        Set<V> set = map.get(key);
        return set == null ? Collections.<V> emptySet() : set;
    }

    protected Set<V> createConcurrentSet(V value) {
        Set<V> set = Collections.newSetFromMap(new ConcurrentHashMap<V, Boolean>());
        set.add(value);
        return set;
    }
} 
lance-java
1

You need to use locking as you are performing multiple operations which need to be atomic collectively.

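import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import junit.framework.Assert;

import org.junit.Test;

public class SynchronousMultiMap<K, V> {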
    private final Map<K, Set<V>> map = new LinkedHashMap<K, Set<V>>();

    public synchronized void add(K key, V value) {
        Set<V> set = map.get(key);
        if (set == null)
            map.put(key, set = new LinkedHashSet<V>());
        set.add(value);
    }

    public synchronized void remove(K key, V value) {
        Set<V> set = map.get(key);
        if (set == null) return;
        set.remove(value);
        if (set.isEmpty()) map.remove(key);
    }

    public synchronized boolean contains(K key, V value) {
        Set<V> set = map.get(key);
        return set != null && set.contains(value);
    }

    @Test
    public void testThreadSafe() throws ExecutionException, InterruptedException {
        ExecutorService executors = Executors.newFixedThreadPool(3);
        List<Future<?>> futures = new ArrayList<Future<?>>();
        SynchronousMultiMap<String, Integer> setMap = new SynchronousMultiMap<String, Integer>();
        int sampleSize = 1000000;

        String[] keys = "key1,key2,key3,key4".split(",");
        for (int i = 0; i < 3; i++)
            futures.add(executors.submit(new TestWorker(setMap, keys, sampleSize, i)));

        executors.shutdown();
        for (Future<?> future : futures) {
            future.get();
        }
    }

    static class TestWorker implements Runnable {
        final SynchronousMultiMap<String, Integer> setMap;
        final String[] keys;
        final int sampleSize;
        final int value;

        public TestWorker(SynchronousMultiMap<String, Integer> setMap, String[] keys, int sampleSize, int value) {
            super();
            this.setMap = setMap;
            this.keys = keys;
            this.sampleSize = sampleSize;
            this.value = value;
        }

        public void run() {
            for (int i = 0; i < sampleSize; i += keys.length) {
                for (String key : keys) {
                    boolean contains = setMap.contains(key, value);
                    if (contains)
                        Assert.assertFalse("Should not exist before add", contains);
                    setMap.add(key, value);
                    boolean contains2 = setMap.contains(key, value);
                    if (!contains2)
                        Assert.assertTrue("Should exist after add", contains2);
                    setMap.remove(key, value);
                    boolean contains3 = setMap.contains(key, value);
                    if (contains3)
                        Assert.assertFalse("Should not exist after remove", contains3);
                }
            }
        }
    }
}

This takes 0.35 seconds to run. With sampleSize=1000000 it takes under 8 seconds.

Peter Lawrey
  • > To test it I would suggest generating a few long running tasks instead of thousands of small ones - I disagree, my test case was testing two threads in contention for the same key. Yours is not (3 workers, 3 threads) – lance-java Aug 30 '12 at 13:16
  • Good point. Have fixed that. Now all the threads use the same keys, but different values. – Peter Lawrey Aug 30 '12 at 13:23
  • I can't help but think that something could be done with java.util.concurrent.locks to avoid locking the whole collection at once, but this should work. Either way, I'd prefer to use something off the shelf if possible. – Alex Aug 30 '12 at 13:39
  • @Alex Using ReentrantReadWriteLock to allow concurrent reads increased the time from 7.3 seconds to 20.3 seconds. :) – Peter Lawrey Aug 30 '12 at 13:47
  • `synchronized` is usually best when you have code which should be single threaded. If I remove the locking and make it single threaded it takes 1.6 seconds. If I don't remove empty sets it drops to 0.94 seconds. ;) – Peter Lawrey Aug 30 '12 at 13:49
  • If I use `TIntArrayList` instead of `Set` it drops to 0.39 seconds. – Peter Lawrey Aug 30 '12 at 13:51
  • Yeah, I specifically had RWLock in mind, but that's better suited to a usage pattern where reads far outnumber writes. I wouldn't necessarily expect it to shine in a test where writes outnumber reads 2:1 :) – Alex Aug 30 '12 at 14:05
  • Ehh, they're about equal, but still not well suited for RWLock – Alex Aug 30 '12 at 14:07
  • `synchronized` is faster because it has biased locking which effectively turns multi-threaded code into single thread for short periods of time. As the single thread code is fastest, this really makes a difference. – Peter Lawrey Aug 30 '12 at 14:14
  • let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/16044/discussion-between-alex-and-peter-lawrey) – Alex Aug 30 '12 at 14:37
1

Use ConcurrentMap of ConcurrentSkipListSet.

Call ConcurrentMap.putIfAbsent() to add a set for a key, if it doesn't exist.

Call ConcurrentMap.remove( key, emptySet ), where emptySet is an empty ConcurrentSkipListSet.

If the current value, corresponding to key, is empty, it'll be removed.
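A minimal sketch of the calls described above, assuming `String` keys and values (`ConcurrentSkipListSet` needs comparable elements or a comparator). The class `SkipListSetMap` is hypothetical. It only shows how the pieces fit together on one thread; it does not make add atomic with respect to a concurrent remove, which is the concern raised in the comment on this answer.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical class wiring together the calls named in the answer.
class SkipListSetMap {
    private final ConcurrentMap<String, ConcurrentSkipListSet<String>> map =
            new ConcurrentHashMap<String, ConcurrentSkipListSet<String>>();

    public void add(String key, String value) {
        ConcurrentSkipListSet<String> fresh = new ConcurrentSkipListSet<String>();
        ConcurrentSkipListSet<String> existing = map.putIfAbsent(key, fresh);
        // NOT atomic: another thread may remove the mapping before this add.
        (existing != null ? existing : fresh).add(value);
    }

    public void remove(String key, String value) {
        Set<String> set = map.get(key);
        if (set != null) {
            set.remove(value);
            // remove(key, value) compares with equals(), and an empty set
            // equals any other empty set, so the entry goes only if it is empty.
            map.remove(key, new ConcurrentSkipListSet<String>());
        }
    }

    public boolean containsKey(String key) {
        return map.containsKey(key);
    }

    public static void main(String[] args) {
        SkipListSetMap m = new SkipListSetMap();
        m.add("k", "a");
        m.add("k", "b");
        m.remove("k", "a");
        System.out.println(m.containsKey("k")); // true: "b" is still there
        m.remove("k", "b");
        System.out.println(m.containsKey("k")); // false: empty set was dropped
    }
}
```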

user3666197
Erwin M
  • The problem with this approach is adding an item atomically. E.g., you call `putIfAbsent` and it adds an empty set, then a different thread removes it, then you try to add an item and boom. – kaqqao Sep 26 '18 at 19:40
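For what it's worth, on Java 8 or later the add/remove race described in this comment can be sidestepped with a different technique from the one in the answer: `ConcurrentHashMap.compute` and `computeIfPresent`, which the JDK documents as performed atomically per key. The sketch below is an assumption-laden illustration, not something proposed in this thread; the class `ComputeSetMap` and the helper `keyCount` are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class: per-key atomic updates, lock-free reads (Java 8+).
class ComputeSetMap<K, V> {
    private final ConcurrentHashMap<K, Set<V>> map = new ConcurrentHashMap<>();

    public void add(K key, V value) {
        // Creating the set and adding to it happen as one atomic step for this key.
        map.compute(key, (k, set) -> {
            if (set == null) {
                set = ConcurrentHashMap.newKeySet();
            }
            set.add(value);
            return set;
        });
    }

    public void remove(K key, V value) {
        // Returning null removes the mapping, so an empty set never stays behind.
        map.computeIfPresent(key, (k, set) -> {
            set.remove(value);
            return set.isEmpty() ? null : set;
        });
    }

    public boolean contains(K key, V value) {
        Set<V> set = map.get(key); // no locking on the read path
        return set != null && set.contains(value);
    }

    public int keyCount() {
        return map.size();
    }
}
```

Writers to the same key serialize on that key only, so this sits between fully synchronized writes and the copy-on-write approach; the remapping functions should stay short and must not touch other keys of the same map.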