0

I want to receive messages through a MessageHandler bound to consumer. When I use a core ClientMessage as the container the message will never be acked with acknowledge(), but ok with the individualAcknowledge() method. It's quite different when using javax.jms.Message.

The Code:

@Override
public void onMessage(ClientMessage message) {
    try {
        //acknowledge() method won't work, the message still in the queue
        //message.acknowledge();
        message.individualAcknowledge();
    } catch (ActiveMQException e) {
        log.error("message acknowledge error: ", e);
    }
}

But when I simply change the parameter to javax.jms.Message. The ack method worked. I've checked the origin code of both:

ClientMessage:

public void acknowledge(ClientMessage message) throws ActiveMQException {
    ClientMessageInternal cmi = (ClientMessageInternal)message;
    if (this.ackIndividually) {
        this.individualAcknowledge(message);
    } else {
        this.ackBytes += message.getEncodeSize();
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::acknowledge ackBytes=" + this.ackBytes + " and ackBatchSize=" + this.ackBatchSize + ", encodeSize=" + message.getEncodeSize());
        }

        if (this.ackBytes >= this.ackBatchSize) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + ":: acknowledge acking " + cmi);
            }

            this.doAck(cmi);
        } else {
            if (logger.isTraceEnabled()) {
                logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi);
            }

            this.lastAckedMessage = cmi;
        }
    }
}

And the javax.jms.Message:

public void acknowledge() throws JMSException {
    if (this.session != null) {
        try {
            if (this.session.isClosed()) {
                throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed();
            }

            if (this.individualAck) {
                this.message.individualAcknowledge();
            }

            if (this.clientAck || this.individualAck) {
                this.session.commit(this.session.isBlockOnAcknowledge());
            }
        } catch (ActiveMQException var2) {
            throw JMSExceptionHelper.convertFromActiveMQException(var2);
        }
    }
}

It's quite obviously that javax.jms.Message commits the session whenever a message is acked, but ClientMessage will calculate whether the acked message is reach the max size of the ackBatchSize, if not, it will store this message in a parameter called lastAckedMessage.

So,my question is:

  1. What does lastAckedMessage do when I just have 1 message that is too far too reach the bathSize? How could I ack the message? Close the session to force commit?

The session is configured to autoCommitSends and autoCommitAcks.

After I read the doc, I try to change the acknowledge() to individualAcknowledge() and it works, but i'm still wondering why the acknowledge() can't.

Justin Bertram
  • 29,372
  • 4
  • 21
  • 43
Grab
  • 25
  • 4

1 Answers1

1

The core API is a relatively low-level API aimed at fine-grained control for maximizing performance. It shares some similarities with the JMS API, but some important differences as well.

Specifically regarding consumers, acknowledgements are costly in terms of performance because they require a round-trip to the broker. Therefore, there are several ways to tune acknowledgements in the core API.

The first tuning step is choosing whether or not to automatically commit acknowledgements. This decision is made when creating the session (e.g. via this method). If acknowledgements are automatically committed then the application never has to invoke commit() itself, but that also means it can't support more advanced use-cases which would require rollback().

If the user chooses to automatically commit acknowledgements, as you have, then the next tuning step is how often these acknowledgements should be committed. This decision is made by specifying the "acknowledgement batch size" (e.g. via this method).

If you want to automatically acknowledge messages and you want the acknowledgements to be committed immediately then set your ackBatchSize to 0. If you want full control over acknowledgements then don't automatically acknowledge the messages and invoke commit() "manually". However, keep in mind that forcing a round-trip to the broker for every acknowledgement will reduce performance (potentially considerably).

Lastly, the lastAckedMessage is tracked so that only one acknowledgement needs to be sent to the broker (i.e. the most recent) and then every message sent before that one is acknowledged as well. This is an optimization which relieves the client from the burden of having to send an individual acknowledgement for every single message it has received.

Justin Bertram
  • 29,372
  • 4
  • 21
  • 43
  • Thanks for your answer, that helps a lot. As what you mentioned, the session will commit the acks when it reach the ackBatchSize, what if an out of schedule collapse happend before it commits? Thoes acks will be lost and the messages will be consumed duplicately? I've only test this situation in an in schedule shutdown and the session will commit the acks when shutdown. – Grab Feb 21 '23 at 02:47
  • After I change my code from jms to core api , it occurs to me that why i'm using jms instead of core api, because once i use core api to receive the message, a java.lang.NegativeArraySizeException will be thrown. After i checked the origin code, i found that when reading the buffer length, it returns a length that is nagetive (e.g. -499384257), at the same time i found your [answer](https://stackoverflow.com/questions/73759294/receiving-error-reading-in-simplestring-length-is-greater-than-readablebyte). – Grab Feb 21 '23 at 12:24
  • I think it's because a lost of content-length in header of a message that i sent without using core api. So, the core api should be Exact Match (both producer and consumer), right? othewise, i could only use readNullableSimpleString() to handle the messages. Sorry i can't comment in that answer, so i have to reply you here. – Grab Feb 21 '23 at 12:27
  • If there are acknowledgements waiting to be sent from your application to the broker and the application crashes then those acknowledgements will not be sent and those messages will be redelivered (assuming they aren't sent to a dead-letter address). Redeliveries like this can happen in many different circumstances with both core and JMS clients so your application should be written to deal with redeliveries. – Justin Bertram Feb 21 '23 at 17:14
  • You can exchange messages between applications using the core and JMS APIs. The content of the message just needs to be encoded in a way both applications can understand. You might want to ask a _new_ question about this as we're getting off-topic from your original question here. – Justin Bertram Feb 21 '23 at 17:23