
There is a writer that updates prices by calling the putPrice method. A reader calls getPrice to get the latest price. The hasPriceChanged method returns a boolean indicating whether the price has changed since getPrice was last invoked.

I am looking for the fastest solution. I am trying to achieve thread-safe, consistent reads and writes to the map at the key level.

I think that locking the whole map may cause a performance issue, which is why I decided to lock at the key level. Unfortunately, it doesn't work as expected and blocks the whole map. Why? Could you please help me figure out what I am doing wrong here?

UPDATE:

I guess we can summarise this in two questions: 1. How do I provide free access to the rest of the keys while one is being updated? 2. How do I guarantee that my methods are atomic, since they require multiple read/write operations? E.g. getPrice() gets the price and updates the hasChanged flag.

PriceHolder.java

public final class PriceHolder {

    private ConcurrentMap<String, Price> prices;

    public PriceHolder() {
        this.prices = new ConcurrentHashMap<>();

        //Receive starting prices..
        Price EUR = new Price();
        EUR.setHasChangedSinceLastRead(true);
        EUR.setPrice(new BigDecimal(0));

        Price USD = new Price();
        USD.setHasChangedSinceLastRead(true);
        USD.setPrice(new BigDecimal(0));
        this.prices.put("EUR", EUR);
        this.prices.put("USD", USD);

    }

    /** Called when a price ‘p’ is received for an entity ‘e’ */
    public void putPrice(
            String e,
            BigDecimal p) throws InterruptedException {

            synchronized (prices.get(e)) {
                Price currentPrice = prices.get(e);
                if (currentPrice != null && !currentPrice.getPrice().equals(p)) {
                    currentPrice.setHasChangedSinceLastRead(true);
                    currentPrice.setPrice(p);
                } else {
                    Price newPrice = new Price();
                    newPrice.setHasChangedSinceLastRead(true);
                    newPrice.setPrice(p);
                    prices.put(e, newPrice);
                }
            }
    }

    /** Called to get the latest price for entity ‘e’ */
    public BigDecimal getPrice(String e) {
        Price currentPrice = prices.get(e);
        if(currentPrice != null){
            synchronized (prices.get(e)){
                currentPrice.setHasChangedSinceLastRead(false);
                prices.put(e, currentPrice);
            }
            return currentPrice.getPrice();
        }
        return null;
    }

    /**
     * Called to determine if the price for entity ‘e’ has
     * changed since the last call to getPrice(e).
     */
    public boolean hasPriceChanged(String e) {
        synchronized (prices.get(e)){
            return prices.get(e) != null ? prices.get(e).isHasChangedSinceLastRead() : false;
        }
    }
}

Price.java

public class Price {

    private BigDecimal price;

    public boolean isHasChangedSinceLastRead() {
        return hasChangedSinceLastRead;
    }

    public void setHasChangedSinceLastRead(boolean hasChangedSinceLastRead) {
        this.hasChangedSinceLastRead = hasChangedSinceLastRead;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    private boolean hasChangedSinceLastRead = false;

}
Wild Goat
  • Do you know in advance what the keys (I mean the currency names such as EUR and USD) will be in design-time or the writer can also put some new currencies during run-time? – izce Nov 03 '15 at 15:10
  • @IzCe, yes I know all the keys in advance. Basically number of currencies in the world. Maybe I am a bit confused, then how would I lock it on key level? – Wild Goat Nov 03 '15 at 15:16
  • Since you know the keys in advance, you don't need to use ConcurrentMap. Just a simple Map would be sufficient. While initializing your program, the main thread can populate the Map with those keys and Price objects with defaults. All you need is to get the Price object and synchronize over it for each get and put operation. You don't need to use `Thread.sleep(3000);` call in synchronized block as there is no point to wait the thread in such a critical path. – izce Nov 03 '15 at 15:26
  • http://stackoverflow.com/questions/25998536/does-a-concurrenthashmap-need-wrapped-in-a-synchronized-block I don't think you need to use synchronized blocks since ConcurrentHashMap already ensures that multiple threads can write and read. The locks are implemented at the hash bucket level instead of on the whole table. – Shashank Nov 03 '15 at 15:26
  • And you don't need this line `prices.put(e, currentPrice);` in `getPrice(...)`method as you've already a reference to the same `Price` object in `currentPrice`variable. – izce Nov 03 '15 at 15:35
  • @Wild Goat: Shall the writer thread update multiple currencies at once? Similarly, shall the reader thread will read multiple currencies at once? – izce Nov 03 '15 at 16:23
  • @IzCe, yes, writer can update multiple currencies in parallel and reader can read multiple currencies at the same time. How the design would change if I don't know the key in advance? – Wild Goat Nov 03 '15 at 16:26
  • When you say "blocks the whole map", what exactly do you mean? It appears you are holding the lock for the price object for three seconds at a time, which could prevent reading if there are frequent writes. Could that be causing the problem you are seeing? – Warren Dew Nov 03 '15 at 16:32
  • @Shashank, but how about the logic in `getPrice()` where I update a `hasChangedSinceLastRead` property? For example if thread1 read priceA, thread2 modified priceA and then I override last again by thread1 - inconsistency? – Wild Goat Nov 03 '15 at 16:34
  • @WarrenDew, Sorry, thread.sleep() is just dummy code I was using for testing. Ignore it. I want to block a read/write on key level, seems that ConcurrentMap can offer me that benefit but how do I deal with the logic in `getPrice()` where I do a few operations: read and update, can that cause an inconsistency if another thread writes in between these operations? – Wild Goat Nov 03 '15 at 16:37
  • `synchronized (prices.get(e)) {` - What if the map is initially empty, i.e. `get()` returns `null`? – JimmyB Nov 03 '15 at 17:06
  • @HannoBinder, yes you are right, that will throw a null pointer, again if I do a check before I am losing atomicity? – Wild Goat Nov 03 '15 at 17:08
  • Your question is not clear. I don't see why the whole map would be blocked. However, I would suggest that you have a look at http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html. It might do what you want: allow readers to progress while potentially slow writers are contending for the lock. You still have to keep the critical section for writing small though. – Alexander Torstling Nov 03 '15 at 17:16
  • `I think that locking the whole map may cause a performance issue`, thinking is fine but not enough in cases like this. You need to obtain data as well. – biziclop Nov 03 '15 at 17:17
  • Another word of advice: don't read multiple times from the map and expect to get the same result every time if you are writing multithreaded code. – Alexander Torstling Nov 03 '15 at 17:24

7 Answers


The use of a ConcurrentMap heavily depends on the Java version. When you are using Java 8 or newer, you get almost everything for free:

public final class PriceHolder {

    private ConcurrentMap<String, Price> prices;

    public PriceHolder() {
        this.prices = new ConcurrentHashMap<>();

        //Receive starting prices..
        Price EUR = new Price();
        EUR.setHasChangedSinceLastRead(true);
        EUR.setPrice(BigDecimal.ZERO);

        Price USD = new Price();
        USD.setHasChangedSinceLastRead(true);
        USD.setPrice(BigDecimal.ZERO);
        this.prices.put("EUR", EUR);
        this.prices.put("USD", USD);

    }

    /** Called when a price ‘p’ is received for an entity ‘e’ */
    public void putPrice(String e, BigDecimal p) {
        prices.compute(e, (k, price)-> {
            if(price==null) price=new Price();
            price.setHasChangedSinceLastRead(true);
            price.setPrice(p);
            return price;
        });
    }

    /** Called to get the latest price for entity ‘e’ */
    public BigDecimal getPrice(String e) {
        Price price = prices.computeIfPresent(e, (key, value) -> {
            value.setHasChangedSinceLastRead(false);
            return value;
        });
        return price==null? null: price.getPrice();
    }

    /**
     * Called to determine if the price for entity ‘e’ has
     * changed since the last call to getPrice(e).
     */
    public boolean hasPriceChanged(String e) {
        final Price price = prices.get(e);
        return price!=null && price.isHasChangedSinceLastRead();
    }
}

The compute… methods on a concurrent map lock the affected entry for the duration of the computation while letting updates of all other entries proceed. For simple get access like in hasPriceChanged, no additional synchronization is necessary as long as you call it only once in a method, i.e. keep the result in a local variable while examining.
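To illustrate the per-entry atomicity described above, here is a minimal sketch (Java 8+). The class `ComputeDemo` and its methods are hypothetical names used only for this illustration; the only library call relied on is `ConcurrentHashMap.compute`. Several threads update the same key without any explicit lock, and no update is lost.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class, not part of the question's code.
final class ComputeDemo {

    /** Adds 1 to the entry for key, creating it if absent; atomic per key. */
    static void increment(ConcurrentMap<String, Integer> counts, String key) {
        counts.compute(key, (k, old) -> old == null ? 1 : old + 1);
    }

    /** Starts `threads` threads that each increment the same key `perThread` times. */
    static int run(int threads, int perThread) {
        ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    increment(counts, "EUR");
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait until every worker has finished
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }
        return counts.getOrDefault("EUR", 0);
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000));
    }
}
```

With a plain get followed by put instead of compute, concurrent increments could overwrite each other and the total could come out lower.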


Before Java 8, things are more complicated. There, all that ConcurrentMap offers are certain atomic update methods, which can be used to build higher-level update methods in a try-and-repeat fashion.

To use it cleanly, the best way is to make the value class immutable:

public final class Price {

    private final BigDecimal price;
    private final boolean hasChangedSinceLastRead;

    Price(BigDecimal value, boolean changed) {
      price=value;
      hasChangedSinceLastRead=changed;
    }
    public boolean isHasChangedSinceLastRead() {
        return hasChangedSinceLastRead;
    }
    public BigDecimal getPrice() {
        return price;
    }
}

Then use it to always construct a new object reflecting the desired new state and perform atomic updates using either putIfAbsent or replace:

public final class PriceHolder {

    private ConcurrentMap<String, Price> prices;

    public PriceHolder() {
        this.prices = new ConcurrentHashMap<>();

        //Receive starting prices..
        Price EUR = new Price(BigDecimal.ZERO, true);
        Price USD = EUR; // we can re-use immutable objects...
        this.prices.put("EUR", EUR);
        this.prices.put("USD", USD);
    }

    /** Called when a price ‘p’ is received for an entity ‘e’ */
    public void putPrice(String e, BigDecimal p) {
      Price old, _new=new Price(p, true);
      do old=prices.get(e);
      while(old==null? prices.putIfAbsent(e,_new)!=null: !prices.replace(e,old,_new));
    }

    /** Called to get the latest price for entity ‘e’ */
    public BigDecimal getPrice(String e) {
        for(;;) {
          Price price = prices.get(e);
          if(price==null) return null;
          if(!price.isHasChangedSinceLastRead()
          || prices.replace(e, price, new Price(price.getPrice(), false)))
            return price.getPrice();
        }
    }

    /**
     * Called to determine if the price for entity ‘e’ has
     * changed since the last call to getPrice(e).
     */
    public boolean hasPriceChanged(String e) {
        final Price price = prices.get(e);
        return price!=null && price.isHasChangedSinceLastRead();
    }
}
Holger
  • Be aware that compute incurs the performance of a write (60M vs 1B ops/s). You'd probably want to use a double-checked locking style scheme to optimistically have read performance in the common case. – Ben Manes Nov 03 '15 at 18:50
  • @Ben Manes: compute *is* a write here. Even in the case it returns the existing object, it will modify two properties. So without the write semantic, an additional lock would be needed. The optimistic read would be useful if you assume that it happens more than occasionally that a particular operation does not need to perform an update, but this isn’t given by the question. – Holger Nov 03 '15 at 18:56
  • Sorry, I meant compute in the general sense. Specifically I was referring to `getPrice`'s use of `computeIfPresent`. Since crux of the problem was a performance issue, it seemed worth indicating. – Ben Manes Nov 03 '15 at 19:04
  • @Ben Manes: but even `getPrice` incorporates a write. But you’re right, depending on the actual use case, there are alternatives. However, the question is about avoiding locking of the entire map, and this is a very simple way of solving this problem, which I would make more complicated only in the case, the resulting performance is really a problem… – Holger Nov 03 '15 at 19:14

How about something like

class AtomicPriceHolder {

  private volatile BigDecimal value;
  private volatile boolean dirtyFlag;

  public AtomicPriceHolder( BigDecimal initialValue) {
    this.value = initialValue;
    this.dirtyFlag = true;
  }

  public synchronized void updatePrice( BigDecimal newPrice ) {
    if ( this.value.equals( newPrice ) == false) {
      this.value = newPrice;
      this.dirtyFlag = true;
    }
  }

  public boolean isDirty() {
    return this.dirtyFlag;
  }

  public BigDecimal peek() {
    return this.value;
  }

  public synchronized BigDecimal read() {
    this.dirtyFlag = false;
    return this.value;
  }

}

...

public void updatePrice( String id, BigDecimal value ) {

  AtomicPriceHolder holder;
  synchronized( someGlobalSyncObject ) {
    holder = prices.get(id);
    if ( holder == null ) {
      prices.put( id, new AtomicPriceHolder( value ) );
      return;
    }
  }

  holder.updatePrice( value );

}

Note though that it probably does not make any sense this way, because the actual atomic modification of the price's value is so fast that you cannot expect to gain anything from unlocking the map before.

The conditional operations "check if it's in the map, create a new one and insert if not" must be atomic, and should be done by locking the whole map for that brief period. Anything else would require a dedicated synchronization object for each key. These would have to be stored and managed somewhere, and access to that store would have to be synchronized again &c.

Just do the coarse-grained locking to ensure you have correctness and then move on.
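As a rough sketch of what such coarse-grained locking could look like, assuming a single lock (the holder's own monitor) guards a plain HashMap: the class name `CoarsePriceHolder` and the nested `Entry` type are made up for this illustration. Unlike `updatePrice` above, this variant marks the price as changed on every put, even when the value is the same.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical class: every operation takes the same lock (this object's monitor).
final class CoarsePriceHolder {

    private static final class Entry {
        BigDecimal price;
        boolean changed;
    }

    // A plain HashMap is sufficient because it is only touched while holding the lock.
    private final Map<String, Entry> entries = new HashMap<>();

    synchronized void putPrice(String key, BigDecimal price) {
        Entry entry = entries.get(key);
        if (entry == null) {
            // check-then-insert is atomic because the whole method is synchronized
            entry = new Entry();
            entries.put(key, entry);
        }
        entry.price = price;
        entry.changed = true;
    }

    synchronized BigDecimal getPrice(String key) {
        Entry entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        entry.changed = false; // read and flag reset happen under one lock
        return entry.price;
    }

    synchronized boolean hasPriceChanged(String key) {
        Entry entry = entries.get(key);
        return entry != null && entry.changed;
    }
}
```

Each critical section is only a few field accesses, so the lock is held very briefly; measuring under a realistic load would show whether finer-grained locking is needed at all.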

JimmyB
  1. how do I provide free access to rest of the keys if one is in the update process.

Simply using a ConcurrentHashMap is sufficient to ensure free access to the keys; gets do not introduce any contention, and puts only lock a subset of keys rather than the whole map.

  2. How do I guarantee atomic operations of my methods since they require multiple operations read/write.

To ensure consistency you need to synchronize on some shared object (or use another locking mechanism, like ReentrantLock); I would suggest creating a ConcurrentHashMap<String, Object> of lock objects, so that you can do:

synchronized (locks.get(e)) { ... }

Just populate the map with new Object()'s. The risk with the pattern you use (of locking on the Price objects) is now these objects must persist and never be replaced. It's easier to enforce that by having a dedicated private collection of locks rather than overloading your value type as a lock mechanism.
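A minimal sketch of this idea, assuming all keys are known when the holder is constructed: the class `LockedPrices` and the helper `lockFor` are hypothetical names, and the changed flag is kept in a separate map purely for illustration.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class illustrating a private map of per-key lock objects.
final class LockedPrices {

    // One dedicated lock object per known key; never replaced after construction.
    private final Map<String, Object> locks = new ConcurrentHashMap<>();
    private final Map<String, BigDecimal> prices = new ConcurrentHashMap<>();
    private final Map<String, Boolean> changed = new ConcurrentHashMap<>();

    LockedPrices(String... keys) {
        for (String key : keys) {
            locks.put(key, new Object());
            prices.put(key, BigDecimal.ZERO);
            changed.put(key, true);
        }
    }

    /** Fails fast for unknown keys instead of synchronizing on null. */
    private Object lockFor(String key) {
        Object lock = locks.get(key);
        if (lock == null) {
            throw new IllegalArgumentException("Unknown key: " + key);
        }
        return lock;
    }

    void putPrice(String key, BigDecimal price) {
        synchronized (lockFor(key)) {
            prices.put(key, price);
            changed.put(key, true);
        }
    }

    BigDecimal getPrice(String key) {
        synchronized (lockFor(key)) {
            changed.put(key, false); // flag reset and read form one atomic step per key
            return prices.get(key);
        }
    }

    boolean hasPriceChanged(String key) {
        synchronized (lockFor(key)) {
            return changed.get(key);
        }
    }
}
```

Because the lock objects are created once and never swapped out, two threads working on the same key always contend on the same monitor, while threads working on different keys do not block each other.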


As an aside, if you're trying to do money operations in Java, you should absolutely be using the Joda-Money library, rather than reinventing the wheel.

dimo414

hasChangedMethod returns a boolean identifying if price has been changed since last time getPrice has been invoked.

This is a problematic pattern, since hasPriceChanged essentially needs to return something different per-thread. If you can elaborate a little more on what you're actually trying to do (i.e. why you think you need this pattern) it may be possible to offer an alternative. For example, consider doing away with hasPriceChanged entirely, and simply treating this data structure as canonical and querying its current value every time.


That said, here's how I might implement the behavior you're looking for. There may well be alternatives, this is just a first pass.

Keep a ConcurrentHashMap<String, ThreadLocal<Boolean>>; the ThreadLocal will store the status of get calls per-thread. I also use a separate, private map of locks.

ConcurrentHashMap<String, Price> pricesMap;
ConcurrentHashMap<String, ThreadLocal<Boolean>> seenMap;
ConcurrentHashMap<String, Object> lockMap;

private Object getLock(String key) {
  return lockMap.computeIfAbsent(key, k -> new Object());
}

private ThreadLocal<Boolean> getSeen(String key) {
  return seenMap.computeIfAbsent(key,
      k -> ThreadLocal.<Boolean>withInitial(() -> false));
}

public void putPrice(String e, BigDecimal p) {
  synchronized (getLock(e)) {
    // price has changed, clear the old state to mark all threads unseen
    seenMap.remove(e);
    pricesMap.get(e).setPrice(p);
  }
}

public BigDecimal getPrice(String e) {
  synchronized (getLock(e)) {
    // marks the price seen for this thread
    getSeen(e).set(true);
    Price price = pricesMap.get(e);
    return price != null ? price.getPrice() : null;
  }
}

public boolean hasPriceChanged(String e) {
  synchronized (getLock(e)) {
    return !getSeen(e).get();
  }
}

Notice that while the data structure is thread-safe there is still a risk of a race condition here - you could call hasPriceChanged() and get back false, immediately after which the price is changed by another thread. Doing away with this hasPriceChanged() behavior will likely simplify your code.

dimo414
  • Great, let me try! Btw, there are some problems with type returned by lambda at `getSeen()` method. – Wild Goat Nov 03 '15 at 18:40
  • Sorry about that, I just coded this up inline, so there may be some small typos. You can always write it Java 7-style if need be. Feel free to edit this post with your changes. – dimo414 Nov 03 '15 at 18:44

I would recommend using the observable - observer pattern. No need to re-invent the wheel. See Observer and Observable

I would also recommend looking into Condition, since there is no need to lock the complete object for all readers. Reading can be concurrent, but writing can't.

If a collection is concurrent it doesn't mean that it magically synchronizes everything. It just guarantees that their methods are thread-safe. Once you leave the function scope the lock is released. Because you need a more advanced way of controlling synchronization it is best you take this into your own hands and use the normal HashMap.

Some notes:

You are overusing the HashMap.get. Consider getting it once and storing it in a variable.

synchronized (prices.get(e))

This may return null and you should be checking for it. synchronized on null objects is not allowed.

prices.put(e, currentPrice);

I'm not sure if this is intended, but this action is not needed. See this

Neijwiert
  • "This may return null and you should be checking for it." -- Could turn out to be harder than it looks. You can't atomically check if an object is null *and* synchronize on it. – JimmyB Nov 04 '15 at 11:31

The best way to get what you want is probably to go ahead and use a ConcurrentMap for the map, and to put all other synchronization in your Price class. This will lead to simpler code, which is always highly valuable in a multithreaded environment to avoid subtle bugs, while also achieving your goals of simultaneous access to the map and tracking of whether there has been a write since the last read for each currency.

In the Price class, whenever you set a price, you also want to set hasChangedSinceLastRead; these two things go together as one operation that should be atomic. Whenever you read the price, you also want to clear hasChangedSinceLastRead; this you also want to be atomic. Thus, the class should only permit these two operations to modify hasChangedSinceLastRead, rather than leaving the logic to other classes, and the methods should be synchronized to ensure that price and hasChangedSinceLastRead cannot get out of sync due to access by multiple threads. The class should now look like this:

public class Price {

    public boolean isHasChangedSinceLastRead() {
        return hasChangedSinceLastRead;
    }

    // setHasChangedSinceLastRead() removed

    public synchronized BigDecimal getPrice() {
        hasChangedSinceLastRead = false;
        return price;
    }

    public synchronized void setPrice(BigDecimal newPrice) {
        if (null != price && price.equals(newPrice)) {
            return;
        }
        price = newPrice;
        hasChangedSinceLastRead = true;
    }

    private BigDecimal price;
    private volatile boolean hasChangedSinceLastRead = false;
}

Note that you can either make isHasChangedSinceLastRead() synchronized, or hasChangedSinceLastRead volatile; I chose the latter, leaving isHasChangedSinceLastRead() unsynchronized, because synchronizing the method requires a full memory barrier, while making the variable volatile requires only the read half of a memory barrier when the variable is read.

The reason a read of hasChangedSinceLastRead requires some sort of memory barrier is because synchronized methods, blocks, and volatile access only guarantee effective execution order - the "happens before" relationship - with other synchronized methods, blocks, and volatile access. If isHasChangedSinceLastRead is not synchronized and the variable is not volatile, no "happens before" relationship exists; in this case isHasChangedSinceLastRead could return "true" but the thread calling it might not see the change in price. This is because the setting of price and hasChangedSinceLastRead in setPrice() might be seen in the reverse order by other threads if the "happens before" relationship has not been established.
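The ordering guarantee described above can be sketched in isolation. This is an illustration only, with hypothetical names (`VolatilePublishDemo`, `payload`, `ready`), not part of the Price class: a plain field is written before a volatile flag, and a reader that observes the flag as true is then guaranteed to see the earlier write.

```java
// Hypothetical demo of publishing a plain field through a volatile flag.
final class VolatilePublishDemo {

    private int payload;            // plain field, written before the flag
    private volatile boolean ready; // volatile flag establishes happens-before

    void publish(int value) {
        payload = value; // ordinary write...
        ready = true;    // ...made visible by the subsequent volatile write
    }

    int awaitPayload() {
        while (!ready) {
            Thread.yield(); // spin until the volatile read observes true
        }
        return payload; // guaranteed to see the value written before ready = true
    }

    /** Publishes 42 from another thread and reads it back in the calling thread. */
    static int run() {
        VolatilePublishDemo demo = new VolatilePublishDemo();
        Thread writer = new Thread(() -> demo.publish(42));
        writer.start();
        return demo.awaitPayload();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If `ready` were not volatile, the Java memory model would allow the reader to spin forever or to read a stale `payload`, which is the kind of reordering the paragraph above warns about.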

Now that all of the necessary synchronization is in the ConcurrentMap and Price classes, you no longer need to do any synchronization at all in PriceHolder, and PriceHolder no longer has to worry about updating hasChangedSinceLastRead. The code is simplified to:

public final class PriceHolder {

    private ConcurrentMap<String, Price> prices;

    public PriceHolder() {
        prices = new ConcurrentHashMap<>();

        //Receive starting prices..
        Price EUR = new Price();
        EUR.setPrice(new BigDecimal(0));
        this.prices.put("EUR", EUR);

        Price USD = new Price();
        USD.setPrice(new BigDecimal(0));
        this.prices.put("USD", USD);
    }

    /** Called when a price ‘p’ is received for an entity ‘e’ */
    public void putPrice(
        String e,
        BigDecimal p
    ) throws InterruptedException {
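        Price currentPrice = prices.get(e);
        if (null == currentPrice) {
            Price newPrice = new Price();
            // putIfAbsent returns the existing value, or null if newPrice was inserted
            currentPrice = prices.putIfAbsent(e, newPrice);
            if (null == currentPrice) {
                currentPrice = newPrice;
            }
        }
        currentPrice.setPrice(p);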
    }

    /** Called to get the latest price for entity ‘e’ */
    public BigDecimal getPrice(String e) {
        Price currentPrice = prices.get(e);
        if (currentPrice != null){
            return currentPrice.getPrice();
        }
        return null;
    }

    /**
     * Called to determine if the price for entity ‘e’ has
     * changed since the last call to getPrice(e).
     */
    public boolean hasPriceChanged(String e) {
        Price currentPrice = prices.get(e);
        return null != currentPrice ? currentPrice.isHasChangedSinceLastRead() : false;
    }
}
Warren Dew
  • Wow! Very simple and clean. Could you please explain again why do we need `volatile` on `hasChangedSinceLastRead` - I thought that if we put `synchronized` on the method it will lock the whole object. – Wild Goat Nov 04 '15 at 14:35
  • Added a paragraph explaining why either isHasChangedSinceLastRead() has to be synchronized or hasChangedSinceLastRead has to be volatile. Either one suffices, but we need one of the two. I picked the latter because it has less execution overhead. More detail in the edited answer. – Warren Dew Nov 04 '15 at 18:18

I think you can put a lock on your specific key only.

import java.util.HashMap;

public class CustomHashmap<K, V> extends HashMap<K, V>{

    private static final long serialVersionUID = 1L;

    @Override
    public V put(K key, V value) {
        V val = null;
        synchronized (key) {
            val = super.put(key, value);
        }

        return val;
    }

    @Override
    public V get(Object key) {
        return super.get(key);
    }


}