
I have a class in which a single background thread populates the map liveSocketsByDatacenter every 30 seconds inside the updateLiveSockets() method. I also have a method, getNextSocket(), that multiple reader threads call to get an available live socket; it reads the same map to get this information.

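// Assumed imports: Guava (Optional, ImmutableList), JeroMQ, and Apache Commons
// Collections for CollectionUtils. Project classes (Datacenters, SocketHolder,
// Utils, Message, Partition, SendToSocket) and populateMap() are not shown.
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.collections4.CollectionUtils;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

public class SocketManager {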
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
      new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
  private final ZContext ctx = new ZContext();

  // Lazy Loaded Singleton Pattern
  private static class Holder {
    private static final SocketManager instance = new SocketManager();
  }

  public static SocketManager getInstance() {
    return Holder.instance;
  }

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 30, 30, TimeUnit.SECONDS);
  }

  // during startup, connect the sockets and populate the map once
  private void connectToZMQSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    // The map in which I put all the live sockets
    Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
      updatedLiveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(addedColoSockets));
    }
    // Update the map content
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
  }

  private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    for (String address : addresses) {
      try {
        Socket client = ctx.createSocket(socketType);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.setTCPKeepAlive(1);
        client.setSendTimeOut(7);
        client.setLinger(0);
        client.connect(address);

        SocketHolder zmq = new SocketHolder(client, ctx, address, true);
        socketList.add(zmq);
      } catch (Exception ex) {
        // log error
      }
    }
    return socketList;
  }

  // this method will be called by multiple threads to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    // For the sake of consistency make sure to use the same map instance
    // in the whole implementation of my method by getting my entries
    // from the local variable instead of the member variable
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        this.liveSocketsByDatacenter.get();
    Optional<SocketHolder> liveSocket = Optional.absent();
    List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
    for (Datacenters dc : dcs) {
      liveSocket = getLiveSocketX(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        break;
      }
    }
    return liveSocket;
  }

  // is there any concurrency or thread safety issue or race condition here?
  private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
    if (!CollectionUtils.isEmpty(endpoints)) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
      for (SocketHolder obj : endpoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty, so shuffle it and return the first element
        Collections.shuffle(liveOnly);
        return Optional.of(liveOnly.get(0));
      }
    }
    return Optional.absent();
  }

  // Added the synchronized modifier to prevent concurrent modification:
  // building the new map requires reading the old one first, so both
  // steps must happen atomically to prevent consistency issues
  private synchronized void updateLiveSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

    // Initialize my new map with the current map content
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        new HashMap<>(this.liveSocketsByDatacenter.get());

    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) { // LINE A
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
        boolean isLive = (status) ? true : false;
        // is there any problem the way I am using `SocketHolder` class?
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(liveUpdatedSockets));
    }
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
  }
}

As you can see in my class:

  • From a single background thread that runs every 30 seconds, I populate the liveSocketsByDatacenter map with all the live sockets in the updateLiveSockets() method.
  • Then, from multiple threads, I call the getNextSocket() method to get an available live socket; it uses the liveSocketsByDatacenter map to get the required information.

My code works fine without any issues, and I wanted to see if there is a better or more efficient way to write it. I would also like an opinion on any thread-safety issues or race conditions; so far I haven't seen any, but I could be wrong.

I am mostly worried about the updateLiveSockets() and getLiveSocketX() methods. At LINE A, I iterate over liveSockets, which is a List of SocketHolder, and for each entry I create a new SocketHolder object and add it to another new list. Is this OK?

Note: SocketHolder is an immutable class. You can ignore the ZeroMQ-specific parts.

john
  • It looks to me like `liveSocketsByDatacenter` is **immutable**. That is a path to make this all much simpler. – Boris the Spider Oct 29 '17 at 08:11
  • Plus, your logic is pretty gnarly in lots of places. The abuse of `Optional` makes my eyes water. I would get rid of it entirely; you don't use it correctly anywhere. Hint: calling `Optional.isPresent` is always a bad idea. – Boris the Spider Oct 29 '17 at 08:13
  • I have a few methods where things can go wrong. First is `updateLiveSockets`, called by the background thread every 30 seconds; second is `getNextSocket`, called concurrently by multiple reader threads, which internally calls the `getLiveSocket` method. So I believe all three methods have to be right in terms of thread safety. Do you think they are all doing the right thing? I am most worried about the `updateLiveSockets` method. – john Oct 29 '17 at 08:15
  • This question appears to be almost literally the same question and code as https://stackoverflow.com/questions/41952171. Are you the author of the previous question or did you just copy that question for some reason? – Paul Apr 14 '18 at 17:11

2 Answers


You use the following synchronization techniques.

  1. The map with live socket data is behind an atomic reference, which allows safely switching the map.
  2. The updateLiveSockets() method is synchronized (implicitly on this), which prevents two threads from switching the map simultaneously.
  3. You take a local reference to the map when using it, to avoid mix-ups if the switch happens while getNextSocket() is running.

Is it thread safe, as it is now?

Thread safety always hinges on whether there is proper synchronization on shared mutable data. In this case the shared mutable data is the map of datacenters to their list of SocketHolders.

Keeping the map in an AtomicReference and taking a local reference to it before use is enough synchronization on the map. Your methods take one version of the map and use it, and switching versions is thread safe due to the nature of AtomicReference. This could also have been achieved by just making the map field volatile, as all you do is replace the reference (you don't do any check-then-act operations on it).

As scheduleAtFixedRate() guarantees that the passed Runnable will not run concurrently with itself, the synchronized modifier on updateLiveSockets() is not needed; however, it also doesn't do any real harm.
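That guarantee comes from the ScheduledExecutorService contract itself ("subsequent executions may start late, but will not concurrently execute"), not from the executor having only one thread. The small check below is a sketch rather than a proof: it schedules a task that runs longer than its period on a four-thread pool and records the maximum number of overlapping runs. The class name and the timings are arbitrary choices for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedRateOverlapCheck {
    // Runs a 30 ms task every 10 ms on a 4-thread pool for about 300 ms and
    // returns the largest number of runs that were ever in progress at once.
    static int maxConcurrentRuns() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(4);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        pool.scheduleAtFixedRate(() -> {
            int now = running.incrementAndGet();
            maxSeen.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(30); // deliberately longer than the 10 ms period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return maxSeen.get();
    }
}
```

Calling `FixedRateOverlapCheck.maxConcurrentRuns()` should return 1 even though four threads are available, which is why the extra synchronized on updateLiveSockets() buys nothing here.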

So yes, this class is thread safe, as it is.

However, it's not entirely clear whether a SocketHolder can be used by multiple threads simultaneously. As it is, this class only tries to minimize concurrent use of SocketHolders by picking a random live one (though there is no need to shuffle the entire list just to pick one random index). It does nothing to actually prevent concurrent use.
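Picking a random live element needs neither a shuffle nor, strictly, a temporary list. One option, sketched here, is single-element reservoir sampling: one pass, no allocation, and every live element is equally likely to be chosen. It also uses ThreadLocalRandom, because the Random Javadoc notes that a shared Random instance is thread safe but can suffer contention when many threads use it. Assumptions: `RandomPick.pickLive` and the `isLive` predicate are hypothetical names standing in for getLiveSocket() and SocketHolder.isLive(), and java.util.Optional replaces Guava's Optional to keep the sketch dependency-free.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;

final class RandomPick {
    // Single-element reservoir sampling: one pass, no temporary list, no shuffle.
    // Each element that passes isLive ends up chosen with equal probability.
    static <T> Optional<T> pickLive(List<T> endpoints, Predicate<? super T> isLive) {
        if (endpoints == null) {
            return Optional.empty();
        }
        ThreadLocalRandom rnd = ThreadLocalRandom.current(); // avoids a shared Random
        T chosen = null;
        int liveSeen = 0;
        for (T endpoint : endpoints) {
            if (isLive.test(endpoint)) {
                liveSeen++;
                // Replace the current choice with probability 1/liveSeen.
                if (rnd.nextInt(liveSeen) == 0) {
                    chosen = endpoint;
                }
            }
        }
        return Optional.ofNullable(chosen);
    }
}
```

The random pick only spreads load; as noted above, it does not stop two threads from receiving the same socket.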

Can it be made more efficient?

I believe it can. Looking at the updateLiveSockets() method, it seems to build exactly the same map, except that the SocketHolders may have different values for the isLive flag. This leads me to conclude that, rather than switching the entire map, I just want to switch each of the lists in the map. And to change entries in a map in a thread-safe manner, I can just use ConcurrentHashMap.

If I use a ConcurrentHashMap, and don't switch the map, but rather, the values in the map, I can get rid of the AtomicReference.

To change a mapping, I can build the new list and put it straight into the map. This is more efficient, as I publish data sooner and create fewer objects, while my synchronization builds on ready-made components, which benefits readability.
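A stripped-down sketch of that per-key publication idea, with the ZeroMQ parts removed: the writer builds a complete list and publishes it with a single put(), and readers call get() without any locking. The ConcurrentHashMap Javadoc states that an update for a key happens-before any retrieval of that key that reports the updated value. `PerKeyPublisher` is a hypothetical name, and the generic K and V stand in for Datacenters and SocketHolder.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PerKeyPublisher<K, V> {
    private final Map<K, List<V>> byKey = new ConcurrentHashMap<>();

    // Writer: copy first so later changes to the caller's list cannot leak in,
    // then publish the finished, unmodifiable list with a single put().
    void publish(K key, List<V> freshValues) {
        byKey.put(key, Collections.unmodifiableList(new ArrayList<>(freshValues)));
    }

    // Reader: a plain get() sees either the previous list or the new one,
    // never a half-built list; no lock and no local copy of the map needed.
    List<V> current(K key) {
        List<V> values = byKey.get(key);
        return values == null ? Collections.<V>emptyList() : values;
    }
}
```

One trade-off worth knowing: a reader that walks several datacenters in one getNextSocket() call may see the new list for one datacenter and the old list for another, whereas the original AtomicReference swap always handed a reader one consistent version of the whole map. For liveness data refreshed every 30 seconds, that is likely acceptable.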

Here's my version (some less relevant parts are omitted for brevity):

public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
    private final ZContext ctx = new ZContext();

    // ...

    private SocketManager() {
      connectToZMQSockets();
      scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
    }

    // during startup, connect the sockets and populate the map once
    private void connectToZMQSockets() {
      Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS; // same declared type as in the question
      for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets)); // we can put it straight into the map
      }
    }

    // ...      

    // this method will be called by multiple threads to get the next live socket
    // is there any concurrency or thread safety issue or race condition here?
    public Optional<SocketHolder> getNextSocket() {
      for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
        // No local copy of the map is needed any more: ConcurrentHashMap makes sure I get the latest mapped List<SocketHolder>
        Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
        if (liveSocket.isPresent()) {
          return liveSocket;
        }
      }
      return Optional.absent();
    }

    // is there any concurrency or thread safety issue or race condition here?
    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
      if (!CollectionUtils.isEmpty(listOfEndPoints)) {
        // The list of live sockets
        List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
        for (SocketHolder obj : listOfEndPoints) {
          if (obj.isLive()) {
            liveOnly.add(obj);
          }
        }
        if (!liveOnly.isEmpty()) {
          // The list is not empty, so pick one element at random
          return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
        }
      }
      return Optional.absent();
    }

    // no need to make this synchronized
    private void updateLiveSockets() {
      Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

      for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
        List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
        for (SocketHolder liveSocket : liveSockets) { // LINE A
          Socket socket = liveSocket.getSocket();
          String endpoint = liveSocket.getEndpoint();
          Map<byte[], byte[]> holder = populateMap();
          Message message = new Message(holder, Partition.COMMAND);

          boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
          boolean isLive = (status) ? true : false;

          SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
          liveUpdatedSockets.add(zmq);
        }
        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straight into the map; the mapping will be updated in a thread-safe manner
      }
    }

}
bowmore
  • Thanks for your detailed explanation. I am confused about one part: will it guarantee that all the reader threads see the same state of a socket from the `getNextSocket()` method as soon as the state changes in the `updateLiveSockets()` method? – john Nov 04 '17 at 03:30
  • Yes, once `updateLiveSockets()` has put the new list in the map, all subsequent `get()` calls from reader threads will see the new list. – bowmore Nov 04 '17 at 09:26
  • OK, got it; let me try this out and see how it works. I have another [question](https://stackoverflow.com/questions/47107331/how-to-make-sure-that-i-am-not-sharing-same-socket-between-two-threads-at-a-same) based on this one, and I am trying to make sure that I don't share the same socket between two threads. I have a solution, but I am trying to see whether it will work here or whether there is a better way. Maybe we can use ThreadLocal? But I am not sure yet. – john Nov 04 '17 at 17:31
  • Yes, I am still around – bowmore Nov 10 '17 at 23:18
  • Do you have any thoughts on my other question? I believe we need a socket pool to get rid of synchronization and make it work efficiently. – john Nov 10 '17 at 23:23
  • I do, but they're along the same lines as the current answer. – bowmore Nov 10 '17 at 23:28
  • Which current answer? I am still not sure how I can use any of those answers in my code. If you can provide an example based on my code, I will get a much better idea. – john Nov 10 '17 at 23:49
  • Ah, there's more than one by now. I also wonder what the point is of checking the liveness of the Socket, and whether a Socket that is no longer live can recover by itself. Your code here seems to suggest so... – bowmore Nov 11 '17 at 13:10
  • We check liveness so that we don't send on a socket we already know is down. The box hosting those sockets might have been down for a long time, so the liveness check tells us that these sockets are already down and we shouldn't send data on them. I do have an answer on the other question, but I can't figure out how to use it properly in my solution. – john Nov 11 '17 at 17:31

If SocketHolder and Datacenters are immutable, your program looks fine. Here is some minor feedback, though.
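Immutability is what makes the copy-and-replace at LINE A safe: the background thread never mutates a list or holder that a reader might be using, it only builds new ones. A minimal sketch of that pattern, assuming a simplified holder: `SocketHolderSketch<S>` is a hypothetical stand-in for SocketHolder (the real class is not shown), the socket is modeled as a generic S, and `withLive` and `LivenessRefresh.refresh` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for SocketHolder: all fields final, no setters.
final class SocketHolderSketch<S> {
    private final S socket;
    private final String endpoint;
    private final boolean live;

    SocketHolderSketch(S socket, String endpoint, boolean live) {
        this.socket = socket;
        this.endpoint = endpoint;
        this.live = live;
    }

    S getSocket() { return socket; }
    String getEndpoint() { return endpoint; }
    boolean isLive() { return live; }

    // "Wither": returns a copy with a new liveness flag instead of mutating;
    // reuses this instance when nothing changes, which is safe because it is immutable.
    SocketHolderSketch<S> withLive(boolean newLive) {
        return newLive == live ? this : new SocketHolderSketch<S>(socket, endpoint, newLive);
    }
}

final class LivenessRefresh {
    // Builds a brand-new unmodifiable list; the old list is never touched, so
    // readers still holding it see a consistent (if slightly stale) view.
    static <S> List<SocketHolderSketch<S>> refresh(List<SocketHolderSketch<S>> current,
                                                   Predicate<? super S> ping) {
        List<SocketHolderSketch<S>> updated = new ArrayList<>(current.size());
        for (SocketHolderSketch<S> h : current) {
            updated.add(h.withLive(ping.test(h.getSocket())));
        }
        return Collections.unmodifiableList(updated);
    }
}
```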

1. Usage of AtomicReference

AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter

This member variable does not need to be wrapped in an AtomicReference, since you are not doing any atomic CAS operation with it. You could simply declare a volatile Map<Datacenters, List<SocketHolder>> and, when reading it, create a local reference to it. This is enough to guarantee an atomic, visible swap of the reference to the new Map.
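A minimal sketch of the volatile alternative, assuming a single writer thread (as with the single-thread scheduler in the question): the writer copies the current map, modifies the copy, and publishes it with one volatile write, while readers read the field once into a local variable. `VolatileSnapshot`, `replace` and `totalValues` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class VolatileSnapshot<K, V> {
    // volatile is enough: the field is only ever replaced wholesale (no CAS),
    // and every published map is unmodifiable.
    private volatile Map<K, List<V>> snapshot = Collections.emptyMap();

    // Single writer only: copy, modify the copy, publish with one volatile write.
    void replace(K key, List<V> values) {
        Map<K, List<V>> next = new HashMap<>(snapshot);
        next.put(key, Collections.unmodifiableList(new ArrayList<>(values)));
        snapshot = Collections.unmodifiableMap(next);
    }

    // Readers: read the field once so every lookup in this call uses the
    // same version of the map, even if the writer swaps it meanwhile.
    int totalValues(Iterable<K> keys) {
        Map<K, List<V>> local = snapshot;
        int total = 0;
        for (K key : keys) {
            List<V> values = local.get(key);
            if (values != null) {
                total += values.size();
            }
        }
        return total;
    }
}
```

The copy-then-publish in replace() is a read-modify-write, so it is only safe with one writer; with several writers you would need AtomicReference.compareAndSet or a lock, which is the case where AtomicReference earns its keep.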

2. Synchronized method

private synchronized void updateLiveSockets()

This method is called from a single thread executor, so there is no need for it to be synchronized.

3. Some simplifications

  • From your current usage of this class, it seems you could filter out sockets that are not alive in updateLiveSockets, instead of filtering every time a client calls getNextSocket.

  • You can replace Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS with Set<Datacenters> datacenters = Utils.SERVERS.keySet() and work with the keys.
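Filtering in updateLiveSockets does not have to mean losing the dead sockets. One way, sketched here under assumptions, is to keep the full list of sockets privately for the background probe and publish a separate, live-only list for readers; a socket that was dead on one refresh is still pinged on the next and reappears once it answers. `LiveView` and its methods are hypothetical names, S stands in for SocketHolder, and the `ping` predicate stands in for the SendToSocket call in the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

final class LiveView<S> {
    // Every socket, probed on each refresh (only the background thread reads it).
    private final List<S> allSockets;
    // What reader threads see: only the sockets that answered the last probe.
    private volatile List<S> liveSockets = Collections.emptyList();

    LiveView(List<S> allSockets) {
        this.allSockets = Collections.unmodifiableList(new ArrayList<>(allSockets));
    }

    // Background thread: probe all sockets, including ones that were dead
    // last time, and publish the live subset with one volatile write.
    void refresh(Predicate<? super S> ping) {
        List<S> live = new ArrayList<>();
        for (S socket : allSockets) {
            if (ping.test(socket)) {
                live.add(socket);
            }
        }
        liveSockets = Collections.unmodifiableList(live);
    }

    // Reader threads: no per-call filtering, the list is already live-only.
    List<S> live() {
        return liveSockets;
    }
}
```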

4. Java 8

If possible, switch to Java 8. Streams together with Java 8's Optional would remove a lot of boilerplate code and make your code much easier to read.
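A sketch of what getNextSocket() could look like with Java 8 streams and java.util.Optional, assuming the datacenter order, the map and the liveness check are passed in as parameters: `StreamSelection.nextLive` is a hypothetical name, D stands in for Datacenters and T for SocketHolder. There is no isPresent()/get() pair, and because streams are lazy, processing stops at the first datacenter that has a live endpoint.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class StreamSelection {
    // Walks datacenters in priority order and returns a random live endpoint
    // from the first datacenter that has at least one.
    static <D, T> Optional<T> nextLive(List<D> orderedDcs,
                                       Map<D, List<T>> byDc,
                                       Predicate<? super T> isLive) {
        return orderedDcs.stream()
                .map(byDc::get)                  // List<T> per datacenter, or null
                .filter(Objects::nonNull)        // skip datacenters with no entry
                .map(endpoints -> endpoints.stream().filter(isLive).collect(Collectors.toList()))
                .filter(live -> !live.isEmpty()) // first datacenter with a live endpoint wins
                .findFirst()
                .map(live -> live.get(ThreadLocalRandom.current().nextInt(live.size())));
    }
}
```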

Duarte Meneses
  • I was going through your answer again and wanted to know how I can achieve what you mention in point `3.1`. If I filter out sockets that are not alive, how will I iterate over all the sockets again in my `updateLiveSockets` method? As you can see, in `updateLiveSockets` I iterate over all the sockets and ping them to find out whether they are live. – john Dec 18 '17 at 05:27