
I am consuming batches in Kafka. Retry is not supported by the Spring Cloud Stream Kafka binder in batch mode, but the documentation says you can configure a SeekToCurrentBatchErrorHandler (using a ListenerContainerCustomizer) to achieve functionality similar to retry in the binder.

I tried that, but with SeekToCurrentBatchErrorHandler the batch is retried more times than the limit I set, which is 3.

  1. How can I do that? I would like to retry the whole batch a fixed number of times.

  2. How can I send the whole batch to a DLQ topic? With a record listener I used to check in the listener whether deliveryAttempt (retry) had reached 3 and then send the record to the DLQ topic.

I have checked this link, which describes exactly my issue, but an example would be a great help. Can I achieve this with the spring-cloud-stream-kafka-binder library? Please explain with an example; I am new to this.

Currently I have the code below.

@Configuration
public class ConsumerConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
        return (container, dest, group) -> {
            container.getContainerProperties().setAckOnError(false);

            SeekToCurrentBatchErrorHandler seekToCurrentBatchErrorHandler =
                    new SeekToCurrentBatchErrorHandler();
            seekToCurrentBatchErrorHandler.setBackOff(new FixedBackOff(0L, 2L));
            container.setBatchErrorHandler(seekToCurrentBatchErrorHandler);
            //container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        };
    }
}
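For context on why this config retries endlessly: SeekToCurrentBatchErrorHandler seeks the whole batch back on every failure, so the broker redelivers it indefinitely; the configured BackOff only spaces the redeliveries out, it does not cap them. It is RetryingBatchErrorHandler (suggested in the answer below) that stops after the BackOff's retry count and hands the batch to a recoverer. A plain-Java sketch of the two behaviors, with illustrative names only (this is not framework code):

```java
import java.util.List;
import java.util.function.Consumer;

public class RetrySemanticsSketch {

    // Simulates SeekToCurrentBatchErrorHandler: on every failure it seeks
    // back to the current offsets, so the same batch is delivered again.
    // A BackOff only delays each redelivery; it never stops the loop.
    static int deliveriesUntilSuccess(int failuresBeforeSuccess) {
        int deliveries = 0;
        while (true) {
            deliveries++;
            if (deliveries > failuresBeforeSuccess) {
                return deliveries;   // listener finally succeeded
            }
            // else: seek -> whole batch is redelivered, forever if it keeps failing
        }
    }

    // Simulates RetryingBatchErrorHandler with FixedBackOff(interval, maxRetries):
    // one initial delivery plus maxRetries redeliveries, then the recoverer
    // (e.g. a DeadLetterPublishingRecoverer) gets the batch and the consumer moves on.
    static String boundedRetry(long maxRetries, boolean alwaysFails,
                               Consumer<String> recoverer) {
        long deliveries = 0;
        while (deliveries <= maxRetries) {
            deliveries++;
            if (!alwaysFails) {
                return "acknowledged after " + deliveries + " deliveries";
            }
        }
        recoverer.accept("batch");   // e.g. publish the whole batch to the DLT
        return "recovered after " + deliveries + " deliveries";
    }

    public static void main(String[] args) {
        // A batch that fails 5 times is delivered 6 times by the seek handler,
        // regardless of any BackOff setting:
        System.out.println(deliveriesUntilSuccess(5));

        // FixedBackOff(0L, 2L) under bounded retry: 1 + 2 = 3 deliveries,
        // then the batch goes to the recoverer:
        System.out.println(boundedRetry(2L, true, b -> { }));
    }
}
```

So with FixedBackOff(0L, 2L) the "3 times" limit only takes effect once the bounded handler from the answer is used.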

Listener:

  @StreamListener(ActivityChannel.INPUT_CHANNEL)
  public void handleActivity(List<Message<Event>> messages,
          @Header(name = KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment,
          @Header(name = "deliveryAttempt", defaultValue = "1") int deliveryAttempt) {
      try {
          log.info("Received activity message with message length {}", messages.size());
          nodeConfigActivityBatchProcessor.processNodeConfigActivity(messages);
          acknowledgment.acknowledge();
          log.debug("Processed activity message {} successfully!!", messages.size());
      } catch (MessagePublishException e) {
          if (deliveryAttempt == 3) {
              log.error(
                      String.format("Exception occurred, sending the message=%s to DLQ due to: ",
                              "message"),
                      e);
              publisher.publishToDlq(EventType.UPDATE_FAILED, "message", e.getMessage());
          } else {
              throw e;
          }
      }
  }

After seeing @Gary's response, I added the ListenerContainerCustomizer @Bean with RetryingBatchErrorHandler, but I am not able to import the class. Attaching screenshots.

not able to import RetryingBatchErrorHandler

my spring cloud dependencies

APK

1 Answer


Use a RetryingBatchErrorHandler to send the whole batch to the DLT

https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh

Use a RecoveringBatchErrorHandler where you can throw a BatchListenerFailedException to tell it which record in the batch failed.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh

In both cases provide a DeadLetterPublishingRecoverer to the error handler; disable DLTs in the binder.
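To make the RecoveringBatchErrorHandler route concrete: the listener throws a BatchListenerFailedException carrying the index (or the ConsumerRecord) of the record that failed; the handler then commits the offsets of the records before it and retries from the failed one. A plain-Java sketch of that bookkeeping follows; the names here are illustrative stand-ins, not spring-kafka internals:

```java
import java.util.List;

public class RecoveringSketch {

    // Holds the outcome of handling one failed batch delivery.
    static final class SplitBatch {
        final List<String> committed;  // records whose offsets are committed
        final List<String> retried;    // records re-seeked and redelivered
        SplitBatch(List<String> committed, List<String> retried) {
            this.committed = committed;
            this.retried = retried;
        }
    }

    // failedIndex plays the role of BatchListenerFailedException.getIndex():
    // everything before it is treated as successfully processed, the failed
    // record and the rest of the batch are retried (and eventually recovered
    // to the DLT once retries are exhausted).
    static SplitBatch handle(List<String> batch, int failedIndex) {
        return new SplitBatch(
                batch.subList(0, failedIndex),
                batch.subList(failedIndex, batch.size()));
    }

    public static void main(String[] args) {
        SplitBatch result = handle(List.of("r0", "r1", "r2", "r3"), 2);
        System.out.println(result.committed); // r0, r1 committed
        System.out.println(result.retried);   // r2, r3 retried
    }
}
```

In the real listener, this corresponds to throwing something like new BatchListenerFailedException("processing failed", cause, index) when you can identify which record in the batch failed.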

EDIT

Here's an example; it uses the newer functional style rather than the deprecated @StreamListener, but the same concepts apply (and you should consider moving to the functional style anyway).

@SpringBootApplication
public class So69175145Application {

    public static void main(String[] args) {
        SpringApplication.run(So69175145Application.class, args);
    }

    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
            KafkaTemplate<byte[], byte[]> template) {

        return (container, dest, group) -> {
            container.setBatchErrorHandler(new RetryingBatchErrorHandler(new FixedBackOff(5000L, 2L),
                    new DeadLetterPublishingRecoverer(template,
                            (rec, ex) -> new TopicPartition("errors." + dest + "." + group, rec.partition()))));
        };

    }

    /*
     * DLT topic won't be auto-provisioned since enableDlq is false
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("errors.so69175145.grp").partitions(1).replicas(1).build();
    }

    /*
     * Functional equivalent of @StreamListener
     */
    @Bean
    public Consumer<List<String>> input() {
        return list -> {
            System.out.println(list);
            throw new RuntimeException("test");
        };
    }

    /*
     * Not needed here - just to show we sent them to the DLT
     */
    @KafkaListener(id = "so69175145", topics = "errors.so69175145.grp")
    public void listen(String in) {
        System.out.println("From DLT: " + in);
    }

}
spring.cloud.stream.bindings.input-in-0.destination=so69175145
spring.cloud.stream.bindings.input-in-0.group=grp
spring.cloud.stream.bindings.input-in-0.content-type=text/plain

spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true

# for DLT listener
spring.kafka.consumer.auto-offset-reset=earliest
[foo]
2021-09-14 09:55:32.838 ERROR ...
...
[foo]
2021-09-14 09:55:37.873 ERROR ...
...
[foo]
2021-09-14 09:55:42.886 ERROR ...
...
From DLT: foo
Gary Russell
  • Thanks for the quick reply. There is no example given for RetryingBatchErrorHandler; how can I add it to the error handler I mentioned in the question? Does it work with the spring-cloud-stream-binder-kafka library? If you could show me an example, that would be really helpful. – APK Sep 14 '21 at 13:28
  • I am not able to create the object new RetryingBatchErrorHandler(new FixedBackOff(5000L, 2L)); when I hover on it, it says "Cannot resolve symbol 'RetryingBatchErrorHandler'" and offers to create the class. I can't import it. I am using version 3.0.4.RELEASE of spring-cloud-stream-binder-kafka; is it a version problem? Does it require the spring-kafka dependency? – APK Sep 14 '21 at 17:07
  • It needs spring-kafka 2.3.7 or later; I don't remember which version that old 3.0.x pulls in. I think it switched to a later boot in 3.0.9 - try the latest 3.0.13 or upgrade to a newer version; latest is 3.1.3. – Gary Russell Sep 14 '21 at 18:17
  • I tried with spring-cloud-dependencies Hoxton.SR12, which pulls in spring-cloud-stream-binder-kafka 3.0.13, and spring-cloud-dependencies 2020.0.3, which pulls in spring-cloud-stream-binder-kafka 3.1.3, but I am still not able to import the class. Does it have anything to do with the Spring Boot version? The current version is 2.2.4.RELEASE. If it's related to the Spring Boot version, that would be a huge change (is there any other solution to my problem?). – APK Sep 14 '21 at 18:39
  • Boot 2.2 is at end of life https://github.com/spring-projects/spring-boot/wiki/Supported-Versions; however, 2.2.4 pulls in spring-kafka 2.3.5 (too early). The final 2.2.x release was 2.2.13; it pulls in spring-kafka 2.3.13, which has this error handler. You should always try to keep up to date with boot releases, even within a minor version (2.2.x). – Gary Russell Sep 14 '21 at 19:29
  • We will try to upgrade the Spring Boot version in the near future, Gary, but for now it's not in the pipeline. Can you suggest any other way I can achieve this? Working without a recoverer is fine for me; I need to retry the whole batch 3 times, but SeekToCurrentBatchErrorHandler keeps retrying endlessly. Please can you suggest something; I have been struggling for days. Thanks – APK Sep 15 '21 at 03:21
  • 2.2.4 is nearly 2 years old; you should not have any difficulty upgrading within the 2.2.x line. You seemed to have no problem with upgrading the spring-cloud version, so why not Boot? If you MUST stick with Boot 2.2.4, you have two options - override just the spring-kafka version to 2.3.13 or handle the retries yourself in your listener code. – Gary Russell Sep 15 '21 at 12:58
  • I have overridden the spring-kafka version to 2.3.13 and was then able to import the class, but I was getting a ClassCastException, so I moved to the functional style; now I am not consuming the messages at all. I have asked about that here; can you please check: https://stackoverflow.com/questions/69234516/spring-cloud-stream-kafka-binder-not-able-to-consume-messages-in-functional-styl – APK Sep 18 '21 at 15:38
  • @GaryRussell , when I try using StreamListener and one of those batch handlers I am getting infinite retries. I am on spring-kafka 2.5.13 release. Any ideas on why that's the case? ListenerContainerCustomizer> customizer() { return (container, dest, group) -> { var batchHandler = new RecoveringBatchErrorHandler((a,b) -> { return; }, new FixedBackOff(1500, 3)); container.setBatchErrorHandler(batchHandler); }; } It's not respecting the retries or the backoff – blake_griffin Oct 18 '21 at 06:20
  • Code in comments is too hard to read; always ask a new question if you have to show code. Batch mode is not supported with the (deprecated) `@StreamListener`; only the functional programming model works with batch listeners. – Gary Russell Oct 18 '21 at 13:31
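For readers stuck on an old Boot/spring-kafka version, the "handle the retries yourself in your listener code" option mentioned in the comments could look roughly like the sketch below. This is an assumption-laden illustration, not framework code: processBatch and publishToDlq are hypothetical stand-ins for the asker's nodeConfigActivityBatchProcessor and publisher.

```java
import java.util.List;
import java.util.function.Consumer;

public class ManualBatchRetry {

    // Retries the whole batch in-listener up to maxAttempts; on the final
    // failure the whole batch is published to a DLQ and the batch is treated
    // as handled (so the container can acknowledge and move on).
    static String consume(List<String> batch, int maxAttempts,
                          Consumer<List<String>> processBatch,
                          Consumer<List<String>> publishToDlq) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                processBatch.accept(batch);
                return "acknowledged after attempt " + attempt;
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    publishToDlq.accept(batch);  // whole batch to the DLQ topic
                    return "sent to DLQ after attempt " + attempt;
                }
                // optionally sleep here to emulate a back-off interval
            }
        }
        return "unreachable";
    }

    public static void main(String[] args) {
        String outcome = consume(List.of("a", "b"), 3,
                b -> { throw new RuntimeException("boom"); },  // always fails
                b -> System.out.println("DLQ: " + b));
        System.out.println(outcome);
    }
}
```

This trades the framework's seek-based redelivery for a simple in-memory loop, so it only works when reprocessing the in-memory batch is acceptable (no rebalance-safety guarantees between attempts).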