I am using the class below to send data to our messaging queue over a socket, either synchronously or asynchronously.

  • sendAsync - Sends data asynchronously without any timeout. After sending (on LINE A), it adds the record to the retryHolder bucket so that, if no acknowledgement is received, the background thread started in the constructor retries it.
  • send - Internally calls sendAsync and then sleeps for a fixed timeout period. If no acknowledgement has been received by then, it removes the record from the retryHolder bucket so that we don't retry it.

So the only difference between the two methods is this: for async I need to retry at all costs, while for sync I don't need to retry. However, it looks like sync sends might be getting retried anyway, since both methods share the same retry bucket cache and the retry thread runs every 1 second.

ResponsePoller is a class that receives the acknowledgement for data sent to our messaging queue and then calls the removeFromretryHolder method below to remove the address, so that we don't retry after the acknowledgement has been received.

public class SendToQueue {
  private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
  private final Cache<Long, byte[]> retryHolder =
      CacheBuilder
          .newBuilder()
          .maximumSize(1000000)
          .concurrencyLevel(100)
          .removalListener(
              RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();

  private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
  }

  public static SendToQueue getInstance() {
    return Holder.INSTANCE;
  }

  private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the `retryHolder` cache accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        // retry again
        for (Entry<Long, byte[]> entry : retryHolder.asMap().entrySet()) {
          sendAsync(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedRecords);
    // send data on a socket LINE A
    boolean sent = msg.send(socket);
    msg.destroy();
    retryHolder.put(address, encodedRecords);
    return sent;
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    boolean sent = sendAsync(address, encodedRecords, socket);
    // if the record was sent successfully, then only sleep for timeout period
    if (sent) {
      try {
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
    // if key is not present, then acknowledgement was received successfully
    sent = !retryHolder.asMap().containsKey(address);
    // and key is still present in the cache, then it means acknowledgment was not received after
    // waiting for timeout period, so we will remove it from cache.
    if (!sent)
      removeFromretryHolder(address);
    return sent;
  }

  public void removeFromretryHolder(final long address) {
    retryHolder.invalidate(address);
  }
}

What is the best way to avoid retrying when someone calls the send method, while still knowing whether the acknowledgement was received? The only requirement is that I don't retry at all.

Do we need a separate bucket for all the sync calls, used only for acknowledgements, that we never retry from?

john

2 Answers

The code has a number of potential issues:

  • An answer may be received before the call to retryHolder#put.
  • There may also be a race condition when messages are retried.
  • If two messages are sent to the same address, does the second overwrite the first?
  • send always wastes time with a sleep; use wait+notify instead.

I would store a class with more state instead. It could contain a flag (retryIfNoAnswer yes/no) that the retry handler could check. It could provide waitForAnswer/markAnswerReceived methods using wait/notify so that send doesn't have to sleep for a fixed time. The waitForAnswer method can return true if an answer was obtained and false on timeout. Put the object in the retry handler before sending and use a timestamp so that only messages older than a certain age are retried. That fixes the first race condition.
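As a rough illustration of the per-message state described above, here is a minimal sketch that uses a java.util.concurrent.CountDownLatch in place of raw wait/notify. The class name PendingEntry and the isDueForRetry method are hypothetical and not part of the original code; the 500 ms age threshold would be passed in by the retry handler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical per-message state; names are illustrative only. */
final class PendingEntry {
    private final boolean retryIfNoAnswer;                      // false for sync sends
    private final CountDownLatch acked = new CountDownLatch(1); // released on acknowledgement
    private volatile long lastSentMillis;                       // written by the retry thread

    PendingEntry(boolean retryIfNoAnswer, long nowMillis) {
        this.retryIfNoAnswer = retryIfNoAnswer;
        this.lastSentMillis = nowMillis;
    }

    /** Called by the thread that receives acknowledgements. */
    void markAnswerReceived() {
        acked.countDown();
    }

    /** Blocks for at most timeoutMillis; returns true if the answer arrived in time. */
    boolean waitForAnswer(long timeoutMillis) {
        try {
            return acked.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    /** True only for unanswered, retry-enabled entries at least minAgeMillis old. */
    boolean isDueForRetry(long nowMillis, long minAgeMillis) {
        return retryIfNoAnswer
                && acked.getCount() > 0
                && nowMillis - lastSentMillis >= minAgeMillis;
    }

    /** Called by the retry handler after it resends the message. */
    void markResent(long nowMillis) {
        lastSentMillis = nowMillis;
    }
}
```

With this shape, a sync send would create the entry with retryIfNoAnswer set to false, store it before sending, and call waitForAnswer; the retry thread would skip it because isDueForRetry never returns true for such an entry.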

EDIT: updated example code below, compiles with your code, not tested:

public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

// Not sure why you are using a cache rather than a standard ConcurrentHashMap?
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
    .concurrencyLevel(100)
    .removalListener(RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();

private static class PendingMessage {
    private final long _address;
    private final byte[] _encodedRecords;
    private final Socket _socket;
    private final boolean _retryEnabled;
    private final Object _monitor = new Object();
    private long _sendTimeMillis;
    private volatile boolean _acknowledged;

    public PendingMessage(long address, byte[] encodedRecords, Socket socket, boolean retryEnabled) {
        _address = address;
        _sendTimeMillis = System.currentTimeMillis();
        _encodedRecords = encodedRecords;
        _socket = socket;
        _retryEnabled = retryEnabled;
    }

    public synchronized boolean hasExpired() {
        return System.currentTimeMillis() - _sendTimeMillis > 500L;
    }

    public synchronized void markResent() {
        _sendTimeMillis = System.currentTimeMillis();
    }

    public boolean shouldRetry() {
        return _retryEnabled && !_acknowledged;
    }

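    public boolean waitForAck() {
        long deadline = System.currentTimeMillis() + 500L;
        try {
            synchronized(_monitor) {
                // Re-check the flag in a loop: the ack may arrive before we start
                // waiting, and wait() may also wake up spuriously.
                long remaining = deadline - System.currentTimeMillis();
                while (!_acknowledged && remaining > 0) {
                    _monitor.wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
            }
            return _acknowledged;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }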

    public void ackReceived() {
        _acknowledged = true;
        synchronized(_monitor) {
            _monitor.notifyAll();
        }
    }

    public long getAddress() {
        return _address;
    }

    public byte[] getEncodedRecords() {
        return _encodedRecords;
    }

    public Socket getSocket() {
        return _socket;
    }
}

private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
}

public static SendToQueue getInstance() {
    return Holder.INSTANCE;
}

private void handleRetries() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage m : messages) {
        if (m.hasExpired()) {
            if (m.shouldRetry()) {
                m.markResent();
                doSendAsync(m, m.getSocket());
            }
            else {
                // Or leave the message and let send remove it
                cache.invalidate(m.getAddress());
            }
        }
    }
}

private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the cache accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            handleRetries();
        }
    }, 0, 1, TimeUnit.SECONDS);
}

public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
    cache.put(address, m);
    return doSendAsync(m, socket);
}

private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
        // send data on a socket LINE A
        return msg.send(socket);
    }
    finally {
        msg.destroy();
    }
}

public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
        if (doSendAsync(m, socket)) {
            return m.waitForAck();
        }
        return false;
    }
    finally {
        // Alternatively (checks that address points to m):
        // cache.asMap().remove(address, m);
        cache.invalidate(address);
    }
}

public void handleAckReceived(final long address) {
    PendingMessage m = cache.getIfPresent(address);
    if (m != null) {
        m.ackReceived();
        cache.invalidate(address);
    }
}
}

And called from ResponsePoller:

SendToQueue.getInstance().handleAckReceived(addressFrom);
ewramner
  • In my case the third point won't happen at all, as the key will always be different. So, based on my code, can you provide an example of how this would work with your suggestions so that I can understand better? – john Nov 20 '17 at 20:40
  • I have few questions on your edited answer. I am in this [chat room](https://chat.stackoverflow.com/rooms/159554/sendandretry). Let's discuss there. – john Nov 22 '17 at 07:41
  • If you didn't notice I updated the code in the solution Yesterday. – ewramner Nov 23 '17 at 20:31
  • I just noticed your edit (as I was out for sometime) and I am working on trying to understand your changes. In the meantime, can you help me understand what is the benefit I am getting with `wait+notify` instead of using sleep directly? I saw another answer below where one guy said don't use `wait + notify` here so I am just trying to understand what is the benefit we are getting and why he is saying don't use it. I may have follow up question once I go through your changes. – john Nov 28 '17 at 19:13
  • The other guy wants to use a non-blocking solution. That's fine, but a significant rewrite. I tried to stay as close to your code as possible. If you sleep 500ms you will always sleep 500ms (unless interrupted). If you wait 500ms you will wait AT MOST 500ms, but if you get an answer after 10ms the notify call cancels the wait and you waited only 10ms. Much better. – ewramner Nov 29 '17 at 07:56
  • Sorry for late reply. Thanks got it now and it makes sense now. I am in the same [chat room](https://chat.stackoverflow.com/rooms/159554/sendandretry), just need to clarify one last thing. – john Dec 05 '17 at 21:25
  • Yes, from time to time. Can you ask your question here? That way others can also benefit from the answer. – ewramner Dec 07 '17 at 11:33
  • Sure.. Actually my question was very long so I thought it would be better if I ask in chat. Otherwise comment will be filled up with my huge texts but I will make sure next time. So yeah I saw your comment and yes that answers my question. Thanks for all your help! – john Dec 07 '17 at 18:43
  • I have another [question](https://stackoverflow.com/questions/47107331/how-to-make-sure-that-i-am-not-sharing-same-socket-between-two-threads-at-a-same) in which I don't want to share same socket between two threads at a same time. It uses same above class which you helped me fix all the race conditions and there is one other class. I think we need to make some sort of socket pool where we can use socket from it and put it back when done using it but I am not able to figure out yet what's the best way to accomplish this. – john Dec 07 '17 at 19:21
  • There are few answers on that question but I am not sure what's the best way to do this. Wanted to see if you can help out since you have got the main context on what I am doing. – john Dec 07 '17 at 19:21
  • The trick with your other question is that it is not enough to synchronize the send, you need to handle replies as well and you don't want to block until a reply has been received. That means you have to wrap your messages in something so that you can see what you get back: a reply to a ping or a reply to a "business message". I don't have time to deep-dive and there are several people who have tried already, but my advice would be to see if you can find an existing solution. After all ZMQ is fairly common and your problem is outlined here: http://zeromq.org/deleted:topics:heartbeating. – ewramner Dec 08 '17 at 10:03
  • Yes I have two things - a `ping` to check whether a socket is alive or not and sending a business message on a socket which means I can get reply for those two things only - reply to a ping or reply to a business message. We are using `ping-pong` model only but we don't have a way to determine whether I am receiving reply for a ping or a business message and I don't think we can change that design now since it is being used from a very long time. – john Dec 11 '17 at 22:10
  • So with this in mind, is there any other way by which we can solve this efficiently so that I don't share the same socket between two threads at the same time? I have already checked the other answers and I can't seem to figure out which one is the best way I should go for and I know you said you don't have time as of now but see if you can help out whenever you get time. If needed I can share with you my `ResponsePoller` class which receives the reply (acknowledgement) from the zeromq queue. – john Dec 11 '17 at 22:10
  • Hey, quick question: do I need to worry about `IllegalMonitorStateException` cases here since we are working with `wait+notify`? I was reading more on this and figured I'd ask you this question. – john Dec 19 '17 at 23:02
  • As long as we have a synchronized lock on the object we are waiting/notifying things are fine. But you are right, the synch should be on the monitor. See updated code. – ewramner Dec 20 '17 at 12:40
  • ok.. why are we removing `synchronized` keyword from `shouldRetry` method and adding it in `markResent` method? Also what is the use of making `_acknowledged` volatile? – john Dec 20 '17 at 19:04
  • The markResent method changes _sendTimeMillis which is read by another thread in hasExpired, so it needs to be synchronized. The shouldRetry method reads a final field and _acknowledged, so if _acknowledged is volatile it does not need to be synchronized. I made it volatile as it is used in ackReceived and waitForAck and I don't want them to synchronize both on the instance and on the monitor and I don't want to synchronize on the monitor in shouldRetry. – ewramner Dec 21 '17 at 10:57
  • That makes sense. Thanks. Appreciated your help! – john Dec 24 '17 at 04:24
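
The point made in the comments above, that a timed wait lasts at most the timeout and ends as soon as the notification arrives, can be sketched in isolation. The AckSignal class below is a hypothetical helper, not part of the answer's code. It keeps both wait() and notifyAll() inside a synchronized block on the same monitor, which is what avoids IllegalMonitorStateException, and it re-checks the flag in a loop so that an early acknowledgement or a spurious wake-up is handled.

```java
/** Hypothetical helper illustrating a guarded, timed wait. */
final class AckSignal {
    private final Object monitor = new Object();
    private boolean acknowledged; // guarded by monitor

    /** Waits at most timeoutMillis, returning as soon as signal() has been called. */
    boolean await(long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        synchronized (monitor) { // calling wait() without holding the monitor throws IllegalMonitorStateException
            try {
                while (!acknowledged) {
                    long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
                    if (remainingMillis <= 0) {
                        break; // timed out; note that wait(0) would block indefinitely
                    }
                    monitor.wait(remainingMillis);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
            }
            return acknowledged;
        }
    }

    /** Called by the acknowledging thread. */
    void signal() {
        synchronized (monitor) {
            acknowledged = true;
            monitor.notifyAll();
        }
    }
}
```

If another thread calls signal() 10 ms after await(500) starts, await returns true after roughly 10 ms rather than after the full 500 ms that a fixed sleep would take.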
Design-wise: it looks like you are trying to write a thread-safe and reasonably efficient NIO message sender/receiver, but neither version of the code I see here is OK, and neither will be without significant changes. The best thing to do is either:

  • make full use of the 0MQ framework. I see things and expectations here that are actually available out-of-the-box in ZMQ and java.util.concurrent API.
  • or have a look at Netty (https://netty.io/index.html) preferably if it applies to your project. "Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients." This will save you time if your project gets complex, otherwise it might be overkill to start with (but then expect issues ...).

However, if you think you are almost there with your code or @john's code, then I will just give some advice to complete it:

  • don't use wait() and notify(). Don't sleep() either.
  • use a single thread for your "flow tracker" (i.e. ~the pending message Cache).

You don't actually need 3 threads to process pending messages unless the processing itself is slow (or does heavy work), which is not the case here, as you basically make an async call (assuming it really is async; is it?).

The same goes for the reverse path: use an executor service (multiple threads) for processing received packets only if the actual processing is slow, blocking, or heavy.

I'm not an expert in 0MQ at all, but as long as socket.send(...) is thread-safe and non-blocking (which I'm not sure about personally; tell me), the above advice should be correct and make things simpler.
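
To illustrate what leaning on java.util.concurrent instead of wait(), notify() and sleep() might look like, here is a minimal sketch that tracks acknowledgements with one CompletableFuture per address. The AckTracker class and its method names are hypothetical, and the sketch assumes Java 8 or later and unique addresses per message. It only covers acknowledgement tracking, not the socket handling.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical acknowledgement tracker; names are illustrative only. */
final class AckTracker {
    private final ConcurrentMap<Long, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    /** Register before sending so that an early acknowledgement is not lost. */
    CompletableFuture<Void> register(long address) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.put(address, future);
        return future;
    }

    /** Called from the receiving thread; late or unknown acknowledgements are ignored. */
    void acknowledge(long address) {
        CompletableFuture<Void> future = pending.remove(address);
        if (future != null) {
            future.complete(null);
        }
    }

    /** Sync path: wait up to timeoutMillis, then forget the address so nothing retries it. */
    boolean awaitAck(long address, CompletableFuture<Void> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        } finally {
            pending.remove(address, future); // removes only this exact registration
        }
    }
}
```

An async caller could skip awaitAck entirely and attach a callback to the returned future (for example with thenRun), so that no thread blocks while waiting for the acknowledgement.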

That said, to strictly answer your question:

Do we need separate bucket for all the sync calls just for acknowledgement and we dont retry from that bucket?

I'd say no, hence what do you think of the following? Based on your code and independently of my own feelings this seems acceptable:

public class SendToQueue {
    // ...
    private final Map<Long, Boolean> transactions = new ConcurrentHashMap<>();
    // ...

    private void startTransaction(long address) {
        this.transactions.put(address, Boolean.FALSE);
    }

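    public void updateTransaction(long address) {
        // replace() updates only an existing entry, and does so atomically, so a late
        // acknowledgement cannot re-insert an address that send() has already cleared.
        // (Map.replace requires Java 8+; declare the field as ConcurrentMap on older versions.)
        this.transactions.replace(address, Boolean.TRUE);
    }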

    private void clearTransaction(long address) {
        this.transactions.remove(address);
    }

    public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
        boolean success = false;

        // If address is enough randomized or atomically counted (then ok for parallel send())
        startTransaction(address);
        try {
            boolean sent = sendAsync(address, encodedRecords, socket);
            // if the record was sent successfully, then only sleep for timeout period
            if (sent) {
                // wait for acknowledgement
                success = waitDoneUntil(new DoneCondition() {
                    @Override
                    public boolean isDone() {
                        return SendToQueue.this.transactions.get(address); // no NPE
                    }
                }, 500, TimeUnit.MILLISECONDS);

                if (success) {
                    // Message acknowledged!
                }
            }
        } finally {
            clearTransaction(address);
        }
        return success;
    }

    public static interface DoneCondition {
        public boolean isDone();
    }

    /**
     * Polls the given condition until it is met or the wait time elapses.
     * Note: includes a sleep(50) between checks.
     *
     * @param f Condition to poll; the call blocks until it is done or the wait time elapses.
     * @param waitTime Duration expressed in (time) unit.
     * @param unit Time unit.
     * @return Whether the condition was finally met.
     */
    public static boolean waitDoneUntil(DoneCondition f, int waitTime, TimeUnit unit) {
        long curMillis = 0;
        long maxWaitMillis = unit.toMillis(waitTime);

        while (!f.isDone() && curMillis < maxWaitMillis) {
            try {
                Thread.sleep(50);   // define your step here accordingly or set as parameter
            } catch (InterruptedException ex1) {
                //logger.debug("waitDoneUntil() interrupted.");
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                break;
            }
            curMillis += 50L;
        }

        return f.isDone();
    }
    //...
}

public class ResponsePoller {
    //...

    public void onReceive(long address) {   // sample prototype     
        // ...
        SendToQueue.getInstance().updateTransaction(address);
        // The interested sender will know that its transaction is complete.
        // While subsequent (late) calls will have no effect.
    }
}
bsaverino