
I'm having a problem with my RabbitMQ work queue implementation. I'm currently running it in Tomcat, and I have the following class constantly listening for new tasks in the queue. After a day or two it suddenly starts behaving strangely: the message count of the `DeclareOk` object returned by `channel.queueDeclare(taskQueueName, isDurable, false, false, null);` is always zero (I print this in the log line mentioning "Current poolSize").

But the RabbitMQ admin tools (`./rabbitmqadmin list queues` or the RabbitMQ management portal) always report a number greater than zero (say 1267 messages in the queue), and that count does not drop to zero until I restart Tomcat. Only after the restart does the class below detect that there actually are messages sitting in the queue.

Initially I thought the class had been terminated somehow, but it still consumes newly arriving messages. It just never consumes the 1267 messages left hanging in the queue; those are only consumed after I restart Tomcat.

Looking at the code below, is this caused by a buggy implementation, or is there a better way to implement a queue consumer specifically for RabbitMQ? I have read a related Stack Overflow post (Producer/Consumer threads using a Queue), but I'm not sure it helps.

Also, is it true that the consumer implementation below will not survive a RuntimeException?

MqConsumer Class:

@Service
public class MqConsumer implements Runnable{

private static final Logger logger = LoggerFactory.getLogger(MqConsumer.class);
private final int MAX_ALERT_THRESHOLD = 10000;

@Autowired
private AsynchSystemConnections asynchSystemConnections;
public MqConsumer(){

}

@PostConstruct
private void init() {
    (new Thread(new MqConsumer(asynchSystemConnections))).start();
}

public MqConsumer(AsynchSystemConnections asynchSystemConnections){
    this.asynchSystemConnections = asynchSystemConnections;
}

@Override
public void run() {
    logger.info("Execute Consumer instance...");

    while (true) { // infinite loop until it dies on a server restart
        boolean toSleep = consume(asynchSystemConnections);

        if (toSleep){
            logger.error("Sleeping for 1 second...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("", e);
            }
        }
    }
}
private boolean consume(AsynchSystemConnections asynchSystemConnections) {
    com.rabbitmq.client.Connection mqConnection = null;
    Channel mqChannel = null;
    DatasiftMq dMq = null;

    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(asynchSystemConnections.getMqServerHost());

        mqConnection = factory.newConnection();
        mqChannel = mqConnection.createChannel();
        //consumePushInteractionJob method will forward to AsynchTwService.consume(connection, channel, AsynchTwService.PUSH_INTERACTION_QUEUE )
        dMq = asynchSystemConnections.getAsynchService().consumePushInteractionJob(mqConnection, mqChannel);


        int poolSize = asynchSystemConnections.getAsynchService().getPushInteractionQueueSize();
        logger.info("Current poolSize: " + poolSize);

    } catch(NullPointerException e) {
        logger.error("", e);
        if (dMq != null) {

            try {
                logger.error("Removing JSON with" + dMq.getLogHeader(dMq));
                asynchSystemConnections.getAsynchService().ack(mqChannel, dMq.getDelivery());
                logger.error("Removed JSON with" + dMq.getLogHeader(dMq));
            } catch (IOException e1) {
                logger.error("Remove JSON Failed: ", e);
            }
        }
        return true;
    } catch (IOException e) {
        logger.error("Unable to create new MQ Connection from factory.", e);
        return true;
    } catch (InterruptedException e) {
        logger.error("", e);
        return true;
    } catch (ClassNotFoundException e) {
        logger.error("", e);
        return true;
    }  catch (Exception e) {
        logger.error("Big problem, better solve this fast!!", e);
        asynchSystemConnections.getNotificationService().notifySystemException(null, e);
        return true;    
    } finally {

        try {
            asynchSystemConnections.getAsynchService().ack(mqChannel, dMq.getDelivery());
            asynchSystemConnections.getAsynchService().disconnect(mqConnection, mqChannel);
        } catch (IOException e) {
            logger.error("", e);
        }
    }

    return false;
}
}

AsynchTwService Class:

@Service("asynchTwService")
public class AsynchTwService implements AsynchService {
static final String FAVOURITE_COUNT_QUEUE = "favourite_count_queue";
static final String FRIENDS_FOLLOWERS_QUEUE = "friends_followers_queue";
static final String DIRECT_MESSAGE_RECEIVE_QUEUE = "direct_message_receive_queue";
static final String PUSH_INTERACTION_QUEUE = "push_interaction_queue";

private static String mqServerHost;

private static final Logger logger = LoggerFactory.getLogger(AsynchTwService.class);
private static final boolean isDurable = true;
private boolean autoAck = false;

private ConcurrentHashMap<String, Integer> currentQueueSize = new ConcurrentHashMap<String, Integer>();

@Override
public Connection getConnection() throws IOException{
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(mqServerHost); 

    return factory.newConnection();
}

@Override
public void produce(Connection connection, Channel channel, Object object, String taskQueueName) throws IOException {
    sendToQueue(connection, channel, object, taskQueueName);
}

@Override
public QueueItem consume(Connection connection, Channel channel, String taskQueueName) throws IOException, InterruptedException, ClassNotFoundException{
    Serializer serializer = new Serializer();
    try {
        Delivery delivery = listenFromQueue(connection, channel, taskQueueName);
        Object messageObj = serializer.toObject(delivery.getBody());
        QueueItem queueItem = (QueueItem)messageObj;
        queueItem.setDelivery(delivery);
        return queueItem;
    } catch (InterruptedException e) {
        throw e;
    } catch (ClassNotFoundException e) {
        logger.error("Unable to serialize the message to QueueItem object", e);
        throw e;
    }
}

@Override
public int getQueueSize(String taskQueueName){
    return this.currentQueueSize.get(taskQueueName); 
}

private Delivery listenFromQueue(Connection connection, Channel channel, String taskQueueName) throws IOException, InterruptedException, ClassNotFoundException{
    try {
        DeclareOk  ok = channel.queueDeclare(taskQueueName, isDurable, false, false, null);
        currentQueueSize.put(taskQueueName, ok.getMessageCount());
        logger.info("Queue ("+ taskQueueName + ") has items: " +ok.getMessageCount());
    } catch (IOException e) {
        // TODO Auto-generated catch block
    }

    logger.info(" [*] Consuming "+taskQueueName+" message...");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    try {
        channel.basicConsume(taskQueueName, autoAck, consumer);
    } catch (IOException e) {
        logger.error("", e);
    }

    try {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        return delivery;
    } catch (ShutdownSignalException e) {
        logger.error("Unable to retrieve message from Queue", e);
        throw e;
    } catch (ConsumerCancelledException e) {
        logger.error("Unable to retrieve message from Queue", e);
        throw e;
    } catch (InterruptedException e) {
        logger.error("Unable to retrieve message from Queue", e);
        throw e;
    } 
}



private void sendToQueue(Connection connection, Channel channel, Object object, String taskQueueName) throws IOException{
    //Initialization, create Message Queue broker connection
    try{
        channel.queueDeclare(taskQueueName, isDurable, false, false, null);
    }catch(IOException e) {
        logger.error(e.getMessage());
        logger.error("Error create Message Queue connection for queue name:" + taskQueueName, e);
        throw e;
    }

    //send message to broker
    try {
        long start = System.currentTimeMillis();
        Serializer serializer = new Serializer();
        logger.info("Sending Twitter QueueItem to Message Queue...");

        channel.basicPublish("", taskQueueName, MessageProperties.PERSISTENT_TEXT_PLAIN, 
                serializer.toBytes(object)); 

        logger.info("Queue successfully sent, process took: " + (System.currentTimeMillis()-start)+ "ms");
    } catch (IOException e) {
        logger.error("Error while sending object to queue : " + taskQueueName, e);
        throw e;
    }
}

public static String getMqServerHost() {
    return mqServerHost;
}

public static void setMqServerHost(String mqServerHost) {
    AsynchTwService.mqServerHost = mqServerHost;
}

@Override
public void disconnect(Connection connection, Channel channel) throws IOException{
    try {
        if (channel != null){
            if (channel.isOpen()){
                channel.close();    
            }
        }
        if (connection != null){
            if (connection.isOpen()){
                connection.close(); 
            }
        }
        logger.debug("MQ Channel Disconnected");
    } catch (IOException e) {
        throw e;
    }
}

@Override
public void ack(Channel channel, QueueingConsumer.Delivery delivery) throws IOException {
    // this is kept as a separate method call to avoid acking too early unintentionally
    try {
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        logger.info("[x] acked" );
    } catch (IOException e) {
        logger.error("Unable Acknowledge Queue Message", e);
        throw e;
    }
}

@Override
public DatasiftMq consumeDatasiftInteraction(Connection connection, Channel channel, 
        String taskQueueName) throws IOException, InterruptedException, ClassNotFoundException {

    Serializer serializer = new Serializer();
    try {
        Delivery delivery = listenFromQueue(connection, channel, taskQueueName);
        Object messageObj = serializer.toObject(delivery.getBody());
        DatasiftMq dto = (DatasiftMq)messageObj;
        dto.setDelivery(delivery);
        return dto;
    } catch (InterruptedException e) {
        throw e;
    } catch (ClassNotFoundException e) {
        logger.error("Unable to serialize the message to DatasiftDTO object", e);
        throw e;
    }
}

@Override
public void reQueue(Channel channel, Delivery delivery) throws IOException {
    try {
        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
        logger.info("[x] Nacked" );
    } catch (IOException e) {
        logger.error("Unable Acknowledge Queue Message", e);
        throw e;
    }       
}

}

Reusable
  • Is there a reason why you don't use the `spring-amqp` dependency? – Zarathustra Jun 23 '14 at 10:02
  • Are you using basicQos in your code? How many consumers are connected to that queue? – old_sound Jun 23 '14 at 11:44
  • @Zarathustra I did not use spring-amqp to begin with. Does it solve the issue I mention above? – Reusable Jun 23 '14 at 12:56
  • @old_sound No, I did not specify any QoS. Basically it's a FIFO with just one consumer connecting to that queue. But you raise a good question: even though there is only one such object, sometimes when I view it with `./rabbitmqadmin list queues` I can see more than one consumer listed under the same queue name. Could that be a potential root cause? – Reusable Jun 23 '14 at 13:00
  • @Reusable At least it makes things easier; Spring sends the acknowledgement automatically by default, for example, and it provides a nice architecture for multiple consumers. Also, could you show us your implementation of `AsynchSystemConnections`? – Zarathustra Jun 23 '14 at 13:00
  • @Zarathustra Unless I can find a good source of documentation on spring-amqp, it's hard to implement. Do you know any well-tested samples and documentation? – Reusable Jun 23 '14 at 13:55
  • @Zarathustra If you have more than one consumer, then that could be the problem. Not setting a basicQos prefetch count means that RabbitMQ will send as many messages as possible to the consumer. Therefore one consumer might be holding the messages in memory while the other can only process new messages. – old_sound Jun 23 '14 at 14:09
  • @old_sound There should be only one consumer, since the class is annotated with @PostConstruct. I could set the basicQos to 1, but that doesn't make much sense, does it? – Reusable Jun 23 '14 at 14:12
  • Oh... I am an idiot. I found the reason why more than one consumer gets created: when I hot-deploy a new WAR, the existing MqConsumer thread cannot be killed because it is a while(true) loop and there is no @PreDestroy implementation. When the new app starts up, a new MqConsumer gets created, which is why there is more than one consumer in the list. Does anyone have an idea how best to change this kind of code, considering I am using a while(true) loop? – Reusable Jun 23 '14 at 15:14
  • Replace `while(true)` with `while(!Thread.currentThread().isInterrupted())`. And do not only log the `InterruptedException`; also call `Thread.currentThread().interrupt()` (see the sketch after these comments). See http://www.javaspecialists.eu/archive/Issue056.html if you are searching for this – Zarathustra Jun 24 '14 at 09:30
  • And where is the implementation of `consumePushInteractionJob()`? You should have one `ConnectionFactory`; you have one per thread. – Zarathustra Jun 24 '14 at 09:37
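
Pulling the advice from the comments above together, here is a minimal sketch (my own illustration, not the code from the question) of an interrupt-aware consumer loop that also sets a basicQos prefetch of 1 so a single consumer cannot hold a large backlog of unacknowledged messages. The queue name and host are placeholders, and QueueingConsumer matches the client version in use here (it has since been deprecated).

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

// Illustration only: interrupt-aware consumer loop with basicQos(1).
public class InterruptibleConsumer implements Runnable {

    private static final String TASK_QUEUE = "push_interaction_queue"; // placeholder

    @Override
    public void run() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost"); // placeholder host

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(TASK_QUEUE, true, false, false, null);
            channel.basicQos(1); // deliver at most one unacked message at a time

            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(TASK_QUEUE, false, consumer);

            // The loop ends when the thread is interrupted (e.g. on undeploy).
            while (!Thread.currentThread().isInterrupted()) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                // ... process delivery.getBody() here ...
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }

            channel.close();
            connection.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
        } catch (Exception e) {
            // log and exit; the surrounding application decides whether to restart
        }
    }
}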

1 Answer


Seems like you are missing some basics here.

Taken from here and some code of mine. Setting up the connection outside of the consumer thread:

//executed once
ConnectionFactory factory = new ConnectionFactory();

factory.setHost("someHost");
factory.setUsername("user");
factory.setPassword("pass");

Connection connection = factory.newConnection();

What you have to do inside your thread:

//Consumer - executed in a Thread
Channel channel = connection.createChannel();
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);

while (!Thread.currentThread().isInterrupted()) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  //...
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

In general I still recommend that you check out the spring-amqp library; it integrates perfectly.
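
Since the root cause identified in the question's comments was a while(true) consumer thread surviving hot redeploys, the Spring lifecycle side could look roughly like this. This is only a sketch; ConsumerRunnable is a placeholder for an interrupt-aware Runnable like the one sketched above.

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.stereotype.Service;

// Sketch: start the consumer thread once and interrupt it on context shutdown,
// so a redeploy does not leave an orphaned consumer behind.
// ConsumerRunnable is a placeholder for the interrupt-aware consume loop.
@Service
public class MqConsumerLifecycle {

    private Thread consumerThread;

    @PostConstruct
    public void start() {
        consumerThread = new Thread(new ConsumerRunnable(), "mq-consumer");
        consumerThread.start();
    }

    @PreDestroy
    public void stop() throws InterruptedException {
        consumerThread.interrupt();  // ends the while(!isInterrupted()) loop
        consumerThread.join(5000);   // wait briefly for a clean shutdown
    }
}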

Zarathustra
  • Yeap! You have made your point clear, and this was the problem! The code works, and a Tomcat restart now manages to wait for this thread to be killed gracefully. Also, on spring-amqp: I managed to look at it and, you know what, I am using spring-rabbit in production now! I don't even need to care about managing how many connections get created, etc. Thanks for the recommendation! – Reusable Jun 26 '14 at 03:35
  • Just to let you know: the spring-rabbit dependency pulls in an older rabbitmq client dependency which does not have all the features. If you care about some of the newer features, recoverable connections for example, you should take a look at this: https://www.rabbitmq.com/api-guide.html#recovery There was a changelog site for the client somewhere as well... can't find it now... – Zarathustra Jun 26 '14 at 04:48
  • Uh oh... are you saying that spring-rabbit does not have automatic recovery enabled? I didn't see that happening previously because I followed the code at the rabbitmq.com api-guide. Please let me know if you find anything about it. Note: as of now, the prod environment is so far so good. – Reusable Jun 26 '14 at 08:22
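
For reference, the automatic connection recovery discussed in these comments is opt-in on the Java client of that era (the 3.3.x line, where it defaults to off); here is a minimal sketch of enabling it, with placeholder host names and timing values:

import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// Sketch: enable client-side automatic recovery and connect against a list
// of cluster nodes. Host names and timing values are placeholders.
public class RecoveringConnectionExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setAutomaticRecoveryEnabled(true); // off by default in 3.3.x
        factory.setNetworkRecoveryInterval(5000);  // retry every 5 seconds
        factory.setConnectionTimeout(10000);       // fail fast on an unreachable node

        Address[] nodes = {
            new Address("rabbit1", 5672),
            new Address("rabbit2", 5672),
            new Address("rabbit3", 5672)
        };
        Connection connection = factory.newConnection(nodes); // tries nodes in order
        System.out.println("Connected to " + connection.getAddress());
        connection.close();
    }
}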
  • My setup is 3 rabbitmq nodes in a cluster with ha-queues, nothing special (NO FREAKING CLUSTERIP) all clients know all 3 rabbitmq ips, if one is not reachable they take another (to stay simple). Not sure but automatic-recovery s default value is false I think. Also check the timeouts in your `ConnectionFactory`. Sicne 3.x there is a big improvement regarding connection management. Currenty spring-rabbit is currently using 3.3.1 but amqp-client s latest version is 3.3.4. – Zarathustra Jun 26 '14 at 08:48