The following are my configurations:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> kafkaListContFactory(
            @Qualifier("retryTemplate") RetryTemplate retryTemplate,
            @Qualifier("errorHandler") ErrorHandler errorHandler,
            @Qualifier("batchErrorHandler") BatchErrorHandler batchErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Do not commit the offset when the listener throws
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setErrorHandler(errorHandler);
        // Stateful retry: the exception is rethrown to the container after each failed attempt
        factory.setStatefulRetry(true);
        factory.setRetryTemplate(retryTemplate);
        return factory;
    }

**Retry config**

    @Bean("retryTemplate")
    public RetryTemplate retryTemplate() {
        final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(delay);
        backOffPolicy.setMultiplier(multiplier);
        backOffPolicy.setMaxInterval(20000); // cap each wait at 20 seconds
        final RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(new AlwaysRetryPolicy());
        template.setBackOffPolicy(backOffPolicy);
        return template;
    }

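For illustration, the sequence of waits that an exponential backoff of this shape produces can be sketched in plain Java. This is not Spring Retry itself: the helper `backOffDelays` is hypothetical, and the values 1000 ms and 2.0 for `delay` and `multiplier` are assumptions, since the real values are not shown above. Only the 20000 ms cap comes from the configuration.

```java
import java.util.ArrayList;
import java.util.List;

public class BackOffSketch {

    // Hypothetical helper: returns the wait (in ms) before each of the first
    // `attempts` retries, growing by `multiplier` and capped at `maxInterval`.
    static List<Long> backOffDelays(long initialInterval, double multiplier,
                                    long maxInterval, int attempts) {
        List<Long> delays = new ArrayList<>();
        double interval = initialInterval;
        for (int i = 0; i < attempts; i++) {
            delays.add((long) Math.min(interval, maxInterval));
            interval = Math.min(interval * multiplier, maxInterval);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Assumed values: delay = 1000 ms, multiplier = 2.0; cap = 20000 ms as configured.
        System.out.println(backOffDelays(1000, 2.0, 20000, 7));
        // prints [1000, 2000, 4000, 8000, 16000, 20000, 20000]
    }
}
```

Under these assumed values no single wait exceeds 20 seconds, which is the figure to compare against `max.poll.interval.ms` (300000 ms by default).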
**SeekToCurrentErrorHandler config**
I do not want to recover; I want to keep retrying until processing succeeds, so I have set maxAttempts to -1.

    @Bean("errorHandler")
    public ErrorHandler errorHandler() {
        // -1 as the second argument: intended to retry indefinitely and never recover
        SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((r, t) -> {
            if (t != null && (t instanceof RetryServiceException || t.getCause() instanceof RetryServiceException)) {
                logger.error("SeekToCurrentErrorHandler recoverer failure: {}", t.getMessage());
                throw new RetryServiceException("SeekToCurrentErrorHandler recoverer failure");
            }
        }, -1);
        return handler;
    }

Finally, I acknowledge the record in the @KafkaListener method only when no exception occurs.
My question: if I have configured -1 as the maximum number of attempts and the error handler takes care of retrying, do I still need the retryTemplate? In practice the retry does not repeat indefinitely. The bigger problem is that when I fetch a batch of records and one message in the poll fails, every message in that poll is reprocessed, including those that already succeeded.
I need to use a batchErrorHandler and implement an exponential backoff strategy so that retries are stateful and messages that were already processed successfully are not reprocessed. Could anyone help with this?
I also need to avoid partition rebalancing caused by incorrect use of max.poll.interval.ms.
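To make the desired behaviour concrete, here is a minimal plain-Java sketch with no Kafka or Spring involved. It models a polled batch in which one record fails a few times: progress is kept up to the failed record, and each retry resumes from that record rather than from the start of the batch. All names (`PartialBatchSketch`, `processUntilFailure`, `simulate`) are hypothetical. In a real listener, "committed" would correspond to the committed offset and resuming would correspond to seeking the consumer back to the failed record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class PartialBatchSketch {

    // Processes records in order starting at `from`. Returns the index of the
    // first record whose handler fails, or batch.size() if all succeeded.
    static <T> int processUntilFailure(List<T> batch, int from,
                                       Predicate<T> handler, List<T> processedLog) {
        for (int i = from; i < batch.size(); i++) {
            if (!handler.test(batch.get(i))) {
                return i; // stop here; records before i stay "committed"
            }
            processedLog.add(batch.get(i));
        }
        return batch.size();
    }

    // Simulates a batch in which `failing` is rejected `failures` times before
    // succeeding. Returns every record in the order it was successfully processed.
    static List<String> simulate(List<String> batch, String failing, int failures) {
        List<String> processed = new ArrayList<>();
        int[] failuresLeft = {failures};
        Predicate<String> handler = r -> !(r.equals(failing) && failuresLeft[0]-- > 0);
        int committed = 0;
        while (committed < batch.size()) {
            // Resume from the first uncommitted record, not from index 0.
            committed = processUntilFailure(batch, committed, handler, processed);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(simulate(List.of("a", "b", "c", "d"), "c", 2));
        // prints [a, b, c, d]: "a" and "b" are processed only once
    }
}
```

A real implementation would also wait between attempts using the backoff policy, and each wait plus the processing time has to stay below max.poll.interval.ms to avoid a rebalance.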