I am trying to implement a function that does a round robin over the elements of a ConcurrentHashMap. For example, if the ConcurrentHashMap contains the elements {a, b, c}, the first call to the function returns a, the second returns b, the third returns c, and the fourth returns a again.

private static Enumeration<Peer> nhEnmu;
private static final ConcurrentHashMap<String, Peer> peers;

private synchronized static Peer getNextPeer()
{
    if (nhEnmu == null || !nhEnmu.hasMoreElements())
    {
        nhEnmu = peers.elements();
    }

    return nhEnmu.nextElement();
}

I implemented the function as above, but a NoSuchElementException keeps being thrown. Is there anything wrong with using the elements() method? If it is not appropriate, what implementation should I adopt? Thank you!

The exception trace is as follows:

Exception in thread "Thread-1" java.util.NoSuchElementException
    at java.util.concurrent.ConcurrentHashMap$HashIterator.nextEntry(ConcurrentHashMap.java:1266)
    at java.util.concurrent.ConcurrentHashMap$ValueIterator.nextElement(ConcurrentHashMap.java:1297)
    at control.Protocol.getNextPeer(Protocol.java:89)
    at Main$MsgProcessorThread.run(Main.java:119)
William Morrison
John Stone

2 Answers


According to this question, the iterators returned by ConcurrentHashMap are not thread-safe. If so, and you are accessing the iterator from multiple threads, this exception could occur.
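
Separately from thread safety, one case that raises the same exception even inside a synchronized method is an empty map: elements() then returns an enumeration with nothing in it, so the hasMoreElements() check fails again after the refresh and nextElement() throws. A minimal sketch, assuming String values in place of Peer (the class and method names here are made up for illustration):

```java
import java.util.Enumeration;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

class EmptyMapDemo {
    /** Returns true if nextElement() on a fresh enumeration of the map throws. */
    static boolean throwsOnFreshEnumeration(ConcurrentHashMap<String, String> map) {
        // Same call the question uses to refresh its enumeration.
        Enumeration<String> fresh = map.elements();
        try {
            fresh.nextElement();
            return false;
        } catch (NoSuchElementException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> peers = new ConcurrentHashMap<>();
        System.out.println(throwsOnFreshEnumeration(peers)); // map is empty here
        peers.put("a", "peer-a");
        System.out.println(throwsOnFreshEnumeration(peers)); // map has one entry here
    }
}
```

If getNextPeer() can run before any peer has been added, or after the last one has been removed, it may be worth checking for this case before calling nextElement().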

You probably have a method to add and remove peers in this class too. Try keeping a list of peers and an index into that list. When you remove a peer, remove that peer from both the hashmap and your peer round robin list, and update your index appropriately. Do the same thing when adding.

Then, when getNextPeer is called, return the next peer in your list and increment your index. Wrap back to zero once the index reaches the size of the list.

Something like....

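import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Peers in round-robin order; kept in step with the map on every add and remove.
private static final List<Peer> nhEnmu = new ArrayList<Peer>();
// Position in the list of the next peer to hand out.
private static int index;
private static final ConcurrentHashMap<String, Peer> peers = new ConcurrentHashMap<String, Peer>();

private synchronized static Peer getNextPeer()
{
    // Nothing to hand out while the list is empty.
    if (nhEnmu.isEmpty())
    {
        return null;
    }
    // Wrap first, so an index left stale by a removal cannot run past the end.
    if (index >= nhEnmu.size())
    {
        index = 0;
    }
    return nhEnmu.get(index++);
}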
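
The add and remove bookkeeping described above could look roughly like the sketch below. It is only one way to do it: the Peer class is a stand-in for the one in the question, and PeerRegistry, addPeer, removePeer and the list name order are hypothetical names. Every method is synchronized on the same class so that the map, the list and the index always change together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the Peer class from the question.
class Peer {
    final String name;
    Peer(String name) { this.name = name; }
}

class PeerRegistry {
    private static final ConcurrentHashMap<String, Peer> peers = new ConcurrentHashMap<>();
    private static final List<Peer> order = new ArrayList<>(); // round-robin order
    private static int index;                                  // next position to hand out

    static synchronized void addPeer(String key, Peer peer) {
        Peer old = peers.put(key, peer);
        if (old != null) {
            // Key already present: swap the peer in place so its turn is unchanged.
            order.set(order.indexOf(old), peer);
        } else {
            order.add(peer);
        }
    }

    static synchronized void removePeer(String key) {
        Peer removed = peers.remove(key);
        if (removed == null) {
            return;
        }
        int pos = order.indexOf(removed);
        order.remove(pos);
        // Everything after pos shifted left by one; follow it so no peer is skipped.
        if (pos < index) {
            index--;
        }
        if (index >= order.size()) {
            index = 0;
        }
    }

    static synchronized Peer getNextPeer() {
        if (order.isEmpty()) {
            return null;
        }
        if (index >= order.size()) {
            index = 0;
        }
        return order.get(index++);
    }
}
```

With peers a, b and c added in that order, successive calls would return a, b, c and then a again. Removing a peer that has already had its turn in the current pass does not cause the next one to be skipped.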
Community
William Morrison
  • I tried using the synchronized keyword on both the function and the nhEnmu iterator, but the exception still occurs. What should I do to make this function thread-safe? – John Stone Jul 21 '13 at 07:28
  • I'd give up on the enumeration and take the approach I suggested in my edit. Or, look up the source code for `ConcurrentHashMap` and see exactly what the enumeration is doing. I just looked at the source, and it's complex. – William Morrison Jul 21 '13 at 07:31

Use ConcurrentHashMap.entrySet() directly instead; it is safe.
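
A rough sketch of what a round robin over entrySet() might look like, assuming the iterator is only ever touched inside one synchronized method. Peer is a stand-in for the class in the question, and PeerRotation and cursor are hypothetical names. The iterators of ConcurrentHashMap are documented as weakly consistent, so a peer added or removed in the middle of a pass may or may not be seen until the next pass, and the iteration order is whatever the map happens to use.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the Peer class from the question.
class Peer {
    final String name;
    Peer(String name) { this.name = name; }
}

class PeerRotation {
    static final ConcurrentHashMap<String, Peer> peers = new ConcurrentHashMap<>();
    // Current pass over the map; only read and written inside getNextPeer().
    private static Iterator<Map.Entry<String, Peer>> cursor;

    static synchronized Peer getNextPeer() {
        if (cursor == null || !cursor.hasNext()) {
            // Start a new pass once the previous one is exhausted.
            cursor = peers.entrySet().iterator();
            if (!cursor.hasNext()) {
                return null; // the map is empty, so there is nothing to return
            }
        }
        return cursor.next().getValue();
    }
}
```

Compared with the list-and-index approach, this needs no extra bookkeeping on add and remove, at the cost of a less predictable order while the map is changing.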

Milad
  • The question linked in my answer contradicts this. – William Morrison Jul 21 '13 at 07:11
  • I don't think so. It says: "In conclusion, a statement like for (Object o : someConcurrentHashMap.entrySet()) { // ... } will be fine (or at least safe) almost every time you see it." If you always go through entrySet() it is safe, but the map may change during the iteration, and that can throw an exception. – Milad Jul 21 '13 at 11:06
  • "Almost" being the key word here. If multiple threads are operating on the structure, it is not safe. – William Morrison Jul 21 '13 at 17:47
  • My mistake! entrySet() never throws an exception, but while you iterate through it, other threads wait for you to finish before they can write. – Milad Jul 22 '13 at 05:07