
There is a class Counter, which contains a set of keys and allows incrementing the value of each key and getting all values. So the task I'm trying to solve is the same as in Atomically incrementing counters stored in ConcurrentHashMap. The difference is that the set of keys is unbounded, so new keys are added frequently.

In order to reduce memory consumption, I clear values after they are read; this happens in Counter.getAndClear(). Keys are removed as well, and this seems to be what breaks things.

One thread increments random keys and another thread gets snapshots of all values and clears them.

The code is below:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.Map;
import java.util.HashMap;
import java.lang.Thread;

class HashMapTest {
    private final static int hashMapInitSize = 170;
    private final static int maxKeys = 100;
    private final static int nIterations = 10_000_000;
    private final static int sleepMs = 100;

    private static class Counter {
        private ConcurrentMap<String, Long> map;

        public Counter() {
            map = new ConcurrentHashMap<String, Long>(hashMapInitSize);
        }

        public void increment(String key) {
            Long value;
            do {
                value = map.computeIfAbsent(key, k -> 0L);
            } while (!map.replace(key, value, value + 1L));
        }

        public Map<String, Long> getAndClear() {
            Map<String, Long> mapCopy = new HashMap<String, Long>();
            for (String key : map.keySet()) {
                Long removedValue = map.remove(key);
                if (removedValue != null)
                    mapCopy.put(key, removedValue);
            }
            return mapCopy;
        }
    }

    // The code below is used for testing
    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();
        Thread thread = new Thread(new Runnable() {
            public void run() {
                for (int j = 0; j < nIterations; j++) {
                    int index = ThreadLocalRandom.current().nextInt(maxKeys);
                    counter.increment(Integer.toString(index));
                }
            }
        }, "incrementThread");
        Thread readerThread = new Thread(new Runnable() {
            public void run() {
                long sum = 0;
                boolean isDone = false;
                while (!isDone) {
                    try {
                        Thread.sleep(sleepMs);
                    }
                    catch (InterruptedException e) {
                        isDone = true;
                    }
                    Map<String, Long> map = counter.getAndClear();
                    for (Map.Entry<String, Long> entry : map.entrySet()) {
                        Long value = entry.getValue();
                        sum += value;
                    }
                    System.out.println("mapSize: " + map.size());
                }
                System.out.println("sum: " + sum);
                System.out.println("expected: " + nIterations);
            }
        }, "readerThread");
        thread.start();
        readerThread.start();
        thread.join();
        readerThread.interrupt();
        readerThread.join();
        // Ensure that counter is empty
        System.out.println("elements left in map: " + counter.getAndClear().size());
    }
}

While testing I have noticed that some increments are lost. I get the following results:

sum: 9993354
expected: 10000000
elements left in map: 0

If you can't reproduce this error (sum being less than expected), you can try increasing maxKeys by a few orders of magnitude, decreasing hashMapInitSize, or increasing nIterations (the latter also increases the run time). I have also included the testing code (the main method) in case it contains any errors.

I suspect that the error happens when the capacity of the ConcurrentHashMap is increased at runtime. On my computer the code appears to work correctly when hashMapInitSize is 170, but fails when hashMapInitSize is 171. I believe that a size of 171 triggers an increase of capacity (128 / 0.75 == 170.66, where 0.75 is the default load factor of a hash map).

So the question is: am I using the remove, replace and computeIfAbsent operations correctly? I assume that they are atomic operations on ConcurrentHashMap, based on the answers to Use of ConcurrentHashMap eliminates data-visibility troubles?. If so, why are some increments lost?

EDIT:

I think I missed an important detail here: increment() is supposed to be called much more frequently than getAndClear(), which is why I try to avoid any explicit locking in increment(). However, I'm going to test the performance of different versions later to see whether this is really an issue.

  • Is there a reason to not use [`AtomicLong`s](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicLong.html)? – Chai T. Rex Dec 19 '16 at 19:44
  • @ChaiT.Rex Actually I started by using AtomicLong, but it turned out to be much more complicated, because both ConcurrentHashMap and AtomicLong have atomic operations, but they are not atomic when combined (consider `map.computeIfAbsent(key, k -> new AtomicLong(0L)).incrementAndGet()`). This could have been solved with explicit synchronization, but that reduces throughput a lot. – fdermishin Dec 19 '16 at 19:59
  • Your `increment` method should really just be `map.merge(key, 1L, Long::sum)`. I'd be really dubious that you could make a thread-safe `getAndClear()` no matter what you did, though. – Louis Wasserman Dec 19 '16 at 20:47
  • @LouisWasserman Thanks for suggesting merge! I didn't know about this method. Regarding `getAndClear()`, why do you think it isn't thread-safe? It is just a sequence of `remove` operations. If each of them is atomic, then the sum of the values they return should be preserved, shouldn't it? The snapshot does not need to represent the state at a single moment in time; I only want the sum to be preserved. – fdermishin Dec 19 '16 at 21:20
  • Only single operations are thread safe, but generally the combination of multiple thread safe operations is not itself thread safe. What happens if some values are getting added while others are getting removed? – Louis Wasserman Dec 19 '16 at 21:22
  • Then we get the value of one key at one moment in time and the value of another key at another moment in time; that's OK for me. What else can go wrong? I just want to ensure that, for each key, the value in `map` plus the value in `mapCopy` always equals the number of times the key has been incremented. – fdermishin Dec 19 '16 at 21:27
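
For reference, here is a minimal sketch of the merge-based increment suggested in the comments above. merge is a single atomic operation on ConcurrentHashMap, so the computeIfAbsent/replace retry loop is not needed; whether this alone would resolve the lost updates is a separate question (see the answer below). The class name and the small main method are illustrative, not part of the original code.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MergeCounterSketch {
    private final ConcurrentMap<String, Long> map = new ConcurrentHashMap<>();

    // Inserts 1 if the key is absent, otherwise atomically adds 1 to the current value.
    public void increment(String key) {
        map.merge(key, 1L, Long::sum);
    }

    public Long get(String key) {
        return map.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        MergeCounterSketch counter = new MergeCounterSketch();
        counter.increment("a");
        counter.increment("a");
        System.out.println(counter.get("a")); // prints 2
    }
}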

1 Answer


I guess the problem is the use of remove while iterating over the keySet. This is what the JavaDoc says for Map#keySet() (my emphasis):

Returns a Set view of the keys contained in this map. The set is backed by the map, so changes to the map are reflected in the set, and vice-versa. If the map is modified while an iteration over the set is in progress (except through the iterator's own remove operation), the results of the iteration are undefined.

The JavaDoc for ConcurrentHashMap gives further clues:

Similarly, Iterators, Spliterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration.

The conclusion is that mutating the map while iterating over the keys is not predictable.
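
As an illustration of that "weakly consistent" wording, here is a small sketch (the class name and keys are made up for the example); it only shows that the iterator neither throws ConcurrentModificationException nor guarantees whether concurrent changes are visible:

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class WeaklyConsistentIterationSketch {
    public static void main(String[] args) {
        ConcurrentMap<String, Long> map = new ConcurrentHashMap<>();
        map.put("a", 1L);
        map.put("b", 2L);

        Iterator<String> it = map.keySet().iterator();
        map.put("c", 3L);   // added after the iterator was created
        map.remove("a");    // removed after the iterator was created

        while (it.hasNext()) {
            // No ConcurrentModificationException is thrown, but whether "c"
            // shows up and whether "a" is skipped is not guaranteed.
            System.out.println(it.next());
        }
    }
}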

One solution is to create a new map for the getAndClear() operation and just return the old map. The switch has to be protected, and in the example below I used a ReentrantReadWriteLock:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

class HashMapTest {
    private final static int hashMapInitSize = 170;
    private final static int maxKeys = 100;
    private final static int nIterations = 10_000_000;
    private final static int sleepMs = 100;

    private static class Counter {
        private ConcurrentMap<String, Long> map;
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        ReadLock readLock = lock.readLock();
        WriteLock writeLock = lock.writeLock();

        public Counter() {
            map = new ConcurrentHashMap<>(hashMapInitSize);
        }

        public void increment(String key) {
            // The read lock only excludes the map switch in getAndClear();
            // concurrent increments still run in parallel.
            readLock.lock();
            try {
                map.merge(key, 1L, Long::sum);
            } finally {
                readLock.unlock();
            }
        }

        public Map<String, Long> getAndClear() {
            ConcurrentMap<String, Long> oldMap;
            // The write lock makes the switch exclusive: no increment can run
            // while the old map is swapped out for a fresh one.
            writeLock.lock();
            try {
                oldMap = map;
                map = new ConcurrentHashMap<>(hashMapInitSize);
            } finally {
                writeLock.unlock();
            }
            return oldMap;
        }
    }

    // The code below is used for testing
    public static void main(String[] args) throws InterruptedException {
        final AtomicBoolean ready = new AtomicBoolean(false);

        Counter counter = new Counter();
        Thread thread = new Thread(new Runnable() {
            public void run() {
                for (int j = 0; j < nIterations; j++) {
                    int index = ThreadLocalRandom.current().nextInt(maxKeys);
                    counter.increment(Integer.toString(index));
                }
            }
        }, "incrementThread");

        Thread readerThread = new Thread(new Runnable() {
            public void run() {
                long sum = 0;
                while (!ready.get()) {
                    try {
                        Thread.sleep(sleepMs);
                    } catch (InterruptedException e) {
                        // ignore and re-check the ready flag
                    }
                    Map<String, Long> map = counter.getAndClear();
                    for (Map.Entry<String, Long> entry : map.entrySet()) {
                        Long value = entry.getValue();
                        sum += value;
                    }
                    System.out.println("mapSize: " + map.size());
                }
                System.out.println("sum: " + sum);
                System.out.println("expected: " + nIterations);
            }
        }, "readerThread");
        thread.start();
        readerThread.start();
        thread.join();
        ready.set(true);
        readerThread.join();
        // Ensure that counter is empty
        System.out.println("elements left in map: " + counter.getAndClear().size());
    }
}
– forty-two
  • Thanks for pointing out the issue! I read about removing elements from a ConcurrentHashMap while iterating over it, and two methods got mixed up in my head: `Iterator.remove` and `Map.remove`, leaving me with the wrong impression that it is safe to call `Map.remove` while iterating. However, it is only safe to call `Iterator.remove`. – fdermishin Dec 21 '16 at 14:19
  • I tried to copy the set of keys before removing elements, and brief testing shows that it could work. – fdermishin Dec 21 '16 at 14:21
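
For completeness, a minimal sketch of the approach described in the last comment: copy the current key set first, then remove each key with Map.remove and collect the removed values. The class name and the merge-based increment are illustrative assumptions; this is only meant to show the shape of that variant, not a verified fix.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class CopyKeysCounterSketch {
    private final ConcurrentMap<String, Long> map = new ConcurrentHashMap<>();

    public void increment(String key) {
        map.merge(key, 1L, Long::sum);
    }

    public Map<String, Long> getAndClear() {
        // Copy the keys before mutating the map, so the removal loop does not
        // iterate over a live view of the map it is modifying.
        List<String> keys = new ArrayList<>(map.keySet());
        Map<String, Long> snapshot = new HashMap<>();
        for (String key : keys) {
            Long removed = map.remove(key);
            if (removed != null) {
                snapshot.put(key, removed);
            }
        }
        return snapshot;
    }
}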