I am consuming batches in kafka, where retry is not supported in spring cloud stream kafka binder with batch mode, there is an option given that You can configure a SeekToCurrentBatchErrorHandler (using a ListenerContainerCustomizer) to achieve similar functionality to retry in the binder.
I tried the same, but with SeekToCurrentBatchErrorHandler, but it's retrying more than the time set which is 3 times.
How can I do that? I would like to retry the whole batch.
How can I send the whole batch to dlq topic? like for record listener I used to match deliveryAttempt(retry) to 3 then send to DLQ topic, check in listener.
I have checked this link, which is exactly my issue but an example would be great help, with this library spring-cloud-stream-kafka-binder, can I achieve that. Please explain with an example, I am new to this.
Currently I have below code.
@Configuration
public class ConsumerConfig {
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, dest, group) -> {
container.getContainerProperties().setAckOnError(false);
SeekToCurrentBatchErrorHandler seekToCurrentBatchErrorHandler
= new SeekToCurrentBatchErrorHandler();
seekToCurrentBatchErrorHandler.setBackOff(new FixedBackOff(0L, 2L));
container.setBatchErrorHandler(seekToCurrentBatchErrorHandler);
//container.setBatchErrorHandler(new BatchLoggingErrorHandler());
};
}
}
Listerner:
@StreamListener(ActivityChannel.INPUT_CHANNEL)
public void handleActivity(List<Message<Event>> messages,
@Header(name = KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment
acknowledgment,
@Header(name = "deliveryAttempt", defaultValue = "1") int
deliveryAttempt) {
try {
log.info("Received activity message with message length {}", messages.size());
nodeConfigActivityBatchProcessor.processNodeConfigActivity(messages);
acknowledgment.acknowledge();
log.debug("Processed activity message {} successfully!!", messages.size());
} catch (MessagePublishException e) {
if (deliveryAttempt == 3) {
log.error(
String.format("Exception occurred, sending the message=%s to DLQ due to: ",
"message"),
e);
publisher.publishToDlq(EventType.UPDATE_FAILED, "message", e.getMessage());
} else {
throw e;
}
}
}
After seeing @Gary's response added the ListenerContainerCustomizer @Bean with RetryingBatchErrorHandler, but not able to import the class. attaching screenshots.