
I have code that implements a "lock handler" for arbitrary keys. Given a key, it ensures that only one thread at a time can process that key (or any key equal to it), which here means calling someWorkExecutor.process(key).

So far, I have code like this:

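import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyHandler {
    private final SomeWorkExecutor someWorkExecutor;
    private final ConcurrentHashMap<Key, Lock> lockMap = new ConcurrentHashMap<>();

    public MyHandler(SomeWorkExecutor someWorkExecutor) {
        this.someWorkExecutor = someWorkExecutor;
    }

    public void handle(Key key) {
        // Locks are created on demand but never removed, so the map
        // grows without bound and can eventually cause an OutOfMemoryError
        Lock keyLock = lockMap.computeIfAbsent(key, k -> new ReentrantLock());
        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }
    }
}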

I understand that this code can lead to an OutOfMemoryError because nothing ever removes entries from the map.

I am thinking about how to build a map that holds only a limited number of elements. When the limit is exceeded, the least recently accessed element should be replaced with the new one (this code should synchronize on the oldest element as a monitor). But I don't know how to get a callback that tells me the limit has been exceeded.
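One possible shape for such a callback, as a minimal sketch: the JDK's LinkedHashMap calls its protected removeEldestEntry method after every insertion, and in access-order mode the "eldest" entry is the least recently accessed one. The class name BoundedLruMap and the demo method are hypothetical, made up for illustration. LinkedHashMap is not thread-safe, so a real handler would have to guard every access, and evicting a lock that another thread still holds or waits on is unsafe without extra coordination.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical bounded map: drops the least recently accessed entry once a limit is exceeded. */
class BoundedLruMap<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    BoundedLruMap(int maxEntries) {
        // accessOrder = true: iteration order runs from least to most recently accessed
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    // Invoked by LinkedHashMap after each insertion; returning true evicts 'eldest'.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }

    /** Small usage example: returns the keys that survive after the limit is exceeded. */
    static String demo() {
        Map<String, Integer> map = new BoundedLruMap<>(2);
        map.put("a", 1);
        map.put("b", 2);
        map.get("a");     // touching "a" makes "b" the least recently accessed entry
        map.put("c", 3);  // exceeds the limit, so "b" is evicted
        return map.keySet().toString();
    }
}
```

Here BoundedLruMap.demo() returns "[a, c]": the callback fired on the third insertion and removed "b".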

Please share your thoughts.

P.S.

I reread the task and now I see a limitation: the handle method cannot be invoked by more than 8 threads at once. I don't know how this can help me, but I mention it just in case.

P.S.2

@Boris the Spider suggested a nice and simple solution:

} finally {
      lockMap.remove(key);
      keyLock.unlock();
}

But Boris later noticed that this code is not thread-safe, because it breaks the required behavior.
Consider three threads invoked with equal keys:

  1. Thread#1 acquires the lock and is about to execute lockMap.remove(key);
  2. Thread#2 is invoked with an equal key, so it waits for thread#1 to release the lock.
  3. Thread#1 then executes lockMap.remove(key);. After this, thread#3 invokes handle. It finds no lock for this key in the map, so it creates a new lock and acquires it.
  4. Thread#1 releases its lock, and thread#2 acquires it.
    Thus thread#2 and thread#3 can run in parallel for equal keys, which must not be allowed.

To avoid this situation, before removing the entry we would have to prevent any thread from acquiring the lock until every thread in the wait set has acquired and released it. That looks like fairly complicated synchronization, and it would slow the algorithm down. Maybe we should instead clear the map from time to time, when its size exceeds some limit.

I have spent a lot of time on this, but unfortunately I have no idea how to achieve it.

gstackoverflow
  • Boris the Spider, like this? **} finally { lockMap.remove(keyLock); keyLock.unlock(); }** – gstackoverflow Jan 27 '17 at 16:20
  • @Boris the Spider but it prevents the OOM error – gstackoverflow Jan 27 '17 at 16:24
  • Hmm, on second thoughts, that won't work. Scenario: `1` comes along, creates a `Lock` and locks it. `2` comes along and finds the lock, waits. `1` finishes, unlocks and removes. `3` comes along and finds no `Lock` hence `2` and `3` will have concurrent access. Apologies for that brain fart. – Boris the Spider Jan 27 '17 at 16:24
  • @Boris the Spider Hmm... you are right, but it looked nice – gstackoverflow Jan 27 '17 at 16:30
  • @Boris it is a really nice case. Now I really don't understand how I can delete a value from the map and be sure that no one wants to acquire it – gstackoverflow Jan 27 '17 at 16:37
  • @Boris what do you think about having a map that stores the acquired locks? We would need an atomic operation that checks whether the key is absent from acquiredMap, and only then removes it from lockMap – gstackoverflow Jan 27 '17 at 16:45
  • I think something like that might work. Maybe store a `Locks` object in the `Map` which contains the `Lock` and an `AtomicInteger` of acquired locks. So the operation would be 1) increment locks 2) acquire lock 3) do work 4) decrement locks 5) if `0`, delete lock 6) unlock. There might still be a race hazard here however, I think, as 1) happens outside of the `Lock` - by necessity. Needs some more thought. – Boris the Spider Jan 27 '17 at 17:08
  • Consider instead using lock striping, e.g. Guava's `Striped`. A fixed array of locks (e.g. 1024) would probably be good enough and avoid retaining keys. Worst case, a weak stripe is more flexible, but adds overhead with little practical benefit. – Ben Manes Jan 27 '17 at 18:40
  • google has some nice concurrent caching maps. Take a look at `com.google.common.cache.CacheBuilder` – Martin Serrano Jan 27 '17 at 19:38
  • Yes, unless this is homework, you should use a cache library. [Guava Cache](https://github.com/google/guava/wiki/CachesExplained) is a really lightweight one, but does the hard parts like eviction correctly. – Mick Mnemonic Jan 27 '17 at 19:56
  • @Ben Manes can you provide full answer? – gstackoverflow Jan 28 '17 at 08:42
  • @Mick Mnemonic I store locks in the cache. As Boris noticed, we can have a problem if we delete a value from the cache while some thread is still waiting for that lock to be released – gstackoverflow Jan 28 '17 at 09:23
  • Your requirements seem quite unclear at the moment and scattered around as comments. Could you revise the description to include everything relevant? If your locks never time out, you can't prevent running out of memory eventually, right? Or are you asking for a notification solution that would release the lock when the cache times out? – Mick Mnemonic Jan 28 '17 at 13:45
  • @Mick Mnemonic thanks for your comment. I didn't think about this, but I know that at most 8 threads can execute this code, thus I believe that we will not get an OOME. Now I am adding more information to the topic. – gstackoverflow Jan 28 '17 at 15:52
  • @Mick Mnemonic, please read my topic update. – gstackoverflow Jan 28 '17 at 16:10
  • @Martin Serrano the task is more complicated than you understood. Please read the topic update. – gstackoverflow Jan 28 '17 at 16:18
  • The key to using a CHM for this task is not to try to have a separate "insert" then "lock" operations, but to treat the insert _as the lock operation_. Same on the unlock/remove side. This gets rid of the race that is fundamental in solutions like @BoristheSpider. I have used this idiom successfully to implement lock managers and fleshed out an answer below. – BeeOnRope Jan 28 '17 at 17:31
  • @gstackoverflow - how long does a typical `process` call take, and is it CPU-bound or does it, for example, do IO or make network calls? – BeeOnRope Jan 29 '17 at 15:20
  • @BeeOnRope I really don't know – gstackoverflow Jan 30 '17 at 13:32
  • @gstackoverflow - it's pretty important to understand that to help drive the design. Do you at least know _what type_ of work is done by `process`? Does it make network calls, for example? – BeeOnRope Jan 30 '17 at 18:51
  • @BeeOnRope, for me actually it is black box – gstackoverflow Jan 30 '17 at 21:20
  • @BeeOnRope 1.Do you think this correct http://stackoverflow.com/a/41930368/2674303 ? 2.Do you agree with my comment about http://stackoverflow.com/a/41943234/2674303 ? – gstackoverflow Jan 30 '17 at 21:22
  • 1. Yes, looks fine. 2. Yes, it was racy in the same way as other earlier solutions. It looks like the code is modified now, however. – BeeOnRope Jan 30 '17 at 21:44

10 Answers


You don't need to try to limit the size to some arbitrary value - as it turns out, you can accomplish this kind of "lock handler" idiom while only storing exactly the number of keys currently locked in the map.

The idea is to use a simple convention: successfully adding the mapping to the map counts as the "lock" operation, and removing it counts as the "unlock" operation. This neatly avoids the issue of removing a mapping while some thread still has it locked and other race conditions.

At this point, the value in the mapping is only used to block other threads who arrive with the same key and need to wait until the mapping is removed.

Here's an example [1] with CountDownLatch rather than Lock as the map value:

public void handle(Key key) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    // try to acquire the lock by inserting our latch as a
    // mapping for key        
    while(true) {
        CountDownLatch existing = lockMap.putIfAbsent(key, latch);
        if (existing != null) {
            // there is an existing key, wait on it
            existing.await();
        } else {
            break;
        }
    }

    try {
        externalSystem.process(key);
    } finally {
        lockMap.remove(key);
        latch.countDown();
    }
}

Here, the lifetime of the mapping is only as long as the lock is held. The map will never have more entries than there are concurrent requests for different keys.
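To make those two claims easier to check, here is a self-contained sketch that wraps the same putIfAbsent/latch idiom in a small class and hammers a single key from several threads. The class name LatchKeyedLock, the String key type and the run helper are assumptions made up for this illustration; they are not part of the original handler.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchKeyedLock {
    private final ConcurrentHashMap<String, CountDownLatch> lockMap = new ConcurrentHashMap<>();

    void handle(String key, Runnable work) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CountDownLatch existing;
        // Inserting our latch is the "lock" operation; otherwise sleep on the holder's latch and retry
        while ((existing = lockMap.putIfAbsent(key, latch)) != null) {
            existing.await();
        }
        try {
            work.run();
        } finally {
            lockMap.remove(key);   // removing the mapping is the "unlock" operation
            latch.countDown();     // wake every thread that was waiting on us
        }
    }

    /** Returns {max threads seen inside the critical section, final map size, completed runs}. */
    static int[] run(int threads, int iterations) {
        LatchKeyedLock handler = new LatchKeyedLock();
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    for (int i = 0; i < iterations; i++) {
                        handler.handle("same-key", () -> {
                            maxInside.accumulateAndGet(inside.incrementAndGet(), Math::max);
                            inside.decrementAndGet();
                            completed.incrementAndGet();
                        });
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { maxInside.get(), handler.lockMap.size(), completed.get() };
    }
}
```

Calling LatchKeyedLock.run(4, 200) should report at most one thread inside the critical section at any time and an empty map once all 800 runs have finished.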

The difference from your approach is that the mappings are not "re-used": each handle call creates a new latch and mapping. Since you are already doing expensive atomic operations, this isn't likely to be much of a slowdown in practice. Another downside is that with many waiting threads, all of them are woken when the latch counts down, but only one will succeed in putting a new mapping in and hence acquiring the lock; the rest go back to sleep on the new latch.

You could build another version of this which re-uses the mappings when threads come along and wait on an existing mapping. Basically, the unlocking thread just does a "handoff" to one of the waiting threads. Only one mapping will be used for an entire set of threads that wait on the same key; it is handed off to each one in sequence. The size is still bounded because once no more threads are waiting on a given mapping, it is removed.

To implement that, you replace the CountDownLatch with a map value that can count the number of waiting threads. When a thread does the unlock, it first checks to see if any threads are waiting, and if so wakes one to do the handoff. If no threads are waiting, it "destroys" the object (i.e., sets a flag that the object is no longer in the mapping) and removes it from the map.

You need to do the above manipulations under a proper lock, and there are a few tricky details. In practice I find the short and sweet example above works great.
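For comparison, here is a rough sketch of a mapping-reusing variant. It swaps in a different and simpler technique than the flag-based handoff described above: each map value carries a user count that is only ever changed inside ConcurrentHashMap.compute, which runs atomically per key, and the entry is removed when the count drops to zero. The names RefCountedKeyLock, Entry, users, withLock and stress are all hypothetical. The work itself runs outside compute, so the remapping functions stay short.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

class RefCountedKeyLock<K> {
    private static final class Entry {
        final ReentrantLock lock = new ReentrantLock();
        int users; // holder plus waiters; only touched inside compute()
    }

    private final ConcurrentHashMap<K, Entry> entries = new ConcurrentHashMap<>();

    void withLock(K key, Runnable work) {
        // Atomically create or reuse the entry and register ourselves as a user
        Entry entry = entries.compute(key, (k, e) -> {
            if (e == null) {
                e = new Entry();
            }
            e.users++;
            return e;
        });
        entry.lock.lock();
        try {
            work.run();
        } finally {
            entry.lock.unlock();
            // Atomically deregister; returning null removes the mapping
            entries.compute(key, (k, e) -> --e.users == 0 ? null : e);
        }
    }

    int size() {
        return entries.size();
    }

    /** Returns {final counter value, final map size} after all threads increment under one key. */
    static int[] stress(int threads, int iterations) {
        RefCountedKeyLock<String> locks = new RefCountedKeyLock<>();
        int[] shared = new int[1]; // deliberately unsynchronized; guarded only by the key lock
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < iterations; i++) {
                    locks.withLock("k", () -> shared[0]++);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] { shared[0], locks.size() };
    }
}
```

With this shape, waiting threads queue on one ReentrantLock per key instead of all waking up at once, and the map still holds entries only for keys that are currently in use.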


[1] Written on the fly, not compiled and not tested, but the idea works.

BeeOnRope
  • It declares two latch references, `latch` and `existing`, and initializes the `latch` reference to be a new CDL with value 1. The `existing` reference is assigned later in the loop. Anyway, I simplified that line to make it clearer now. – BeeOnRope Jan 28 '17 at 17:44
  • existing is initially not initialized, and thus I believe the code will hang on the line **latch.await();** – gstackoverflow Jan 28 '17 at 17:46
  • I am not sure what happens if **latch.countDown();** is invoked before **latch.await();** – gstackoverflow Jan 28 '17 at 17:52
  • @gstackoverflow - if `countDown()` occurs before `await()` then `await()` just returns immediately. That's a major use-case of latches - they are often counted down once after something is initialized, and so then all later `await()` calls just return immediately. – BeeOnRope Jan 28 '17 at 17:57
  • @gstackoverflow - I'm not sure what you mean by "existing initially is not initialized" - it is initialized immediately on declaration. It is impossible in Java to use an uninitialized variable anyway: the compiler detects it and complains. – BeeOnRope Jan 28 '17 at 17:58
  • Sure, it was correct before too. In java it is fine to do: `Object o; do { o = foo(); ... } while (o != ...); ` The compiler is able to use control flow analysis to understand that `o` is always initialized before it is used. – BeeOnRope Jan 28 '17 at 18:02
  • what would happen if you have 2+ threads waiting in **latch.await()** at the moment when you do **latch.countDown();** – gstackoverflow Jan 28 '17 at 19:15
  • _All_ threads waiting on the latch are released (woken up) when you count down a latch and it hits zero. Only one thread will succeed in adding its latch to the map and hence acquire the lock, however. The other thread(s) will wait on the new latch. @gstack – BeeOnRope Jan 28 '17 at 19:19
  • let's imagine a situation where thread#1 has added a new latch and, before thread#1 calls **lockMap.remove(key);**, thread#2 and thread#3 get the existing latch from the map and thus execute **latch.await();** – gstackoverflow Jan 28 '17 at 19:43
  • @gstackoverflow right... then those threads (2 and 3) will wait on the latch (it has a count of `1`). Later, `thread-1` removes the mapping, and does `countDown` on the latch, releasing both `thread-2` and `thread-3`, at which point both will do the `computeIfAbsent` call. Only one will succeed - let's say `thread-2` - and then `thread-3` will call `await` on the latch newly inserted by `thread-2`. – BeeOnRope Jan 28 '17 at 19:49
  • computeIfAbsent occurs before **latch.await();** – gstackoverflow Jan 28 '17 at 20:04
  • The `await` and `compute` calls are in a loop. So the insert will happen again. – BeeOnRope Jan 28 '17 at 20:17
  • according to this code, I think it is possible to rewrite my initial code using tryLock in a loop – gstackoverflow Jan 28 '17 at 20:46
  • Probably, but you need _blocking_ not spinning or else you'll suffer terrible performance. The code above uses the latch to put the thread to sleep until it can get the lock. If you just repeatedly call `tryLock` you won't give up the CPU and other threads will make progress only slowly. – BeeOnRope Jan 28 '17 at 21:02
  • @NicolasFilotto - yes, `putIfAbsent` is definitely simpler and more appropriate here: changed. I fixed the missing brace as well. – BeeOnRope Feb 06 '17 at 19:57

You could rely on the method compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) to synchronize calls to your process method for a given key. You no longer need Lock as the value type of your map, since you don't rely on it anymore.

The idea is to rely on the internal locking mechanism of your ConcurrentHashMap to execute your method. This allows threads to execute the process method in parallel for keys whose hashes do not fall into the same bin. This is equivalent to the approach based on striped locks, except that you don't need an additional third-party library.

The striped-lock approach is interesting because it is very light in terms of memory footprint: you only need a limited number of locks, so the memory needed for your locks is known and never changes. That is not the case for approaches that use one lock per key (as in your question), so striped locks are generally the better, recommended choice for this kind of need.

So your code could be something like this:

// This creates a ConcurrentHashMap with a default initial table size of 16
// bins. If that is too many or too few, you may provide an initialCapacity
// and loadFactor to get the table size you expect, and thereby
// increase or reduce the concurrency level of your map.
// NB: The value type does not matter much, so I arbitrarily
// used Void, but it could be any type, such as plain Object.
private final ConcurrentMap<Key, Void> lockMap = new ConcurrentHashMap<>();

public void handle(Key lockKey) {
    // Execute the method process through the remapping Function
    lockMap.compute(
        lockKey,
        (key, value) -> {
            // Execute the process method under the protection of the
            // lock of the bin of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}

NB 1: As we always return null, the map will always be empty, so you will never run out of memory because of this map.

NB 2: As we never assign a value to a given key, please note that it could also be done using the method computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction):

public void handle(Key lockKey) {
    // Execute the method process through the remapping Function
    lockMap.computeIfAbsent(
        lockKey,
        key -> {
            // Execute the process method under the protection of the
            // lock of the segment of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}
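A quick way to convince yourself of NB 1 and NB 2 is the following sketch, which counts how often the functions run and checks the map size afterwards. The class name NullReturningComputeDemo and the String key are placeholders for illustration; the counter stands in for the real process call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class NullReturningComputeDemo {
    /** Returns {number of function invocations, final map size}. */
    static int[] run() {
        ConcurrentHashMap<String, Void> lockMap = new ConcurrentHashMap<>();
        AtomicInteger calls = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            // Returning null from compute leaves the key unmapped
            lockMap.compute("k", (key, value) -> {
                calls.incrementAndGet();
                return null;
            });
            // Returning null from computeIfAbsent records no mapping either,
            // so the function runs again on the next call for the same key
            lockMap.computeIfAbsent("k", key -> {
                calls.incrementAndGet();
                return null;
            });
        }
        return new int[] { calls.get(), lockMap.size() };
    }
}
```

NullReturningComputeDemo.run() returns {6, 0}: both functions ran on every one of the three iterations, and nothing was ever stored.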

NB 3: Make sure that your process method never calls handle for any key, as you would end up with infinite loops (same key) or deadlocks (other keys acquired in no consistent order). For example, if one thread calls handle(key1) and process then internally calls handle(key2), while another thread in parallel calls handle(key2) and process then internally calls handle(key1), you will get a deadlock whatever the approach used. This behavior is not specific to this approach; it will occur with any approach.

Nicolas Filotto
  • between the actions **if (!hasQueuedThreads()) {** and **lockMap.remove(key, this);** performed by thread_1, a new value can be added to the map and the lock acquired by thread_2. Then thread_1 deletes the lock from the map. After this, thread_3 adds a new value to the map, so a new lock is created. Thus thread_2 and thread_3 will work in parallel – gstackoverflow Jan 30 '17 at 21:17
  • That's an interesting approach - using the machinery of the CHM to do your locking without ever inserting anything into the map :). You are hostage to the internal implementation of the map, however, which wasn't really designed to call arbitrary functions like `process` from inside the compute function. The doc says: "Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this map." – BeeOnRope Jan 30 '17 at 21:50
  • For example, what happens if `process` is slow, or worse, if it ends up calling back into the lock manager and trying to get another lock? I guess it deadlocks in that case... – BeeOnRope Jan 30 '17 at 21:50
  • @BeeOnRope no, it is not a deadlock; you will get the same behavior with striped locks. In other words, the other threads will wait until the segment is released by the thread calling the remapping Function. You cannot have deadlocks unless your process method calls handle for another key, but you will get deadlocks with striped locks for the same reasons – Nicolas Filotto Jan 30 '17 at 21:55
  • Yes, I meant that it calls back in with another key. Yes, striped locks have a similar (?) problem (I commented also on that answer). Most of the other solutions here don't have that issue, however. Just something to note if you go this route. It isn't clear when/if it will deadlock, it just tells you _not_ to modify the map, so you'd have to ensure you don't do it, which restricts the `process` call somewhat. – BeeOnRope Jan 30 '17 at 22:06
  • @BeeOnRope if `process` calls `handle` with any keys and with **any approaches** (not only this one) you can get deadlocks or infinite loops cf NB #3 – Nicolas Filotto Jan 30 '17 at 22:10
  • False. "More locks" doesn't equal "more deadlocks" - that's no justification at all! It depends on the locking graph. You must have **no cycles** in your locking graph (i.e., the dependency graph of keys), and then most solutions won't deadlock. Your solution will probably trivially deadlock when even a single thread tries to call back into the lock manager which will try to modify the mapping _inside_ the compute call. Just be clear that in **this** solution you cannot call back into `handle` in any scenario from `process` as it's forbidden by the API contract of `compute`. – BeeOnRope Jan 30 '17 at 22:19
  • I didn't understand your last comment, but of course I'll add details to explain the pitfalls of various solutions, including my own. That's how StackOverflow works. A key behavior of a general lock manager is that it doesn't introduce deadlocks that aren't already there in the underlying code (i.e., as long as the locking graph is some kind of DAG). Solutions which share locks (striping) violate this, and solutions that do their callout inside `process` are even more at risk, since they may violate the contract of `CHM.compute()`. – BeeOnRope Jan 30 '17 at 22:38
  • Of course, _if_ the callout never calls back into the lock manager, this isn't an issue at all - but the OP doesn't know because it is a "black box", so that kind of implies you can make very few assumptions about what it does. – BeeOnRope Jan 30 '17 at 22:39
  • @NicolasFilotto A concern with this approach is that concurrency depends on the internal sizing of the hash table to determine the number of locks (table "bins"). Because the table starts at 0 and is never populated, this degrades to a single lock. You would need to provide an initial capacity to try to coerce the number of locks, which makes the code a bit obscure. – Ben Manes Jan 30 '17 at 23:08
  • @BeeOnRope what you quote and see as a contract violation is nothing more than a common recommendation about critical sections: they must be short and simple to prevent blocking too long the threads waiting to execute them and you need to avoid locking something else inside as it may cause deadlocks if the lock acquisition is not ordered. If one thread calls `handle(key1)` and then process internally calls `handle(key2)` and another thread calls in parallel `handle(key2)` and then process internally calls `handle(key1)`, you will get a deadlock whatever the approach used even with yours – Nicolas Filotto Jan 31 '17 at 08:06
  • @Nicolas Filotto, about **computeIfAbsent**: after the first insertion the map will contain the entry [key, null], thus **WorkExecutor.process(key);** won't be called if this key already exists in the map. – gstackoverflow Jan 31 '17 at 10:54
  • @gstackoverflow no we won't, `WorkExecutor.process(key)` will be called at each call, if the mapping function returns `null` nothing is added to the map check the javadoc I quote: "*If the specified key is not already associated with a value, attempts to compute its value using the given mapping function and enters it into this map* **unless** *`null`*". Feel free to check by yourself if you are not convinced. – Nicolas Filotto Jan 31 '17 at 13:30
  • @Ben Manes: the default capacity is specified to be `16`, so you only need to specify an explicit capacity, if that is not sufficient. Of course, even the biggest capacity doesn’t preclude hash collisions entirely. – Holger Feb 01 '17 at 12:27
  • @gstackoverflow: As Nicolas Filotto already stated, returning `null` implies no storage, which is crucial as `ConcurrentHashMap` doesn’t support storing `null` at all. But the general contract of `Map.computeIfAbsent` is to treat a stored mapping to `null` (if supported) as *absent* value anyway. – Holger Feb 01 '17 at 12:41
  • @Holger You're right. For some reason I thought the lazy initialization of the table resulted in the first insertion to result in a single lock before resizing. – Ben Manes Feb 01 '17 at 17:57
  • @NicolasFilotto - we will have to disagree then. The Javadoc is pretty explicit: don't modify the map from code called while _inside_ the `computeIfAbsent` function. It's not a gentle admonition to keep your critical section short, it's telling you the implementation may break, deadlock etc if you do it. In fact, that's exactly [what it does](http://stackoverflow.com/q/28840047/149138). So it is no idle warning. – BeeOnRope Feb 06 '17 at 20:02
  • @BeeOnRope No need to go into an infinite and useless debate, there is a clear Nb in my answer that indicates to avoid calling process recursively as you could end up with deadlocks whatever the approach used (even yours) – Nicolas Filotto Feb 06 '17 at 21:48

One approach is to dispense with the concurrent hash map entirely, and just use a regular HashMap with locking to perform the required manipulation of the map and lock state atomically.

At first glance, this seems to reduce the concurrency of the system, but if we assume that the process(key) call is lengthy relative to the very fast lock manipulations, it works well because the process() calls still run concurrently. Only a small and fixed amount of work occurs in the exclusive critical section.

Here's a sketch:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyHandler {

    private static class LockHolder {
        final ReentrantLock lock = new ReentrantLock();
        // only read or written while mapLock is held
        int refcount = 0;
    }

    private final SomeWorkExecutor someWorkExecutor;
    private final Lock mapLock = new ReentrantLock();
    private final Map<Key, LockHolder> lockMap = new HashMap<>();

    public MyHandler(SomeWorkExecutor someWorkExecutor) {
        this.someWorkExecutor = someWorkExecutor;
    }

    public void handle(Key key) {
        LockHolder holder;

        // lock the map
        mapLock.lock();
        try {
            holder = lockMap.computeIfAbsent(key, k -> new LockHolder());
            // the lock in holder is either unlocked (newly created by us) or an
            // existing lock; either way, register ourselves as a user
            holder.refcount++;
        } finally {
            mapLock.unlock();
        }

        holder.lock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            mapLock.lock();
            try {
                holder.lock.unlock();
                if (--holder.refcount == 0) {
                    // no more users, remove lock holder
                    lockMap.remove(key);
                }
            } finally {
                mapLock.unlock();
            }
        }
    }
}

We use refcount, which is only manipulated under the shared mapLock to keep track of how many users of the lock there are. Whenever the refcount is zero, we can get rid of the entry as we exit the handler. This approach is nice in that it is fairly easy to reason about and will perform well if the process() call is relatively expensive compared to the locking overhead. Since the map manipulation occurs under a shared lock, it is also straightforward to add additional logic, e.g., keeping some Holder objects in the map, keeping track of statistics, etc.

gstackoverflow
BeeOnRope
  • No, it cannot, because all `refcount` manipulation happens under `mapLock` (response to deleted comment...). – BeeOnRope Jan 28 '17 at 18:21
  • ok, it looks nice. but lock release should be in finally section – gstackoverflow Jan 28 '17 at 18:22
  • Perhaps, if you believe exceptions can be thrown by any code after the `lock` (doesn't seem like it to me). In any case, it's up to you to make the code production ready. This is a _sketch_, offered to you free of charge :) – BeeOnRope Jan 28 '17 at 18:23
  • I need time to check your ideas, but I really want to thank you – gstackoverflow Jan 28 '17 at 18:25
  • This solution works correctly but has terrible performance. Each lock operation needs two compare and swap operation, one of it is contended. With the refcount and the map lock, you are doing actually the same as the lock does by itself. BTW: With a caching solution you also prevent new lock objects being created on busy keys. – cruftex Jan 29 '17 at 08:44
  • Of course in many important scenarios it will have totally fine performance (indistinguishable from the optimal solution where you somehow roll-your-own lock handler from the locking and synchronization primitives): when the locked region (i.e., the `process` call) is long. Often such a lock handler is used to ensure, for e.g., that you don't make more than one callout to the same third party remote host at the same time. Then the lock is going to be many orders of magnitude faster than the work done in the lock, so you just want the simplest solution. @cruftex – BeeOnRope Jan 30 '17 at 17:20
  • I think I was clear about it too: "if we assume that the process(key) call is lengthy relative the very fast lock manipulations". Personally I see a lot of value in at least starting from a _simple, correct, and easy to reason about_ (this may not be it!) solution, and going from there if some part is inadequate. One of the flaws of the proliferation of newer, more concurrent abstractions is that somehow "plain old locking" is assumed to be universally terrible, while in practice it is totally fine outside of hotspots. – BeeOnRope Jan 30 '17 at 17:24

Thanks to Ben Manes, I have found this variant:

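import com.google.common.util.concurrent.Striped;
import java.util.concurrent.locks.Lock;

public class MyHandler {
    private static final int THREAD_COUNT = 8;
    // stripes per thread; a larger value makes collisions between different keys less likely
    private static final int K = 100;
    private final Striped<Lock> striped = Striped.lazyWeakLock(THREAD_COUNT * K);
    private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor();

    public void handle(Key key) throws InterruptedException {
        // equal keys always map to the same stripe; different keys may share one
        Lock keyLock = striped.get(key);

        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }
    }
}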
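To show what striping does without pulling in Guava, here is a hand-rolled sketch with a fixed array of locks. It is not how Guava implements Striped; the class name SimpleStripedLocks and its hash spreading are assumptions for illustration only. It makes the trade-off visible: equal keys always get the same lock, but different keys can land on the same stripe.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SimpleStripedLocks {
    private final Lock[] stripes;

    SimpleStripedLocks(int stripeCount) {
        stripes = new Lock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock(); // all locks are allocated up front
        }
    }

    /** Maps a key to one of the fixed stripes based on its hash code. */
    Lock get(Object key) {
        int h = key.hashCode();
        h ^= (h >>> 16); // fold the high bits into the low bits
        return stripes[Math.floorMod(h, stripes.length)];
    }
}
```

With four stripes, for example, the Integer keys 1 and 5 share a lock while 1 and 2 do not, so a thread processing key 5 would block a thread processing key 1 even though the keys differ.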
gstackoverflow
  • Keep in mind this doesn't offer the same guarantees of concurrency as your original code. In particular, any two _different_ keys can collide on the same lock, and one will block the other, even though they are for different keys. If 8 threads try to process concurrently, one or more collisions is virtually guaranteed (to have 8 different keys hash perfectly to the 8 stripes is highly unlikely). I added an answer below which preserves the full concurrency of your original solution, without the infinitely growing map. – BeeOnRope Jan 28 '17 at 17:28
  • @BeeOnRope thanks for your comment. a bit corrected the code. – gstackoverflow Jan 28 '17 at 17:41
  • I don't see `K` used anywhere? – BeeOnRope Jan 28 '17 at 17:46
  • The use of `Striped` is relatively better when the size of the elements is small relative to the lock, and there are many of them and/or the operation performed _in_ the lock (in your case, the `process()` call) is relatively small. For example, to implement the internals of a concurrent hash map. The upside of `Striped` is that you have a _fixed number_ of locks, created at construction time and living forever. So for something like a CHM with 1 million elements, you would still need only 16 locks or whatever. – BeeOnRope Jan 28 '17 at 18:29
  • ... so I don't think it's a great fit for this problem - there the number of elements is _proportional_ to the number of threads: so you would naturally want to scale the number of locks in the same way. You don't want 800 locks if only a few threads are calling `process()` at once, and conversely, if you do have many threads coming in you want more locks. That's why I prefer here the solution of having exactly one live lock per thread in `process` (and shared among all waiting threads on the same `key`), which scales nicely and never results in a collision. – BeeOnRope Jan 28 '17 at 18:31
  • That mostly works _if_ the call to `process` is doing a reasonable amount of work (say one millisecond or more). If it's less than that, the `Striped` approach may win since it avoids the overhead of map manipulation, lock creation, etc. – BeeOnRope Jan 28 '17 at 18:32
  • @BeeOnRope A fixed `Striped` has the benefit of eager allocation to avoid GC overhead. In practice, at a decent size collisions tend to not be a significant bottleneck as there are fewer cores than threads (but its true, they aren't perfect). A dynamic `Striped` uses weak references to clean-up, which allows per-key locks. My recommendation was based on being pragmatic and keeping the code simple. – Ben Manes Jan 28 '17 at 22:14
  • Indeed but eager allocations are both an upside and a downside - in the code above 800 locks are allocated up front regardless of the contention level. – BeeOnRope Jan 28 '17 at 22:36
  • @BeeOnRope Since the code is using `lazyWeakLock` the allocation is on demand. If he used `lock(n)` then it would have been allocated up front. – Ben Manes Jan 28 '17 at 23:27
  • @ben - interesting - do they also go away when unused? Otherwise the lazy allocation doesn't help much (perhaps for short running applications) because eventually the keys will spread across all the stripes. If they go away, it's nicer. – BeeOnRope Jan 29 '17 at 01:18
  • @BeeOnRope yes. A weak reference is eligible for garbage collection if there are no strong references remaining. This allows cleanup, but also reused if the same key is frequently locked on. – Ben Manes Jan 29 '17 at 01:21
  • Cool (I mean I know how weak references work, but not how `Striped` is implemented). I think the choice then comes down mostly to the type of work performed inside the locked region (IOW the `process` method in this example). If that work is long, and especially if it is not CPU bound, I still prefer a method that assigns each task a unique lock, since even a few collisions may be unacceptable (e.g., you really don't want one task to block an unrelated one). @BenManes – BeeOnRope Jan 29 '17 at 04:20
  • OTOH, if the work is short, or if it is purely CPU bound, `Striped` may be a great choice. CPU bound work is especially appealing because here the "extra" concurrency offered by exactly one lock per `key` isn't necessarily useful and in fact may be harmful, since you'd often rather just have ~NCPU jobs running away while blocking other jobs from starting. `Striped` doesn't exactly do this, however, but it at least lends credence to the idea that you don't need max parallelism. – BeeOnRope Jan 29 '17 at 04:23
  • One other note: in some cases using a `Striped` lock may not even be _correct_: if the `process` call ends up calling back, on another thread, with a different key, a deadlock may result if `key` collides with the lock already held on the original thread. This isn't theoretical - I've run into exactly this problem. For example, any time you have DAGs and are processing the nodes in topological order (imagine Java classloading, e.g.) across threads, you rely on the "A" property of DAGs to avoid deadlocks. Collisions effectively break the acyclic nature. This is fairly obscure though :) – BeeOnRope Jan 29 '17 at 04:26
  • @BeeOnRope By each task having its own lock, you mean by key? A large Striped will do that by using a weak-valued map, or you can ensure that with similar code. While I had taught its author striping and gave him the code that he rewrote for Guava, I don't typically use striping anymore. Usually I find cleaner approaches or need custom code to dynamically resize based on contention. But in cases where it's too vague to answer deeper, it's a good pragmatic choice to start from. There might be better alternatives, but I'd also be afraid of overcomplicating it. – Ben Manes Jan 29 '17 at 05:18
  • For DAGs jumping across threads, you might consider replacing a lock with a semaphore. You're right that recursive computations cause dead/live-locks. I prefer disallowing if possible, e.g. I did that in Guava's cache and helped fix it in ConcurrentHashMap. A workaround that is often acceptable is to build the DAG out of futures to avoid recursion. – Ben Manes Jan 29 '17 at 05:25
  • @Ben Manes what do you think about a solution using `Collections.synchronizedMap` over a `WeakHashMap`? – gstackoverflow Jan 29 '17 at 09:35
  • @BeeOnRope I want to know your opinion about previous comment too. – gstackoverflow Jan 29 '17 at 09:36
  • @BenManes - yes, I mean each task has its own lock "by key". – BeeOnRope Jan 29 '17 at 15:13
  • @gstackoverflow - sure you can build a solution with `WeakHashMap` - basically you use your original "never remove keys" code and then rely on the weak cleanup to remove the mappings if the key has disappeared. It's quite similar to the `Interner` based solution below, although `Interner` saves you some boilerplate _if_ you are able to synchronize directly on the object, and `Interner` is likely to be somewhat better under concurrency (since it is based on a concurrent map). You could also consider `CacheBuilder` with `weakKeys()`. – BeeOnRope Jan 29 '17 at 15:17
  • @gstackoverflow It's a good approach if, for some reason, you can't depend on `Guava`. We're all arriving at the same set of solutions and it's coming down to your API preferences. – Ben Manes Jan 29 '17 at 16:48
0

Here's a short and sweet version that leverages the weak version of Guava's Interner class to do the heavy lifting of coming up with a "canonical" object for each key to use as the lock, and of implementing weak reference semantics so that unused entries are cleaned up.

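import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

public class InternerHandler {
    // Weak interner: canonical keys are dropped once nothing else references them.
    private final Interner<Key> interner = Interners.newWeakInterner();
    private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor();

    public void handle(Key key) throws InterruptedException {
        // All callers passing equal keys receive the same canonical instance.
        Key canonKey = interner.intern(key);
        synchronized (canonKey) {
            someWorkExecutor.process(key);
        }
    }
}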

Basically we ask for a canonical canonKey which equals() key, and then lock on this canonKey. Everyone agrees on the canonical key, and hence all callers that pass equal keys agree on the object on which to lock.

The weak nature of the Interner means that any time the canonical key isn't being used, the entry can be removed, so you avoid accumulation of entries in the interner. Later, if an equal key again comes in, a new canonical entry is chosen.

The simple code above relies on the built-in monitor to synchronize - but if this doesn't work for you (e.g., it's already used for another purpose) you can include a lock object in the Key class or create a holder object.
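If Guava is not available, the canonicalization idea can be sketched with the JDK alone. The following is a minimal illustration and not Guava's implementation: `SimpleWeakInterner` is a hypothetical name, and it assumes a single coarse `synchronized` method is acceptable, which is likely slower under contention than a concurrent interner.

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.WeakHashMap;

// Hypothetical helper: returns one canonical instance per group of equal objects.
final class SimpleWeakInterner<T> {
    // Key and value both refer to the canonical instance only weakly,
    // so an entry can be collected once no caller holds that instance.
    private final Map<T, WeakReference<T>> pool = new WeakHashMap<>();

    synchronized T intern(T sample) {
        WeakReference<T> ref = pool.get(sample);
        T canonical = (ref == null) ? null : ref.get();
        if (canonical == null) {
            // First sighting (or the old canonical instance was collected):
            // the sample itself becomes the canonical instance.
            pool.put(sample, new WeakReference<>(sample));
            canonical = sample;
        }
        return canonical;
    }
}
```

A caller would then write `synchronized (interner.intern(key)) { ... }`, exactly as in the Guava version. While any thread holds the canonical instance in a local variable, it stays strongly reachable, so a later equal key resolves to the same object.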

gstackoverflow
BeeOnRope
  • Note that some keys you shouldn't synchronize on, e.g. Strings. Using a surrogate lock object leads us back to Guava's weak Striped lock that does this almost identically. – Ben Manes Jan 29 '17 at 05:35
  • @BenManes - indeed, I mentioned above that you may or may not be able to use the built-in lock. In the limit of an infinitely large `Striped` lock, I think they look similar, yes. – BeeOnRope Jan 29 '17 at 15:10
  • @Ben Manes, please explain why **some keys you shouldn't synchronize on, e.g. Strings** – gstackoverflow Jan 30 '17 at 09:44
  • @gstackoverflow Some types are interned and instances shared globally, e.g. Integers from -128 to 127. Different `synchronized` blocks might then clash, causing coarser locking. Ideally you'd use weak values, not keys, to avoid a race caused by collection of an equal key but different instances resulting in a new lock being used concurrently. – Ben Manes Jan 30 '17 at 09:57
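The point about globally shared instances can be seen in a small sketch. `SharedMonitorDemo` and its method names are made up for illustration; the only facts relied on are that `Integer.valueOf` caches values from -128 to 127 and that equal string literals refer to one interned `String`.

```java
// Hypothetical demo: two "private" locks that are really the same object.
final class SharedMonitorDemo {

    // Both calls return the same cached Integer instance.
    static boolean smallIntegersShareInstance() {
        Integer lockA = Integer.valueOf(42);
        Integer lockB = Integer.valueOf(42);
        return lockA == lockB;
    }

    // Two unrelated components using the literal "orders" as a lock
    // end up contending on a single monitor.
    static boolean stringLiteralsShareMonitor() {
        String lockA = "orders";
        String lockB = "orders";
        synchronized (lockA) {
            // True only if lockB is the very object whose monitor is held.
            return Thread.holdsLock(lockB);
        }
    }
}
```

Code elsewhere in the JVM that happens to synchronize on the same literal or small boxed number would block this handler too, which is why a dedicated lock object per key is usually the safer choice.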
0
class MyHandler {
    private final Map<Key, Lock> lockMap = Collections.synchronizedMap(new WeakHashMap<>());
    private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor();

    public void handle(Key key) throws InterruptedException {
        Lock keyLock = lockMap.computeIfAbsent(key, (k) -> new ReentrantLock()); 
        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }
    }
}
gstackoverflow
0

Creating and removing the lock object for a key each time is a costly operation in terms of performance. When you add or remove a lock from a concurrent map (say, a cache), you have to ensure that putting the object into the cache and removing it are themselves thread-safe. So this does not seem like a good idea, although it can be implemented via ConcurrentHashMap.

The lock striping approach (also used internally by concurrent hash maps) is a better one. The Google Guava docs explain it as follows:

When you want to associate a lock with an object, the key guarantee you need is that if key1.equals(key2), then the lock associated with key1 is the same as the lock associated with key2.

The crudest way to do this is to associate every key with the same lock, which results in the coarsest synchronization possible. On the other hand, you can associate every distinct key with a different lock, but this requires linear memory consumption and concurrency management for the system of locks itself, as new keys are discovered.

Striped allows the programmer to select a number of locks, which are distributed between keys based on their hash code. This allows the programmer to dynamically select a tradeoff between concurrency and memory consumption, while retaining the key invariant that if key1.equals(key2), then striped.get(key1) == striped.get(key2)

code:

// declare globally, e.g. at class field level
Striped<Lock> lockStripes = Striped.lock(16);

Lock lock = lockStripes.get("key");
lock.lock();
try {
    // do your work here
} finally {
    lock.unlock();
}
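To make the "distributed between keys based on their hash code" part concrete, here is a hand-rolled sketch of the same idea using only the JDK. It is not how Guava implements `Striped`; `SimpleStripedLocks` and `spread` are hypothetical names, and the hash-mixing step is just one plausible choice.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical fixed-size lock table: memory use is bounded by stripeCount.
final class SimpleStripedLocks {
    private final Lock[] stripes;

    SimpleStripedLocks(int stripeCount) {
        if (stripeCount <= 0) {
            throw new IllegalArgumentException("stripeCount must be positive");
        }
        stripes = new Lock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock(); // all locks created eagerly
        }
    }

    // Equal keys have equal hash codes, so they always map to the same lock.
    // Unequal keys may also map to the same lock (a collision).
    Lock get(Object key) {
        int index = Math.floorMod(spread(key.hashCode()), stripes.length);
        return stripes[index];
    }

    // Mix the high bits into the low bits to soften poor hashCode() spreads.
    private static int spread(int h) {
        return h ^ (h >>> 16);
    }
}
```

With 16 stripes, the Integer keys 0 and 16 land on the same stripe, so processing one blocks the other even though the keys differ. That is the trade-off: no cleanup is ever needed, at the cost of occasional false contention.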

The following snippet of code can help in implementing the putting and removal of locks.

private final ConcurrentHashMap<String, ReentrantLock> caches = new ConcurrentHashMap<>();

public void processWithLock(String key) {
    ReentrantLock lock = findAndGetLock(key);
    lock.lock();
    try {
        // do your work here
    } finally {
        unlockAndClear(key, lock);
    }
}

private void unlockAndClear(String key, ReentrantLock lock) {
    // Step 1: release the lock.
    lock.unlock();
    // Step 2: attempt to remove the lock from the cache.
    // computeIfPresent removes the mapping only if the cached lock is the same
    // instance as 'lock'. Otherwise another thread has already installed a new
    // lock object, and its removal is left to that thread.
    caches.computeIfPresent(key, (k, current) -> lock == current ? null : current);
}

private ReentrantLock findAndGetLock(String key) {
    // merge gives access to both the existing lock (if any) and the new candidate.
    // The remapping function runs only when a lock is already mapped, in which
    // case the existing lock is kept.
    return caches.merge(key, new ReentrantLock(), (older, newer) -> older);
}
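One caveat with unlocking first and removing afterwards: a thread already waiting on the lock can acquire it just before the mapping is removed, and a later caller will then create a fresh lock for the same key, which is the three-thread scenario described in the question. A common way around this is to count users per entry and remove the mapping only when the count drops to zero. The sketch below is one possible shape of that idea, not code from this answer; `RefCountedKeyLocks`, `Entry` and `runLocked` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical per-key lock table that holds entries only while they are in use.
final class RefCountedKeyLocks<K> {

    private static final class Entry {
        final ReentrantLock lock = new ReentrantLock();
        int users; // touched only inside compute(), which is atomic per key
    }

    private final ConcurrentHashMap<K, Entry> entries = new ConcurrentHashMap<>();

    void runLocked(K key, Runnable work) {
        // Register as a user BEFORE blocking on the lock, so the entry
        // cannot be removed while this thread is waiting for it.
        Entry entry = entries.compute(key, (k, e) -> {
            if (e == null) {
                e = new Entry();
            }
            e.users++;
            return e;
        });
        entry.lock.lock();
        try {
            work.run();
        } finally {
            entry.lock.unlock();
            // Returning null removes the mapping; this happens only when
            // no other thread holds or waits for this entry.
            entries.compute(key, (k, e) -> --e.users == 0 ? null : e);
        }
    }

    // Number of keys currently held or waited on.
    int size() {
        return entries.size();
    }
}
```

The map then never holds more entries than there are threads inside `runLocked`, so with at most 8 calling threads it holds at most 8 entries.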
skadya
  • I didn't follow your answer because you changed the names from the original question. Why do you use 10 as the Striped argument? Why do you use ReadWriteLock? – gstackoverflow Feb 05 '17 at 09:25
  • I put the example there for illustration only, and I have just edited the code example to make it clearer. When creating a Striped instance, we need to specify the number of eagerly initialized Lock instances (in other words, this number limits the maximum number of keys which can be locked at the same time). Ideally this number should be a power of 2 (for fast mode), e.g. 2, 4, 8, 16. If we give 10, then 16 (the next power of 2) is used internally. Hope this helps. – skadya Feb 05 '17 at 17:00
0

Instead of writing your own, you might try something like JKeyLockManager. From the project's description:

JKeyLockManager provides fine-grained locking with application specific keys.

Example code given on site:

public class WeatherServiceProxy {
  private final KeyLockManager lockManager = KeyLockManagers.newManager();

  public void updateWeatherData(String cityName, float temperature) {
    lockManager.executeLocked(cityName, () -> delegate.updateWeatherData(cityName, temperature));
  }
}
beat
-2

New values will be added when you call

lockMap.computeIfAbsent()

So you can just check lockMap.size() for item count.

But how are you going to find the first added item? It would be better to just remove items after you have used them.

IL Mare
  • As [noted above](https://stackoverflow.com/questions/41898355/concurrenthasmap-cache-which-cannot-throw-outofmemoryerror?noredirect=1#comment70979375_41898355), removing won't work and would lead to concurrent access. – Boris the Spider Jan 27 '17 at 16:27
  • Then I would suggest to add key and lock into new object and store it in ordered collection. – IL Mare Jan 27 '17 at 16:34
  • Removing an old `Lock` won't work unless you can guarantee that the `Lock` is not in use. – Boris the Spider Jan 27 '17 at 16:36
  • Then delete like this )) `if(keyLock.tryLock()) { lockMap.remove(key); keyLock.unlock(); }` – IL Mare Jan 27 '17 at 16:44
  • The question was "But I don't know how to have callback which will say me that limit exceeded". So what I wrote fully answers the question. The initial question doesn't contain anything about delete logic. – IL Mare Jan 27 '17 at 20:39
  • @IL Mare, it is obvious that the code shouldn't change behaviour – gstackoverflow Jan 30 '17 at 21:24
  • @gstackoverflow And my answer (that to get a callback he can just check the Map size and launch the callback if the count is more than allowed) changed behaviour unacceptably? – IL Mare Jan 31 '17 at 12:58
  • @IL Mare, sure, because the deleted lock can be in use – gstackoverflow Jan 31 '17 at 12:59
  • @gstackoverflow right! And finding out when the lock will be free is not part of "callback which says that limit exceeded". No? – IL Mare Feb 02 '17 at 13:30
-2

You can use an in-process cache that stores object references, like Caffeine, Guava, EHCache or cache2k. Here is an example of how to build such a cache with cache2k:

final Cache<Key, Lock> locks =
  new Cache2kBuilder<Key, Lock>(){}
    .loader(
      new CacheLoader<Key, Lock>() {
        @Override
        public Lock load(Key o) {
          return new ReentrantLock();
        }
      }
    )
    .storeByReference(true)
    .entryCapacity(1000)
    .build();

The usage pattern is as you have in the question:

    Lock keyLock = locks.get(key);
    keyLock.lock();
    try {
        externalSystem.process(key);
    } finally {
        keyLock.unlock();
    }

Since the cache is limited to 1000 entries, there is an automatic cleanup of locks that are no longer in use.

There is the potential that a lock in use is evicted by the cache if the capacity and the number of threads in the application are mismatched. This solution has worked perfectly for years in our applications. The cache will evict a lock that is in use when there is a sufficiently long-running task AND the capacity is exceeded. In a real application you always control the number of live threads, e.g. in a web container you would limit the number of processing threads to, say, 100. So you know that there are never more than 100 locks in use. If this is accounted for, this solution has minimal overhead.
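The eviction hazard is easy to reproduce with a tiny size-bounded map. The sketch below is not cache2k: it uses the JDK's `LinkedHashMap` in access order as a stand-in LRU cache, with a deliberately small capacity, and `BoundedLockCache` is a hypothetical name. The `removeEldestEntry` hook is also the closest JDK equivalent to a "limit exceeded" callback.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical LRU-bounded lock cache, used here only to show the hazard.
final class BoundedLockCache<K> {
    private final int capacity;
    private final Map<K, Lock> locks;
    private int evictions;

    BoundedLockCache(int capacity) {
        this.capacity = capacity;
        // accessOrder = true: the eldest entry is the least recently accessed one.
        this.locks = new LinkedHashMap<K, Lock>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Lock> eldest) {
                // Called after each insertion: this is the "limit exceeded" hook.
                boolean overLimit = size() > BoundedLockCache.this.capacity;
                if (overLimit) {
                    evictions++; // the evicted lock may still be held by a thread
                }
                return overLimit;
            }
        };
    }

    synchronized Lock get(K key) {
        return locks.computeIfAbsent(key, k -> new ReentrantLock());
    }

    synchronized int evictions() {
        return evictions;
    }
}
```

With a capacity of 2, fetching the lock for "a", then "b", then "c" evicts the entry for "a". A second `get("a")` then returns a different `Lock` instance, so a thread still holding the first one no longer excludes newcomers. Sizing the capacity well above the maximum number of concurrently held locks is what keeps this from happening in practice.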

Keep in mind that the locking only works as long as your application runs on a single VM. You may want to take a look at distributed lock managers (DLM). Examples of products that provide distributed locks: Hazelcast, Infinispan, Terracotta, Redis/Redisson.

cruftex
  • Hmm, on second thoughts, that won't work. Scenario: 1 comes along, creates a Lock and locks it. 2 comes along and find the lock, waits. 1 finishes, unlocks and removes. 3 comes along and finds no Lock hence 2 and 3 will have concurrent access. Apologies for that brain fart.(c) Boris the Spider – gstackoverflow Jan 28 '17 at 10:58
  • I believe that this scenario is possible – gstackoverflow Jan 28 '17 at 10:58
  • Exactly - won't work. The cache either evicts by age or last access. If a thread checks out a lock it does that once and doesn't access the `Cache` after that. So the `Cache` could, with sufficiently long running tasks, evict a key that's "in use". – Boris the Spider Jan 28 '17 at 13:00
  • @gstackoverflow It would work in `Caffeine` or `Guava` if you use `weakValues()` instead of size or expiration. That, though, is what `Striped.lazyWeakLock(n)` is built on. – Ben Manes Jan 28 '17 at 22:19
  • This solution has worked perfectly for years in our applications. The cache will evict a lock that is in use when there is a sufficiently long-running task AND the capacity is exceeded. In a real application you always control the number of live threads, e.g. in a web container you would limit the number of processing threads to, say, 100. So you know that there are never more than 100 locks in use. – cruftex Jan 29 '17 at 07:34
  • Updated my comment. The concerns of eviction are very relevant. This solution only works if sized appropriately. When done properly this is the solution with the least overhead. – cruftex Jan 29 '17 at 07:39