I have one Solace EMS queue with a single consumer (a Spring Boot app). Messages on the queue are Avro-encoded, and the Spring Boot app consumes a message only if decoding succeeds. Sometimes a message arrives with a different encoding (an Avro schema mismatch) and the consumer cannot consume it (it keeps throwing ArrayIndexOutOfBoundsException). After that, even if we manually drain the incorrectly encoded message from the queue and send a correct message, the Spring Boot app cannot consume it until it is restarted.
I am implementing JmsListenerConfigurer in order to register JMS listener endpoints dynamically.
Code snippet for configuring the JMS listeners:
    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        for (String queue : inQueues) {
            SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
            endpoint.setId("myJmsEndpoint-" + queue);
            endpoint.setDestination(queue);
            endpoint.setMessageListener(message -> {
                if (message instanceof BytesMessage) {
                    BytesMessage bytemsg = (BytesMessage) message;
                    byte[] receivedMsg = null;
                    try {
                        logger.info("DCR BytesMessage Received with JMS CorrelationId " + bytemsg.getJMSCorrelationID());
                        // Copy the whole message body into a byte array
                        receivedMsg = new byte[(int) bytemsg.getBodyLength()];
                        bytemsg.readBytes(receivedMsg);
                    } catch (JMSException ex) {
                        logger.error("error in reading byteMessage ", ex);
                    }
                    try (SeekableByteArrayInput inputStream = new SeekableByteArrayInput(receivedMsg)) {
                        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
                        List<Customer> recordList = new ArrayList<>();
                        // Decode Avro records until the input is exhausted
                        while (!decoder.isEnd()) {
                            schema = new Schema.Parser().parse(ConsumerConfiguration.class.getResourceAsStream("/avro/input/customer.avsc"));
                            datumReader = new SpecificDatumReader<>(schema);
                            Customer avroRecordRead = datumReader.read(null, decoder);
                            recordList.add(avroRecordRead);
                        }
                        logger.debug("Customer received : " + recordList.toString());
                        generalTopicWriter.sendAvroCustomerBinaryMessagetoTopic(recordList, queuetoTopicMap.get(queue));
                    } catch (IOException e) {
                        // Only IOException is handled here; unchecked exceptions propagate to the container
                        logger.error("IOException in datumReader", e);
                    }
                } else {
                    logger.warn("Can't read message from ebx because it is not in binary format");
                }
            });
            registrar.registerEndpoint(endpoint);
            logger.info("registered the endpoint for queue: " + queue);
        }
    }
I keep getting this error:
28-10-2021 09:27:11.104 [DefaultMessageListenerContainer-1] INFO c.n.e.c.t.s.r.ConsumerConfiguration.lambda$configureJmsListeners$0 - DCR BytesMessage Received with JMS CorrelationId 20211027130939468-1
28-10-2021 09:27:11.105 [Context_4_ReactorThread] INFO c.s.jcsmp.impl.flow.FlowHandleImpl.info - Client-3:session-1:Flow-19: Subscriber received out-of-order message [MsgId=19439126 PrevId=19418347], expected [PrevId <= 0] on Flow (FlowID:19, lastInOrderTpMsg:0, numUnackedTpMsgs:0, Binding:'Q_EBX_DCR_EXP_DEV05_ANZ_SRC'), ignoring.
28-10-2021 09:27:11.105 [Context_4_ReactorThread] INFO c.s.jcsmp.impl.flow.FlowHandleImpl.info - Client-3:session-1:Flow-19: Subscriber received out-of-order message [MsgId=19439675 PrevId=19439126], expected [PrevId <= 0] on Flow (FlowID:19, lastInOrderTpMsg:0, numUnackedTpMsgs:0, Binding:'Q_EBX_DCR_EXP_DEV05_ANZ_SRC'), ignoring.
28-10-2021 09:27:11.107 [DefaultMessageListenerContainer-1] WARN o.s.j.l.DefaultMessageListenerContainer.invokeErrorHandler - Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.ArrayIndexOutOfBoundsException: null
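To illustrate the failure mode in the log above without JMS or Avro, here is a minimal sketch in plain Java. It assumes a hypothetical `decode` method standing in for the Avro decoding step (the names `SafeListenerSketch`, `decode`, `onMessage`, `delivered`, and `rejected` are made up for illustration and are not part of my app). It only shows that an unchecked exception such as ArrayIndexOutOfBoundsException is not caught by `catch (IOException e)`, and that a handler which catches `RuntimeException` can still process the next message. Whether this alone removes the need to restart in the real Solace/Spring setup is not something the sketch establishes.

```java
import java.util.ArrayList;
import java.util.List;

class SafeListenerSketch {

    // Messages that decoded successfully.
    final List<String> delivered = new ArrayList<>();
    // Raw bodies that failed to decode, kept aside for inspection.
    final List<byte[]> rejected = new ArrayList<>();

    /**
     * Hypothetical stand-in for the Avro decoding step.
     * Format: the first byte is the payload length, followed by the payload.
     * A body whose declared length exceeds the actual data triggers
     * ArrayIndexOutOfBoundsException, similar to a schema mismatch.
     */
    static String decode(byte[] body) {
        int length = body[0];
        char[] chars = new char[length];
        for (int i = 0; i < length; i++) {
            chars[i] = (char) body[i + 1];
        }
        return new String(chars);
    }

    /**
     * Handles one message body. Unchecked decoding failures are caught here
     * instead of propagating to the caller, so the next message can still
     * be handled. Returns true if the message was decoded.
     */
    boolean onMessage(byte[] body) {
        try {
            delivered.add(decode(body));
            return true;
        } catch (RuntimeException e) {
            // ArrayIndexOutOfBoundsException is a RuntimeException,
            // so a catch (IOException e) block would not see it.
            rejected.add(body);
            return false;
        }
    }

    public static void main(String[] args) {
        SafeListenerSketch listener = new SafeListenerSketch();
        byte[] bad = {5, 'x'};        // declares 5 bytes, carries 1
        byte[] good = {2, 'o', 'k'};  // well-formed
        System.out.println(listener.onMessage(bad));
        System.out.println(listener.onMessage(good));
        System.out.println(listener.delivered);
    }
}
```

In this sketch the malformed body is set aside in `rejected` and the well-formed body that follows is still decoded.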