
I have the class below, with an add method that is called by another thread to populate my clientidToTimestampHolder multimap. In the same class I start a background thread that runs every 60 seconds and calls a processData() method, which iterates over the same map and sends all that data to some other service.

public class Handler {
  private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
  private final Multimap<String, Long> clientidToTimestampHolder = ArrayListMultimap.create();

  private static class Holder {
    private static final Handler INSTANCE = new Handler();
  }

  public static Handler getInstance() {
    return Holder.INSTANCE;
  }

  private Handler() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        processData();
      }
    }, 0, 60, TimeUnit.SECONDS);
  }

  // called by another thread to populate clientidToTimestampHolder map
  public void add(final String clientid, final Long timestamp) {
    clientidToTimestampHolder.put(clientid, timestamp);
  }

  // called by background thread
  public void processData() {
    for (Entry<String, Collection<Long>> entry : clientidToTimestampHolder.asMap().entrySet()) {
      String clientid = entry.getKey();
      Collection<Long> timestamps = entry.getValue();
      for (long timestamp : timestamps) {
        boolean isUpdated = isUpdatedClient(clientid, timestamp);
        if (!isUpdated) {
          updateClient(String.valueOf(clientid));
        }
      }
    }
  }
}

My question is: the add method will keep getting called from a different thread, so do I need to create a copy of the clientidToTimestampHolder map and pass that copy to processData() as a parameter, instead of working on the map directly?

Right now I am using the same map both to populate data and to iterate over it when sending to the other service, and I am not deleting anything from the map, so those entries will stay there forever.

What is the best way to solve this problem? I need to make sure it is thread safe and there is no race condition, as I cannot lose any clientid.

Update

So my processData method would look like this?

  public void processData() {
    synchronized (clientidToTimestampHolder) {
      Iterator<Map.Entry<String, Long>> i = clientidToTimestampHolder.entries().iterator();
      while (i.hasNext()) {
        String clientid = i.next().getKey();
        long timestamp = i.next().getValue();
        boolean isUpdated = isUpdatedClient(clientid, timestamp);
        if (!isUpdated) {
          updateClient(clientid);
        }
        i.remove();
      }
    }
  }
  • Your updated `processData` is wrong, as it calls `Iterator#next()` twice per `hasNext()` check. – Grzegorz Rożniecki Mar 15 '17 at 22:12
  • Yeah, got it. So I can `clear` the map inside the synchronized block while iterating with a for loop, as you suggested, instead of using remove on the iterator. – user1234 Mar 15 '17 at 23:02
  • Either `clear` in synchronized block like in my post or extract entry in your updated example: `while (i.hasNext()) { Entry e = i.next(); String clientid = e.getKey(); long timestamp = e.getValue(); // etc.`. Or use `BlockingQueue` as I suggested in my post after edit. – Grzegorz Rożniecki Mar 16 '17 at 07:33

2 Answers


Use the Multimaps.synchronizedListMultimap wrapper to get a thread-safe reference to the multimap (ArrayListMultimap is a ListMultimap, i.e. it stores values in lists):

private final ListMultimap<String, Long> clientidToTimestampHolder = 
    Multimaps.synchronizedListMultimap(ArrayListMultimap.create());

Note that the synchronized multimap wrappers come with the following warning:

It is imperative that the user manually synchronize on the returned multimap when accessing any of its collection views:

// ...  

Failure to follow this advice may result in non-deterministic behavior.

In your case, you have to manually synchronize iteration over the entries view, since its iterator isn't synchronized:

public void processData() {
  synchronized (clientidToTimestampHolder) {
    for (Map.Entry<String, Long> entry : clientidToTimestampHolder.entries()) {
      String clientid = entry.getKey();
      long timestamp = entry.getValue();
      boolean isUpdated = isUpdatedClient(clientid, timestamp);
      if (!isUpdated) {
        updateClient(String.valueOf(clientid));
      }
    }
    clientidToTimestampHolder.clear();
  }
}

(I used Multimap.entries() instead of Multimap.asMap().entrySet() because it's cleaner this way.)

Also, if you wonder why there isn't a general-purpose ConcurrentXxxMultimap implementation, see Guava's issue #135 and this comment quoting an internal discussion:

I tried to build a general-purpose concurrent multimap, and it turned out to be slightly faster in a small fraction of uses and Much slower in most uses (compared to a synchronized multimap). I was focused on making as many operations as possible atomic; a weaker contract would eliminate some of this slowness, but would also detract from its usefulness.

I believe the Multimap interface is too "large" to support an efficient concurrent implementation - sorted or otherwise. (Clearly, this is an overstatement, but at the very least it requires either a lot of work or a loosening of the Multimap interface.)

EDIT:

Reading your comments, this seems like an XY problem to me. That said, IMO you shouldn't use a Multimap here, as you don't use any of its features; use a BlockingQueue instead, which is thread-safe and has a handy drainTo(Collection) method:

private final LinkedBlockingQueue<Map.Entry<String, Long>> clientidToTimestampHolder =
    new LinkedBlockingQueue<>();

public void add(final String clientid, final Long timestamp) {
  clientidToTimestampHolder.offer(Maps.immutableEntry(clientid, timestamp));
}

public void processData() {
  final List<Map.Entry<String, Long>> entries = new ArrayList<>();
  clientidToTimestampHolder.drainTo(entries);
  for (Map.Entry<String, Long> entry : entries) {
    String clientid = entry.getKey();
    long timestamp = entry.getValue();
    boolean isUpdated = isUpdatedClient(clientid, timestamp);
    if (!isUpdated) {
      updateClient(String.valueOf(clientid));
    }
  }
}
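
For illustration, here is a minimal self-contained sketch (the QueueDemo class name and the sample values are made up for this example, and Guava is assumed to be on the classpath) showing that the queue keeps duplicate client ids as separate entries and that drainTo empties it in FIFO order:

import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueDemo {
  public static void main(String[] args) {
    LinkedBlockingQueue<Map.Entry<String, Long>> queue = new LinkedBlockingQueue<>();
    queue.offer(Maps.immutableEntry("1", 12345678L));
    queue.offer(Maps.immutableEntry("1", 12345699L)); // same clientid, different timestamp -- both kept
    queue.offer(Maps.immutableEntry("2", 12345699L));

    // drainTo removes everything currently in the queue, oldest first
    List<Map.Entry<String, Long>> drained = new ArrayList<>();
    queue.drainTo(drained);
    System.out.println(drained); // [1=12345678, 1=12345699, 2=12345699]
  }
}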

You could (should?) probably create your own value class to store the String and long fields, and use it instead of the general-purpose Map.Entry<String, Long>.
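
A minimal sketch of such a value class (the ClientTimestamp name is made up for this example):

public final class ClientTimestamp {
  private final String clientid;
  private final long timestamp;

  public ClientTimestamp(String clientid, long timestamp) {
    this.clientid = clientid;
    this.timestamp = timestamp;
  }

  public String getClientid() {
    return clientid;
  }

  public long getTimestamp() {
    return timestamp;
  }
}

The queue would then be declared as LinkedBlockingQueue<ClientTimestamp> and add would offer new ClientTimestamp(clientid, timestamp).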

Grzegorz Rożniecki
  • Also, how do I delete the entries from the map once I am done processing? Do I need to create a copy of the `clientidToTimestampHolder` map and then pass it to `processData`? Basically I want to process whatever is in the `clientidToTimestampHolder` map every 60 seconds – user1234 Mar 15 '17 at 13:02
  • And the `processData` method is only called by a single background thread every 60 seconds, so why do we need to synchronize it? – user1234 Mar 15 '17 at 13:10
  • @user1234 Because another thread could modify multimap during the iteration leading (potentially) to `ConcurrentModificationException` and you want to have deterministic behavior here (the docs mentions that _failure to follow this advice may result in non-deterministic behavior_). – Grzegorz Rożniecki Mar 15 '17 at 15:10
  • Ok got it now.. And what about the removal of entries that have been processed otherwise after every 60 seconds it will start processing old entries again? right? Basically I want to process everything that is there in the map every 60 seconds – user1234 Mar 15 '17 at 15:21
  • You can do `clientidToTimestampHolder.clear()` inside synchronized block, after iterating. But if that's your use case, then you don't need multimap at all, use `BlockingQueue`. I'll update my answer to show what I mean. – Grzegorz Rożniecki Mar 15 '17 at 22:06
  • I was using multimap because I can have same clientid multiple times with different timestamp and that is why I was using it. But with blocking queue implementation, it will overwrite that key with the new timestamp value right? – user1234 Mar 15 '17 at 22:18
  • It will not overwrite any values; `LinkedBlockingQueue` doesn't care about duplicates, so if the entries ("1", 12345678), ("1", 12345699), ("1", 12345699), ("2", 12345699) are put within a minute, they will be drained in FIFO order. – Grzegorz Rożniecki Mar 16 '17 at 07:31

Right now, with your code, you will mainly observe the map being inconsistent: at one iteration you could have [1: "value1", 2: "value2", 3: "value3"] in your map, and at the next iteration it could be [1: "value1", 2: "value2", 3: "value3", 4: "value4"]. The main issue is that, as far as I know, Multimap does not guarantee the order in which elements were added (see this post), so you could skip an element during the iteration (it's for you to decide whether that is dangerous or not).

If you really need to stop every put operation, you can indeed use @Xaerxess's method of synchronizing on the map inside processData(). Another possibility, which you mention, is defensive copying, i.e. iterating over a snapshot of your Multimap. First you would do:

public Multimap<String, Long> getClientidToTimestampHolder(){
    return ImmutableSetMultimap.copyOf(clientidToTimestampHolder);
}

And the iteration would be done on this snapshot:

 public void processData() {
    Multimap<String, Long> tmpClientToTimestampHolder = getClientidToTimestampHolder();
    for (Entry<String, Collection<Long>> entry : tmpClientToTimestampHolder.asMap().entrySet()) {
      String clientid = entry.getKey();
      Collection<Long> timestamps = entry.getValue();
      for (long timestamp : timestamps) {
        boolean isUpdated = isUpdatedClient(clientid, timestamp);
        if (!isUpdated) {
          updateClient(String.valueOf(clientid));
        }
      }
    }
  }

Seeing your comment about deletion, you would want to use a synchronized block to do it atomically:

synchronized (clientidToTimestampHolder) {
    clientidToTimestampHolder.remove(key, value); // fill in key/value, or use removeAll(key)
}

Why do you need synchronization? Because if you want the exact contents of the map at time t, you need to prevent other threads from adding elements to it. This is done through locking in Java: as long as a thread (here your background thread) holds the lock on the map, no other thread that synchronizes on the same lock can touch the multimap while you read from it.
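
Note that locking only blocks writers that take the same lock. As a minimal sketch (assuming you keep the plain ArrayListMultimap from the question rather than the synchronized wrapper), add would have to look like this:

// The writer takes the same lock as the iterating background thread,
// so put() waits while processData() holds the monitor on the multimap.
public void add(final String clientid, final Long timestamp) {
  synchronized (clientidToTimestampHolder) {
    clientidToTimestampHolder.put(clientid, timestamp);
  }
}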

Adonis
  • I cannot skip any element, so I need to make sure I iterate over all the elements, and once I am done processing they should be removed from the map instead of staying there forever. So what do you think is the best way to do this? – user1234 Mar 15 '17 at 13:27
  • And where would I add the code for deleting? Will it be in the processData method? – user1234 Mar 15 '17 at 13:30
  • In that case I would use @Xaerxess's answer and synchronize on the collection for the iteration, and inside the same synchronized block put the remove call (so yes, in processData). Making a defensive copy is fine as long as a snapshot is enough; if you need to stop adding elements during the iteration, then it's not enough and you need to lock all access to the map. – Adonis Mar 15 '17 at 13:30
  • Added an update in my question. That's what you meant? – user1234 Mar 15 '17 at 13:36
  • That's exactly what I meant! – Adonis Mar 15 '17 at 13:52
  • @user1234: Strike my comment above; actually you might want to do it as mentioned in this post, because if you do it like in your edit you might get a ConcurrentModificationException: http://stackoverflow.com/questions/1572178/guava-multimap-and-concurrentmodificationexception – Adonis Mar 15 '17 at 14:05
  • So that means I need to iterate twice, once for sending and once just for removing? Can I not do both things in one loop? – user1234 Mar 15 '17 at 14:19
  • @user1234: As far as I know you can do it in one loop; you just need to modify it to use the iterator syntax as in the answer linked in my previous comment. – Adonis Mar 15 '17 at 14:28
  • I think I got it. I updated my question. Is that what you meant now? – user1234 Mar 15 '17 at 14:41