I want to receive messages through a MessageHandler
bound to consumer. When I use a core ClientMessage
as the container the message will never be acked with acknowledge()
, but ok with the individualAcknowledge()
method. It's quite different when using javax.jms.Message
.
The Code:
@Override
public void onMessage(ClientMessage message) {
try {
//acknowledge() method won't work, the message still in the queue
//message.acknowledge();
message.individualAcknowledge();
} catch (ActiveMQException e) {
log.error("message acknowledge error: ", e);
}
}
But when I simply change the parameter to javax.jms.Message
. The ack method worked. I've checked the origin code of both:
ClientMessage:
public void acknowledge(ClientMessage message) throws ActiveMQException {
ClientMessageInternal cmi = (ClientMessageInternal)message;
if (this.ackIndividually) {
this.individualAcknowledge(message);
} else {
this.ackBytes += message.getEncodeSize();
if (logger.isTraceEnabled()) {
logger.trace(this + "::acknowledge ackBytes=" + this.ackBytes + " and ackBatchSize=" + this.ackBatchSize + ", encodeSize=" + message.getEncodeSize());
}
if (this.ackBytes >= this.ackBatchSize) {
if (logger.isTraceEnabled()) {
logger.trace(this + ":: acknowledge acking " + cmi);
}
this.doAck(cmi);
} else {
if (logger.isTraceEnabled()) {
logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi);
}
this.lastAckedMessage = cmi;
}
}
}
And the javax.jms.Message
:
public void acknowledge() throws JMSException {
if (this.session != null) {
try {
if (this.session.isClosed()) {
throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed();
}
if (this.individualAck) {
this.message.individualAcknowledge();
}
if (this.clientAck || this.individualAck) {
this.session.commit(this.session.isBlockOnAcknowledge());
}
} catch (ActiveMQException var2) {
throw JMSExceptionHelper.convertFromActiveMQException(var2);
}
}
}
It's quite obviously that javax.jms.Message
commits the session whenever a message is acked, but ClientMessage
will calculate whether the acked message is reach the max size of the ackBatchSize
, if not, it will store this message in a parameter called lastAckedMessage.
So,my question is:
- What does
lastAckedMessage
do when I just have 1 message that is too far too reach the bathSize? How could I ack the message? Close the session to force commit?
The session is configured to autoCommitSends and autoCommitAcks.
After I read the doc, I try to change the acknowledge()
to individualAcknowledge()
and it works, but i'm still wondering why the acknowledge()
can't.