
I have implemented a Kafka consumer with @KafkaHandler. The consumer is supposed to consume events and then send a REST request to another service for each event. I want to retry only if that REST service is down; otherwise, I can ignore the failed event.

My container factory is configured as below:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
  kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
    new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());
  factory.setStatefulRetry(true);            // keep retry state across redeliveries
  factory.setRetryTemplate(retryTemplate());
  factory.setConcurrency(3);

  ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckOnError(false);  // never commit a failed record's offset
  containerProperties.setAckMode(AckMode.RECORD); // commit after each successfully processed record
  containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());

  return factory;
}

I am using an ExceptionClassifierRetryPolicy to map each exception type to its corresponding retry policy.
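A minimal sketch of such a retryTemplate() bean (the concrete policies and backoff below are illustrative assumptions, not my exact configuration):

@Bean
public RetryTemplate retryTemplate() {
  Map<Class<? extends Throwable>, RetryPolicy> policies = new HashMap<>();
  policies.put(ConnectException.class, new AlwaysRetryPolicy());        // REST service down: keep retrying
  policies.put(IllegalArgumentException.class, new NeverRetryPolicy()); // bad event: do not retry

  ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
  retryPolicy.setPolicyMap(policies);

  RetryTemplate retryTemplate = new RetryTemplate();
  retryTemplate.setRetryPolicy(retryPolicy);
  retryTemplate.setBackOffPolicy(new FixedBackOffPolicy()); // 1-second pause between attempts by default
  return retryTemplate;
}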

Retrying itself works as expected: the consumer retries when I get a ConnectException and ignores the event when I get an IllegalArgumentException.

However, in the IllegalArgumentException scenario, SeekToCurrentErrorHandler seeks back to the offset of the failed record (it seeks back for all unprocessed messages, including the failed one), which results in an immediate redelivery of the failed message. The consumer constantly goes back and forth and retries the same message millions of times.

If I could find out which record failed inside SeekToCurrentErrorHandler, I would create a custom implementation that checks whether the failed message is retryable (by inspecting the thrownException field). If it is not retryable, I would remove it from the list of records to seek back, roughly as in the sketch below.
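Something along these lines is what I have in mind. This is an untested sketch against the 2.1 line, assuming the ContainerAwareErrorHandler signature and that the container wraps my listener exception (hence the getCause() check):

public class ClassifyingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

  @Override
  public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
      Consumer<?, ?> consumer, MessageListenerContainer container) {
    // The failed record is the first one in the unprocessed list.
    if (thrownException.getCause() instanceof IllegalArgumentException) {
      // Non-retryable: drop the failed record so the seek starts at the next one.
      super.handle(thrownException, records.subList(1, records.size()), consumer, container);
    }
    else {
      super.handle(thrownException, records, consumer, container);
    }
  }
}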

Any ideas about how to achieve this functionality?

Note: enable.auto.commit is set to false and auto.offset.reset is set to earliest.
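For completeness, a sketch of the consumerFactory() bean referenced above; apart from the two properties from the note, the values are placeholders:

@Bean
public ConsumerFactory<String, MyCustomEvent> consumerFactory() {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");       // placeholder
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);           // as noted above
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");       // as noted above
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // placeholder
  return new DefaultKafkaConsumerFactory<>(props);
}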

Thank you!

Ercument Kisa
  • Why did you set auto.offset.reset to earliest? I think it should be set to latest in a PROD environment. – Dina Bogdan May 14 '19 at 07:59
  • I have only 1 consumer group, so afaik it does not matter. You can check [this](https://stackoverflow.com/a/32392174/1868656) for a better explanation. – Ercument Kisa May 14 '19 at 20:44

1 Answer


There is a FailedRecordTracker in Spring for Apache Kafka 2.2 (not released yet):

https://docs.spring.io/spring-kafka/docs/2.2.0.M2/reference/html/whats-new-part.html#_listener_container_changes

Starting with version 2.2, the SeekToCurrentErrorHandler can now recover (skip) a record that keeps failing. By default, after 10 failures, the failed record will be logged (ERROR). You can configure the handler with a custom recoverer (BiConsumer) and/or max failures.

SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler((record, exception) -> {
        // recover after 3 failures - e.g. send to a dead-letter topic
    }, 3);
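Wired into your factory, that could look like the sketch below; the DeadLetterPublishingRecoverer also arrives in 2.2, and kafkaTemplate is an assumed, already-configured bean:

SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), 3);
factory.getContainerProperties().setErrorHandler(errorHandler);
// Failed records are published to "<original topic>.DLT" after the 3rd failure.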

So, all you need to do is copy the FailedRecordTracker and SeekToCurrentErrorHandler source code from master into your project, and you will have the functionality you are seeking:

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Artem Bilan
  • Thank you @ArtemBilan, this looks promising for non-retryable errors. I can set the max retry count to 1 and log the failure (or send it to a DLT) if the exception is one of the non-retryable exceptions. How about the retryable exceptions? I want them to be retried forever, so I don't want to set a max failure limit for them. – Ercument Kisa Sep 12 '18 at 22:31
  • The number of consumers is equal to the number of partitions in my case. So, for now, I assumed the first record of the `records` list will have the smallest offset of that topic. I am removing it from the list of records if the exception is non-retryable. – Ercument Kisa Sep 12 '18 at 22:32
  • Looks like there is a bug in Spring Kafka: if the concurrency level is less than the number of partitions, the consumer will retry the message forever. – ttt Sep 11 '19 at 12:03
  • Looks like you are talking about this: https://stackoverflow.com/questions/57889424/spring-kafka-seektocurrenterrorhandler-maxfailures-doesnt-work-when-concurrency – Artem Bilan Sep 11 '19 at 13:17
  • Removing the offset doesn't bring you any value: the next successful, higher offset is still going to commit everything into the offset store. You may consider configuring a `recoveryCallback` for the `ConcurrentKafkaListenerContainerFactory` to run some logic after the retries are exhausted (see the sketch below). – Artem Bilan Sep 11 '19 at 13:20
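A possible shape for that recoveryCallback, assuming the record is exposed under the RetryingMessageListenerAdapter's "record" retry-context attribute (logger is a hypothetical, pre-existing logger):

factory.setRecoveryCallback(context -> {
    // Runs once the RetryTemplate's policy is exhausted for a record.
    ConsumerRecord<?, ?> failed =
        (ConsumerRecord<?, ?>) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
    logger.error("Retries exhausted for " + failed, context.getLastThrowable());
    return null;
});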