I have multiple threads that continuously write to a shared collection, for example a `ConcurrentHashMap`. I want to periodically process (via a TimerJob) everything that is in that map up to that point. Other threads can keep writing to it in the meantime (this new data will be processed the next time the TimerJob kicks off).

I was wondering what would be the best way to accomplish this. From what I have read, this problem looks a lot like triple buffering, though I am not sure about that.

Any thoughts?

Edit: I want to remove the data from the map after processing it, so that I don't end up re-processing it.

Edit: I don't necessarily need to write the data to a HashMap/Set. I just need to put it in a collection that I can process periodically while other threads are still writing to it.

test123
  • Well, the normal way to do this is to get the `keySet` and iterate over that. Does that work for you? Why not? https://docs.oracle.com/javase/10/docs/api/java/util/concurrent/ConcurrentHashMap.html#keySet() – markspace Oct 22 '18 at 15:03
  • @test123: do you have any key reuse in your map (e.g. is it possible that a given key is re-written at some point), and what is the desired output in that case? – GPI Oct 22 '18 at 15:39
  • Isn't this usually a use case for a queue, which the timer job could process until it's empty? This would solve the ordering problem you get with a map. – daniu Oct 22 '18 at 16:01
  • @GPI, I don't necessarily need to put it in a HashMap. I can use any Collection. I just want to make sure that multiple threads could write to it at the same time and I could process it periodically. – test123 Oct 22 '18 at 16:31
  • Is there a need for hash-based storage (which means ignoring or replacing equal additions), as opposed to a queue? With hash-based storage, a question arises as to the intended behavior when a value is added while an equal value is being purged. (This question arises for *both* entire collection replacement and single-element-at-a-time purging.) If a queue can be used instead of a hash map, there are existing types which provide most of the necessary function (see https://stackoverflow.com/questions/1301691/java-queue-implementations-which-one, plus probably more in the last 9 years). – Thomas Bitonti Oct 22 '18 at 16:49
  • @ThomasBitonti that's a great point. I think in that case, I am perfectly fine with a queue. However, I still need to be able to say `get everything in the queue until this point` for processing, while different threads can still write to it. – test123 Oct 22 '18 at 16:57
  • If you know what data you are storing, then you can create a special marker object and add it to the queue at the time you start processing. You then process the queue until you get that special object back from it (compare for object reference equality with ==). – HPCS Oct 22 '18 at 17:22
  • @test123 With a queue, the purge function can always take the first element of storage. Writers would always add to storage. *LinkedBlockingQueue* provides *peek*, *poll*, and *offer* as the base API. For more of a batch operation, *drainTo* can be used. – Thomas Bitonti Oct 22 '18 at 18:24
  • @ThomasBitonti, not sure if you care, but if you were to post your suggestion, I will accept that as a solution. :) – test123 Oct 30 '18 at 17:05
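
The queue suggestion from the comments above could look roughly like the following. This is only a minimal sketch, assuming a `LinkedBlockingQueue` is acceptable as the collection; the class name `DrainingBuffer` and the methods `write` and `drainBatch` are hypothetical names chosen for illustration. Writers call `write` from any thread, and the timer job calls `drainBatch`, which uses `drainTo` to move the elements currently in the queue into a local list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: a thread-safe buffer that can be emptied in batches.
class DrainingBuffer<T> {
    // Unbounded queue, so offer() does not block or reject under normal conditions.
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    // Called by any number of writer threads.
    void write(T item) {
        queue.offer(item);
    }

    // Called by the timer job: removes the queued elements and returns them.
    // Elements added while this runs are either part of this batch or stay
    // in the queue for the next call; nothing is processed twice.
    List<T> drainBatch() {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }
}
```

Because `drainTo` removes what it returns, the processed data does not need to be deleted separately, which matches the "don't re-process" requirement from the question.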

2 Answers


I am not sure whether you need to keep all the data in the map, or whether data already processed by the timer job is no longer needed in the map.

If you only need something like a snapshot for the timer job, you can swap the map for a new one like this:

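// K and V stand for your key and value types.
private volatile ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();

public void processByTimerJob() {
    // Keep a reference to the map that has been filled so far.
    ConcurrentHashMap<K, V> oldMap = this.map;
    // Everything new will be stored in the new map.
    this.map = new ConcurrentHashMap<>();
    // Process the old map via iteration or whatever you want.
    oldMap.forEach((key, value) -> { /* ... */ });
}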
HPCS
  • This is an interesting idea and something I hadn't considered. Worth a think, at least. – markspace Oct 22 '18 at 15:17
  • It would be difficult to prove that no writing is occurring during the forEach. – OldCurmudgeon Oct 22 '18 at 15:18
  • What do you mean, difficult to prove that no writing is occurring during the forEach? If you access the map for put/remove via the instance variable **map**, then you can be sure that after changing the volatile reference nobody will access the old map via the **map** reference. – HPCS Oct 22 '18 at 15:21
  • Depending on how other methods access `map`, this may not be Thread-safe (well, Thread-safe, yes, because `ConcurrentHashMap` guarantees this, but still, writes could occur after swapping out the map object). This would require all accesses to the `map` reference to be synchronized and, as such, invalidate most of the advantages of the `ConcurrentHashMap` implementation. – Florian Albrecht Oct 22 '18 at 15:21
  • For explanation: Another method could do: `Map localMap = this.map; doSomeLengthyChecksWhileMapIsSwapped(); localMap.put(a, b);` Volatile wouldn't help against this. – Florian Albrecht Oct 22 '18 at 15:22
  • Or some thread could still be executing a `putAll` during your `forEach`. – OldCurmudgeon Oct 22 '18 at 15:24
  • @florianAlbrech this is something you have to take care of if you use this pattern. – HPCS Oct 22 '18 at 15:26
  • @OldCurmudgeon when iterating via an iterator, he can remove all processed entries from the map and, after the iteration stops, check whether the map is empty. Or he can keep the oldMap as a leftover when the job finishes and check in the next timer job whether it is empty. – HPCS Oct 22 '18 at 15:30
  • Agreed with the skeptics. You'd have to prove no write operation is occurring once the swap has occurred, which defeats the whole purpose. You could wait for the old map to show up in a ReferenceQueue and process it from there, but that is not really deterministic. I'd go the "simple" route: `keySet.iterator()` and a `removeOnTheFly` should be safe unless you have key reuse. – GPI Oct 22 '18 at 15:37
  • @GPI I don't necessarily need to store it in a ConcurrentHashMap, ConcurrentSet works just fine as well. Also, I think there will be only 2 methods in the code: `public write(Object somedata);` `private timerJobToProcess();` So ***write*** is called from different parts of the code from multiple threads. ***timerJobToProcess*** will be triggered by the scheduled timer job. – test123 Oct 22 '18 at 16:50
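
The concern raised in the comments above (a writer that read the map reference before the swap and calls `put` after it) can be reproduced deterministically on a single thread. The following is only an illustrative sketch; `StaleReferenceDemo`, `swap` and `lateWriteLandsInOldMap` are hypothetical names, and the interleaving is simulated by hand rather than produced by real threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo of the "write after swap" problem with a plain volatile swap.
class StaleReferenceDemo {
    private volatile Map<String, String> map = new ConcurrentHashMap<>();

    // Timer job side: install a fresh map and hand back the old one.
    Map<String, String> swap() {
        Map<String, String> old = map;
        map = new ConcurrentHashMap<>();
        return old;
    }

    // Simulates the interleaving: the writer reads the reference, the timer
    // job swaps, and only then does the writer perform its put.
    static boolean lateWriteLandsInOldMap() {
        StaleReferenceDemo demo = new StaleReferenceDemo();
        Map<String, String> writerView = demo.map; // writer reads the reference
        Map<String, String> old = demo.swap();     // timer job swaps the map
        writerView.put("late", "value");           // writer finishes its put
        // The entry is in the swapped-out map and not in the current one.
        return old.containsKey("late") && !demo.map.containsKey("late");
    }
}
```

If the timer job has already finished iterating the old map when the late `put` arrives, that entry is never processed, which is why the swap needs some additional coordination with writers.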

I would use double buffering and a Read/Write lock.

Double buffering reduces holdups by allowing processing of the swapped-out map.

Using a Read/Write lock allows me to be certain that no-one is still writing to the map after we swap.

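import java.util.AbstractMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
// Assumption: @Nullable/@NotNull come from the JetBrains annotations library.
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

class DoubleBufferedMap<K, V> extends AbstractMap<K, V> implements Map<K, V> {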
    // Used whenever I want to create a new map.
    private final Supplier<Map<K, V>> mapSupplier;
    // The underlying map.
    private volatile Map<K, V> map;
    // My lock - for ensuring no-one is writing.
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    final Lock readLock = readWriteLock.readLock();
    final Lock writeLock = readWriteLock.writeLock();

    public DoubleBufferedMap(Supplier<Map<K, V>> mapSupplier) {
        this.mapSupplier = mapSupplier;
        this.map = mapSupplier.get();
    }

    /**
     * Swaps out the current map with a new one.
     * 
     * @return the old map ready for processing, guaranteed to have no pending writes.
     */
    public Map<K,V> swap() {
        // Grab the existing map.
        Map<K,V> oldMap = map;
        // Replace it.
        map = mapSupplier.get();
        // Take (and immediately release) the write lock to wait for all in-flight `put`s to complete.
        writeLock.lock();
        writeLock.unlock();
        return oldMap;
    }

    // Put methods must take a read lock (yeah I know it's weird)

    @Nullable
    @Override
    public V put(K key, V value) {
        // Take a read-lock so they know when I'm finished.
        // Lock before the try so unlock only runs if the lock was acquired.
        readLock.lock();
        try {
            return map.put(key, value);
        } finally {
            readLock.unlock();
        }
    }

    @Override
    public void putAll(@NotNull Map<? extends K, ? extends V> m) {
        // Take a read-lock so they know when I'm finished.
        // Lock before the try so unlock only runs if the lock was acquired.
        readLock.lock();
        try {
            map.putAll(m);
        } finally {
            readLock.unlock();
        }
    }

    @Nullable
    @Override
    public V putIfAbsent(K key, V value) {
        // Take a read-lock so they know when I'm finished.
        // Lock before the try so unlock only runs if the lock was acquired.
        readLock.lock();
        try {
            return map.putIfAbsent(key, value);
        } finally {
            readLock.unlock();
        }
    }


    // All other methods are simply delegated - but you may wish to disallow some of them (like `entrySet` which would expose the map to modification without locks).

    @Override
    public Set<Entry<K, V>> entrySet() {
        return map.entrySet();
    }

    @Override
    public boolean equals(Object o) {
        return map.equals(o);
    }

    @Override
    public int hashCode() {
        return map.hashCode();
    }

    // ... The rest of the delegators (left to the student)
}
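
For the narrower case described in the question's comments (one `write` method and one timer job), the same double-buffering idea can be reduced to a few lines. The following is a minimal sketch and not the class above; `DoubleBufferedQueue`, `write` and `swapAndCollect` are hypothetical names, and a `ConcurrentLinkedQueue` is assumed as the underlying buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical reduced form of the double-buffer + read/write lock technique.
class DoubleBufferedQueue<T> {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile Queue<T> current = new ConcurrentLinkedQueue<>();

    // Writers share the read lock, so they never block each other.
    void write(T item) {
        lock.readLock().lock();
        try {
            // The reference is read while holding the read lock.
            current.add(item);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Timer job: swap buffers, wait for in-flight writers, return the old data.
    List<T> swapAndCollect() {
        Queue<T> old = current;
        current = new ConcurrentLinkedQueue<>();
        // Acquiring the write lock blocks until every writer that might
        // still be adding to the old buffer has released its read lock.
        lock.writeLock().lock();
        lock.writeLock().unlock();
        return new ArrayList<>(old);
    }
}
```

`swapAndCollect` could be called from a task scheduled with `ScheduledExecutorService.scheduleAtFixedRate`; each call returns only the items written since the previous call.
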
OldCurmudgeon