I am using the class below to send data to our messaging queue over a socket, either synchronously or asynchronously:
- `sendAsync`: sends data asynchronously without any timeout. After sending (on LINE A), it adds the record to the `retryHolder` bucket so that, if no acknowledgement is received, it is retried from the background thread that is started in the constructor.
- `send`: internally calls `sendAsync`, then sleeps for a fixed timeout period. If no acknowledgement has been received by then, it removes the entry from the `retryHolder` bucket so that we don't retry it.
So the only difference between the two methods is this: for async I need to retry at all costs, whereas for sync I don't need to retry at all. However, it looks like sync sends might still be getting retried, since both methods share the same retry bucket and the retry thread runs every 1 second.
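To make the concern concrete, here is a rough back-of-the-envelope check. It assumes the retry task ticks exactly every 1000 ms and `send` waits exactly 500 ms (the values used in my class), and it ignores scheduling jitter. `RetryWindowCheck` and its methods are made-up names for illustration only. It counts how many start offsets within one retry period have a retry tick landing while `send` is still waiting:

```java
class RetryWindowCheck {
    // Values taken from the class in this question.
    static final long RETRY_PERIOD_MS = 1000;
    static final long SYNC_WAIT_MS = 500;

    /** True if a retry tick (a multiple of the period) falls in (start, start + wait]. */
    static boolean tickDuringWait(long startMs, long waitMs, long periodMs) {
        long nextTick = (startMs / periodMs + 1) * periodMs; // first tick strictly after start
        return nextTick <= startMs + waitMs;
    }

    /** Counts start offsets in [0, period) for which a tick lands inside the wait window. */
    static long overlappingOffsets(long waitMs, long periodMs) {
        long count = 0;
        for (long start = 0; start < periodMs; start++) {
            if (tickDuringWait(start, waitMs, periodMs)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long hits = overlappingOffsets(SYNC_WAIT_MS, RETRY_PERIOD_MS);
        // prints "500 of 1000 start offsets overlap a retry tick"
        System.out.println(hits + " of " + RETRY_PERIOD_MS + " start offsets overlap a retry tick");
    }
}
```

Under those assumptions, roughly half of the unacknowledged sync sends would still be sitting in `retryHolder` when the retry thread runs.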
`ResponsePoller` is a class that receives the acknowledgements for data sent to our messaging queue, and then calls the `removeFromretryHolder` method below to remove the address so that we don't retry after the acknowledgement has been received.
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListeners;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

public class SendToQueue {
    private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
    // records that have been sent but not yet acknowledged, keyed by address
    private final Cache<Long, byte[]> retryHolder =
        CacheBuilder
            .newBuilder()
            .maximumSize(1000000)
            .concurrencyLevel(100)
            .removalListener(
                RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();

    private static class Holder {
        private static final SendToQueue INSTANCE = new SendToQueue();
    }

    public static SendToQueue getInstance() {
        return Holder.INSTANCE;
    }

    private SendToQueue() {
        // another thread which receives acknowledgements and deletes the matching entry from the `retryHolder` cache
        executorService.submit(new ResponsePoller());
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // retry everything that is still in the cache
                for (Entry<Long, byte[]> entry : retryHolder.asMap().entrySet()) {
                    sendAsync(entry.getKey(), entry.getValue());
                }
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
        ZMsg msg = new ZMsg();
        msg.add(encodedRecords);
        // send data on the socket (LINE A)
        boolean sent = msg.send(socket);
        msg.destroy();
        retryHolder.put(address, encodedRecords);
        return sent;
    }

    public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
        boolean sent = sendAsync(address, encodedRecords, socket);
        // sleep for the timeout period only if the record was sent successfully
        if (sent) {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        // if the key is no longer present, the acknowledgement was received
        sent = !retryHolder.asMap().containsKey(address);
        // if the key is still present, no acknowledgement arrived within the
        // timeout period, so remove it from the cache to avoid a retry
        if (!sent) {
            removeFromretryHolder(address);
        }
        return sent;
    }

    public void removeFromretryHolder(final long address) {
        retryHolder.invalidate(address);
    }
}
What is the best way to make sure we don't retry when someone calls the `send` method, while still being able to tell whether the acknowledgement was received? The only requirement is that sync calls must never be retried.
Do we need a separate bucket for the sync calls, used only to track acknowledgements and never read by the retry thread?
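To make the "separate bucket" idea concrete, this is roughly what I have in mind. It is a minimal sketch only, not tested against the real queue. `SyncAckTracker`, `pendingSyncAcks`, `sendAndAwait` and `acknowledge` are hypothetical names that do not exist in my code yet, and the socket write is passed in as a `BooleanSupplier` so the sketch does not depend on ZeroMQ:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class SyncAckTracker {
    // Hypothetical second bucket: holds only in-flight sync sends.
    // The retry thread never reads it, so nothing in here is retried.
    private final ConcurrentMap<Long, CountDownLatch> pendingSyncAcks = new ConcurrentHashMap<>();

    /**
     * Runs sendAction (assumed to write to the socket and report success),
     * then waits up to timeoutMs for acknowledge(address) to be called.
     * Returns true only if the acknowledgement arrived in time.
     */
    public boolean sendAndAwait(long address, BooleanSupplier sendAction, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(1);
        // register before sending so that a very fast acknowledgement is not lost
        pendingSyncAcks.put(address, latch);
        try {
            if (!sendAction.getAsBoolean()) {
                return false; // the write failed, nothing to wait for
            }
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // always clean up; remove only our own latch for this address
            pendingSyncAcks.remove(address, latch);
        }
    }

    /**
     * Would be called from ResponsePoller when an acknowledgement arrives.
     * Returns true if a sync caller was waiting on this address.
     */
    public boolean acknowledge(long address) {
        CountDownLatch latch = pendingSyncAcks.remove(address);
        if (latch == null) {
            return false;
        }
        latch.countDown();
        return true;
    }
}
```

With something like this, `send` would do the `ZMsg` write inside `sendAction` without ever touching `retryHolder`, and `ResponsePoller` would call `acknowledge(address)` in addition to `removeFromretryHolder(address)`. A side effect is that the sync caller would return as soon as the acknowledgement arrives instead of always sleeping for the full 500 ms.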