3

I have a server program which accepts client connections. These client connections can belong to many streams. For example two or more clients can belong to the same stream. Out of these streams one message I have to pass but I have to wait until all the streams are established. For this I maintain the following data structure.

ConcurrentHashMap<Integer, AtomicLong> conhasmap = new ConcurrentHashMap<Integer, AtomicLong>();

Integer is the stream ID and Long is the client number. To make one thread for a given stream to wait till AtomicLong reach a specific value I used the following loop. Actually the first packet of the stream puts it stream ID and the number of connections to wait. With each connection I decrease the connections to wait.

while(conhasmap.get(conectionID) != new AtomicLong(0)){
       // Do nothing
}

However this loop blocks the other threads. According to this answer it does a volatile read. How can I modify the code to wait the correct thread for a given stream until it reaches a specific value?

Community
  • 1
  • 1
user340
  • 375
  • 12
  • 28
  • Why don't use thread.wait() and wake it after your server accepts a new connection? Another less elegant solution would be to sleep for a while and check the value. – PbxMan Jan 21 '15 at 14:15
  • 1
    Using a [`Condition`](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Condition.html) maybe? – fge Jan 21 '15 at 14:26
  • @PbxMan I am waiting a thread already accepted by the sever. Yes sleeping for a while and reading the value might work, however I look for a better way. – user340 Jan 21 '15 at 14:34
  • Just a thinking: Can't I wait on the AtomicLong value and notify when the thread waiting thread then the value reaches zero? – user340 Jan 21 '15 at 14:40
  • @fge will check the Condition – user340 Jan 21 '15 at 14:40
  • If understand it right, you can use CountDownLatch or CyclicBarrier, whichever suits you best. As Latches are not resetable once they reach the termination state. – fmucar Jan 21 '15 at 17:52
  • `while(conhasmap.get(conectionID) != new AtomicLong(0))` is nonsense. Whatever `get` returns, it can *never* be identical to an instance you just create with `new`; so it will loop forever. But it will *not* block other threads, slowing down your entire machine because you are doing a polling loop, yes, but *not blocking*. – Holger Jan 23 '15 at 17:50

2 Answers2

3

If you're using Java 8, CompletableFuture could be a good fit. Here's a complete, contrived example which is waiting for 5 clients to connect and send a message to a server (simulated using a BlockingQueue with offer/poll).

In this example, when the expected client connected message count is reached, a CompletableFuture hook is completed, which then runs arbitrary code on any thread of your choice.

In this example, you don't have any complex thread wait/notify setups or busy wait loops.

package so.thread.state;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class Main {

  public static String CONNETED_MSG = "CONNETED";
  public static Long EXPECTED_CONN_COUNT = 5L;

  public static ExecutorService executor = Executors.newCachedThreadPool();
  public static BlockingQueue<String> queue = new LinkedBlockingQueue<>();

  public static AtomicBoolean done = new AtomicBoolean(false);

  public static void main(String[] args) throws Exception {

    // add a "server" thread
    executor.submit(() -> server());

    // add 5 "client" threads
    for (int i = 0; i < EXPECTED_CONN_COUNT; i++) {
      executor.submit(() -> client());
    }

    // clean shut down
    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    done.set(true);
    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.SECONDS);

  }

  public static void server() {

    print("Server started up");
    // track # of client connections established
    AtomicLong connectionCount = new AtomicLong(0L);

    // at startup, create my "hook"
    CompletableFuture<Long> hook = new CompletableFuture<>();
    hook.thenAcceptAsync(Main::allClientsConnected, executor);

    // consume messages
    while (!done.get()) {
      try {
        String msg = queue.poll(5, TimeUnit.MILLISECONDS);
        if (null != msg) {
          print("Server received client message");
          if (CONNETED_MSG.equals(msg)) {
            long count = connectionCount.incrementAndGet();

            if (count >= EXPECTED_CONN_COUNT) {
              hook.complete(count);
            }
          }
        }

      } catch (Exception e) {
        e.printStackTrace();
      }
    }

    print("Server shut down");

  }

  public static void client() {
    queue.offer(CONNETED_MSG);
    print("Client sent message");
  }

  public static void allClientsConnected(Long count) {
    print("All clients connected, count: " + count);
  }


  public static void print(String msg) {
    System.out.println(String.format("[%s] %s", Thread.currentThread().getName(), msg));
  }
}

You get output like this

[pool-1-thread-1] Server started up
[pool-1-thread-5] Client sent message
[pool-1-thread-3] Client sent message
[pool-1-thread-2] Client sent message
[pool-1-thread-6] Client sent message
[pool-1-thread-4] Client sent message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-4] All clients connected, count: 5
[pool-1-thread-1] Server shut down
Alex
  • 2,435
  • 17
  • 18
  • I do not know the number of client connections in advance. There can be many connections and these connections have different streams. I have to wait on a particular stream. – user340 Jan 21 '15 at 15:22
  • @kani, this is a contrived example, you could use this kind of pattern with any kind of logic (the same biz logic you have now) and then use `CompletableFuture.complete(..)`, I'm just trying to show a newer paradigm that Java introduced to make composition of complex async behavior more fluent then reasoning around `Object.wait()` and `Object.notify()` – Alex Jan 21 '15 at 15:26
  • My client threads are grouped based on a communication ID. e.g., if there are 15 client connections 10 belong to one communication A and 5 belong to communication B. I must wait on 10 threads for A and 5 threads for B. Is it possible with CompletableFuture? I see it requires the threads to be collected with the executor and I am unable to differentiate the communication stream when starting the thread. – user340 Jan 22 '15 at 15:03
2

Your expression:

conhasmap.get(conectionID) != new AtomicLong(0)

will always be true because you are comparing the object references, which will never be equal, instead of the values. The better expression would be:

conhasmap.get(conectionID).longValue() != 0L)

, but looping like this without wait/notify logic within the loop is not a good practice because it uses CPU time constantly. Instead, each thread should call .wait() on the AtomicLong instance, and when it is decremented or incremented, you should call .notifyAll() on the AtomicLong instance to wake up each waiting thread to check the expression. The AtomicLong class may already be calling the notifyAll() method whenever it is modified, but I don't know for sure.

AtomicLong al = conhasmap.get(conectionID);
synchronized(al) {
    while(al.longValue() != 0L) {
        al.wait(100); //wait up to 100 millis to be notified
    }
}

In the code that increments/decrements, it will look like:

AtomicLong al = conhasmap.get(conectionID);
synchronized(al) {
    if(al.decrementAndGet() == 0L) {
        al.notifyAll();
    }
}

I personally would not use an AtomicLong for this counter because you are not benefiting from the lock-less behavior of the AtomicLong. Just use a java.lang.Long instead because you will need to synchronize on the counter object for the wait()/notify() logic anyway.

Palamino
  • 791
  • 3
  • 10
  • Thanks for pointing out the object reference comparison. However, the solution is still not working. – user340 Jan 21 '15 at 15:13
  • Do you think the al lock is consistent as the lock? The code goes into an infinite loop in the while loop – user340 Jan 21 '15 at 17:40
  • You may have a situation that is decrementing the counter too often. Change the while condition to be: while(al.longValue() > 0L) And the if condition to be: if(al.decrementAndGet() <= 0L) – Palamino Jan 21 '15 at 19:14
  • Also check to make sure you only instantiate the AtomicLong once for the connection stream. Each connection needs to increment/decrement the same AtomicLong instance otherwise they will be locking on difference object instances which would lead to infinite looping like you are seeing. – Palamino Jan 21 '15 at 19:28
  • The > & <= operates works too. However both methods work for only three threads. More than three program goes to an infinite loop in the while.What can be the issue? Further I used the conectionID as the lock for synchronization and waiting – user340 Jan 21 '15 at 23:19
  • If conectionID is defined as a java.lang.Integer or java.lang.Long, you may have some code that is auto-unboxing the value into a native int or long value that is later autoboxed back into a different java.lang.Integer or java.lang.Long instance. – Palamino Jan 22 '15 at 13:49