
I am calculating latencies in my application (in milliseconds) and I want to insert those metrics into a thread-safe list structure. Later I will use that list to calculate the average, median, and 95th percentile. I looked around and didn't find many thread-safe list options, so I decided to store the latencies in a ConcurrentLinkedQueue, which is thread-safe. If there is a better thread-safe data structure for this, let me know.

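import java.util.concurrent.ConcurrentLinkedQueue;

public class LatencyMetricHolder {
  // Lock-free, thread-safe FIFO queue of latencies in milliseconds.
  private final ConcurrentLinkedQueue<Long> latenciesHolder = new ConcurrentLinkedQueue<>();

  // Initialization-on-demand holder: the singleton is created lazily and thread-safely.
  private static class Holder {
    private static final LatencyMetricHolder INSTANCE = new LatencyMetricHolder();
  }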

  public static LatencyMetricHolder getInstance() {
    return Holder.INSTANCE;
  }

  private LatencyMetricHolder() {}    

  public void addLatencies(long latency) {
    latenciesHolder.add(latency);
  }

  public ConcurrentLinkedQueue<Long> getLatenciesHolder() {
    return latenciesHolder;
  }
}

I am calling the addLatencies method from multithreaded code to populate the latencies.
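For the "calculate average, median, 95th percentile later on" step, one option is to copy the queue into a sorted list and read the statistics off that snapshot. A minimal sketch, assuming the nearest-rank percentile definition (other definitions interpolate between samples); `LatencySummary` is a hypothetical helper name, not part of the original code:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not in the original code): summarizes latencies in ms.
final class LatencySummary {
  final int count;
  final double average;
  final long p50; // median, nearest-rank definition
  final long p95;

  private LatencySummary(int count, double average, long p50, long p95) {
    this.count = count;
    this.average = average;
    this.p50 = p50;
    this.p95 = p95;
  }

  // Accepts the live ConcurrentLinkedQueue; copying it first gives a stable snapshot.
  // Iteration over ConcurrentLinkedQueue is weakly consistent: samples added
  // concurrently may or may not be included, but no exception is thrown.
  static LatencySummary of(Collection<Long> latencies) {
    List<Long> sorted = new ArrayList<Long>(latencies);
    if (sorted.isEmpty()) {
      throw new IllegalArgumentException("no latencies recorded");
    }
    Collections.sort(sorted);
    long sum = 0;
    for (long v : sorted) {
      sum += v;
    }
    int n = sorted.size();
    return new LatencySummary(n, (double) sum / n, nearestRank(sorted, 50), nearestRank(sorted, 95));
  }

  // Nearest-rank percentile: 1-based rank = ceil(p * n / 100), computed in integers.
  private static long nearestRank(List<Long> sorted, int p) {
    int rank = (int) (((long) p * sorted.size() + 99) / 100);
    return sorted.get(Math.max(rank, 1) - 1);
  }
}
```

Sorting a copy costs O(n log n) per report, which is usually fine for periodic reporting; for very high sample rates a histogram-based structure would avoid keeping every sample.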

Now I want a separate latenciesHolder for each processId, which is a String. The same processId can arrive many times, and sometimes a new one will show up, so I need to look up the latenciesHolder queue for that processId (creating it if it doesn't exist yet) and add the latency to that particular queue in a thread-safe, atomic way.

So I decided to use a concurrent map for this, as shown below, where the key is the processId:

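// Declared as ConcurrentMap (not Map): on Java 7, putIfAbsent exists only on ConcurrentMap, and it is atomic there.
private final ConcurrentMap<String, ConcurrentLinkedQueue<Long>> latenciesHolderByProcessId = Maps.newConcurrentMap();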

Since I am using a map, I need to synchronize the creation of new ConcurrentLinkedQueue instances, which seems tricky on Java 7 (the version I'm on).

What is the right way to populate this map atomically from multiple threads without much contention? I'd like to use concurrent collections rather than conventional locking, if possible.

Update:

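  public void add(String type, long latencyInMs) {
    // Fast path: the queue for this processId usually exists already.
    ConcurrentLinkedQueue<Long> latencyHolder = latenciesHolderByProcessId.get(type);
    if (latencyHolder == null) {
      latencyHolder = Queues.newConcurrentLinkedQueue();
      // putIfAbsent is atomic: it returns the existing queue if another thread won the race.
      ConcurrentLinkedQueue<Long> currentLatencyHolder =
          latenciesHolderByProcessId.putIfAbsent(type, latencyHolder);
      if (currentLatencyHolder != null) {
        // Lost the race: discard our new queue and use the one already in the map.
        latencyHolder = currentLatencyHolder;
      }
    }
    latencyHolder.add(latencyInMs);
  }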
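One way to check the update is a small stress test: many threads adding to a handful of process IDs at once. This is a sketch under assumptions: `ProcessLatencies` and `stress` are hypothetical names, and it swaps Guava's `Maps.newConcurrentMap()` / `Queues.newConcurrentLinkedQueue()` for the plain JDK constructors so it compiles without Guava. Because `putIfAbsent` is atomic on a `ConcurrentMap`, at most one queue per key is ever published; a thread that loses the race simply drops its freshly created queue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Same get / putIfAbsent idiom as the update above, written with JDK classes only
// (Java 7 compatible: no lambdas).
class ProcessLatencies {
  private final ConcurrentMap<String, ConcurrentLinkedQueue<Long>> byProcessId =
      new ConcurrentHashMap<String, ConcurrentLinkedQueue<Long>>();

  void add(String processId, long latencyInMs) {
    ConcurrentLinkedQueue<Long> holder = byProcessId.get(processId);
    if (holder == null) {
      ConcurrentLinkedQueue<Long> created = new ConcurrentLinkedQueue<Long>();
      ConcurrentLinkedQueue<Long> existing = byProcessId.putIfAbsent(processId, created);
      holder = (existing != null) ? existing : created; // loser discards its queue
    }
    holder.add(latencyInMs);
  }

  int count(String processId) {
    ConcurrentLinkedQueue<Long> holder = byProcessId.get(processId);
    return holder == null ? 0 : holder.size();
  }

  // Hypothetical stress check: each thread adds addsPerThread samples round-robin over ids.
  static ProcessLatencies stress(int threads, final int addsPerThread, final String[] ids) {
    final ProcessLatencies latencies = new ProcessLatencies();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.execute(new Runnable() {
        @Override
        public void run() {
          for (int i = 0; i < addsPerThread; i++) {
            latencies.add(ids[i % ids.length], i);
          }
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return latencies;
  }
}
```

With 8 threads each adding 1,000 samples round-robin over four IDs, every ID should end up with exactly 2,000 entries; a lost update would show up as a smaller count.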
john
  • You're suggesting the same process ID can be assigned multiple times. That's true, but any decent OS recycles IDs over a very long period. I assume you will clear the statistics when you print them, so it's likely you will run out of memory from stored stats much sooner than a process ID is recycled. Otherwise, how do you plan to find out whether an existing entry for a PID belongs to this process or was generated for a previous instance with the same PID? – Roberto Attias Dec 13 '16 at 01:43
  • Don't relate processId to the Unix PID. It is just our own process ID for something different. – john Dec 13 '16 at 01:44
  • And how do you distinguish the case of the same instance vs. two instances with the same ID? – Roberto Attias Dec 13 '16 at 01:45
  • The idea is that for each processId there will be a ConcurrentLinkedQueue holding a bunch of latencies. From that queue I will compute the average, median, and 95th percentile for that processId and graph them, and likewise for every other processId. I will send this information to another system that draws the graphs for us. – john Dec 13 '16 at 02:02
  • Per your actual question: typically a `get(key)` followed by a `putIfAbsent` if missing is used. This way a cache hit is fast and a miss doesn't overwrite if it loses. If the computation is more expensive then a future or customized map is used (e.g. Guava's cache). – Ben Manes Dec 13 '16 at 02:59
  • You mean using a `LoadingCache` here, which would automatically synchronize the creation of new `ConcurrentLinkedQueue` instances? Also, can you provide an example with `putIfAbsent` so I can understand better? Right now I'm confused about how to use `putIfAbsent` here. – john Dec 13 '16 at 03:12
  • @david It's a double-checked locking approach. If `get(k)` returns a value, then you're done. Otherwise create a new value and try to insert it (`putIfAbsent`). But if `putIfAbsent` returns non-null, discard your new value and use that one. This way you (1) check, (2) try to insert, (3) return the mapping. – Ben Manes Dec 13 '16 at 06:08
  • @BenManes I have updated the question with code using the approach you mentioned. Is that what you meant? Will it be thread-safe, and will all the operations be atomic? – john Dec 14 '16 at 20:06
  • @david yes, that should work. Your other option is to use a single queue where each entry has the process id. Then drain it periodically with a single thread into a map, etc. That might better fit your use-case. – Ben Manes Dec 14 '16 at 21:35
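The single-queue alternative from the last comment (one shared queue of samples tagged with their process ID, drained periodically by one thread into a map) could look roughly like this. A minimal sketch: `LatencySample` and `DrainingLatencyCollector` are hypothetical names, and how often `drain()` is scheduled is left open.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical event type: one latency sample tagged with its processId.
final class LatencySample {
  final String processId;
  final long latencyInMs;

  LatencySample(String processId, long latencyInMs) {
    this.processId = processId;
    this.latencyInMs = latencyInMs;
  }
}

class DrainingLatencyCollector {
  // Producers only ever touch this one lock-free queue; no per-key creation race.
  private final ConcurrentLinkedQueue<LatencySample> samples =
      new ConcurrentLinkedQueue<LatencySample>();

  void add(String processId, long latencyInMs) {
    samples.add(new LatencySample(processId, latencyInMs));
  }

  // Intended to be called from a single thread (e.g. a periodic reporter).
  // poll() removes each sample exactly once, so the local HashMap needs no locking.
  Map<String, List<Long>> drain() {
    Map<String, List<Long>> byProcessId = new HashMap<String, List<Long>>();
    LatencySample s;
    while ((s = samples.poll()) != null) {
      List<Long> list = byProcessId.get(s.processId);
      if (list == null) {
        list = new ArrayList<Long>();
        byProcessId.put(s.processId, list);
      }
      list.add(s.latencyInMs);
    }
    return byProcessId;
  }
}
```

The trade-off is that per-processId lists exist only after a drain, which fits the "compute stats and send them to another system" flow described in the comments.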

1 Answer


OK, I'm not sure I have all the specs right, but this is one way to do it:

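import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

public class LatencyMetricHolder {

  // Plain HashMap guarded by its own monitor; every access must hold the lock.
  private final Map<String, ConcurrentLinkedQueue<Long>> map = new HashMap<>();

  public void addLatency(String processId, long latency) {
    ConcurrentLinkedQueue<Long> q;
    synchronized (map) {
      q = map.get(processId);
      if (q == null) {
        q = new ConcurrentLinkedQueue<Long>();
        map.put(processId, q);
      }
    }
    // The queue itself is thread-safe, so the add can happen outside the lock.
    q.add(latency);
  }

  public ConcurrentLinkedQueue<Long> getLatenciesHolder(String processId) {
    synchronized (map) {
      return map.get(processId);
    }
  }
}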
Roberto Attias