I have the following Kafka Configuration class:

@Configuration
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class KafkaConfiguration {
private final KafkaConfigurationProperties kafkaConfigurationProperties;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> debtCollectorConsumerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfiguration()));
    factory.setConcurrency(kafkaConfigurationProperties.getDebtCollectorConsumerThreads());
    factory.setStatefulRetry(true);
    factory.setErrorHandler(new SeekToCurrentErrorHandler((record, exception) -> {
        if (exception instanceof SomeCustomException) {
            // here I want to manually acknowledge the consumption of the record
        }
    }, 10));

    ContainerProperties containerProperties = factory.getContainerProperties();
    containerProperties.setAckOnError(false);
    containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);
    return factory;
}

@Bean
@Qualifier(KAFKA_LOAN_REPAYMENT_PRODUCER)
public Producer<String, RepaymentEvent> loanRepaymentProducer() {
    return new KafkaProducer<>(producerConfiguration());
}

@Bean
@Qualifier(KAFKA_DEBT_COLLECTOR_PRODUCER)
public Producer<String, RepaymentEvent> debtCollectorProducer() {
    return new KafkaProducer<>(producerConfiguration());
}

private Map<String, Object> consumerConfiguration() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigurationProperties.getBootstrapServers());
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerGroupId());
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerAutoOffsetReset());
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfigurationProperties.getDebtCollectorConsumerMaxPollRecords());
    properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, Boolean.TRUE);
    properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfigurationProperties.getConfluentEndpoint());
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
    return properties;
}

private Map<String, Object> producerConfiguration() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigurationProperties.getBootstrapServers());
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfigurationProperties.getConfluentEndpoint());
    return properties;
}
}

and the following KafkaListener:

@Slf4j
@Component
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class DebtCollectorIncomingClient {

private final RepaymentTransferProcessService repaymentTransferProcessService;

@KafkaListener(
        topics = "${kafka.debtCollectorIncomingTopic}",
        groupId = "${kafka.debtCollectorConsumerAutoOffsetReset}",
        containerFactory = "debtCollectorConsumerContainerFactory")
public void submitMoneyTransferCommand(@Payload RepaymentEvent repaymentEvent) {
    log.info("Receiving command: {}", repaymentEvent);
    if (repaymentEvent.getPayload() instanceof RepaymentRequestTransfer) {
        RepaymentTransfer repaymentTransfer = aRepaymentTransfer(repaymentEvent);
        repaymentTransferProcessService.startRepaymentTransferProcess(repaymentTransfer);
    }
}

private RepaymentTransfer aRepaymentTransfer(RepaymentEvent repaymentEvent) {
    RepaymentRequestTransfer repaymentRequestTransfer = (RepaymentRequestTransfer) repaymentEvent.getPayload();
    return RepaymentTransfer.builder()
            .clientId(repaymentRequestTransfer.getClientId())
            .contractId(repaymentRequestTransfer.getContractId())
            .amount(BigDecimal.valueOf(repaymentRequestTransfer.getAmount()))
            .currency(Currency.getInstance(repaymentRequestTransfer.getCurrency().name()))
            .debtCollectorExternalId(repaymentEvent.getCorrelationId())
            .debtType(repaymentRequestTransfer.getDebtType())
            .build();
}
}

I want to use SeekToCurrentErrorHandler for error handling, and I want something specific like what is described here, but currently I'm using springBootVersion=2.0.4.RELEASE, springKafkaVersion=2.1.4.RELEASE, kafkaVersion=2.0.1 and confluentVersion=3.3.1. Can you help me set up the dependencies and the configuration in order to handle errors in the Kafka consumer?

Regards!

Dina Bogdan
  • You already have it configured. Exactly what is your question? – Gary Russell May 14 '19 at 13:42
  • I want to use the SeekToCurrentErrorHandler() that lets me handle the exception, and I've found that this implementation of SeekToCurrentErrorHandler is available in Spring for Apache Kafka 2.2. More details can be found in the link posted. I didn't find a dependency matrix for Spring Boot and Spring Kafka that would let me upgrade Spring Kafka from 2.1.4 to 2.2. – Dina Bogdan May 14 '19 at 15:26

2 Answers

The SeekToCurrentErrorHandler has been available since version 2.0.1. The additional functionality (recovery after some number of retries) was added in version 2.2.

Use Spring Boot 2.1.4, and Spring for Apache Kafka 2.2.6 (Boot 2.1.5 will be available soon).
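For reference, a minimal sketch of what the upgraded Gradle build might look like; only the version numbers come from this answer and the question, while the plugin and repository setup is an assumption:

// build.gradle (Groovy DSL) -- versions from this answer and the question; the rest is assumed
plugins {
    id 'org.springframework.boot' version '2.1.4.RELEASE'
    id 'io.spring.dependency-management' version '1.0.7.RELEASE'
    id 'java'
}

repositories {
    mavenCentral()
    maven { url 'https://packages.confluent.io/maven/' } // needed for the Confluent serializers
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    // pin spring-kafka explicitly to pick up the 2.2.x SeekToCurrentErrorHandler features
    implementation 'org.springframework.kafka:spring-kafka:2.2.6.RELEASE'
    implementation 'io.confluent:kafka-avro-serializer:3.3.1' // confluentVersion from the question
}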

Gary Russell
  • Hi Gary! I've added some more details to my question. Please provide an answer. Big TX! – Dina Bogdan May 17 '19 at 07:45
  • The recoverer doesn't have access to the consumer so it can't do commits. Starting with version 2.2.4, the `SeekToCurrentErrorHandler` has a new property `commitRecovered` which will commit the offset of the recovered record as long as the container is configured with `AckMode.MANUAL_IMMEDIATE`. It doesn't make sense to only commit for certain exceptions in the recoverer because, after recovery, the record is skipped anyway and the next record will go to the listener. (A sketch of this setup follows these comments.) – Gary Russell May 17 '19 at 15:27
  • Hi Gary! I have extended the SeekToCurrentErrorHandler class, and in the handle method I want to pause and resume the consumer after a specific delay. Is there any way to achieve such behavior? Tx! – Dina Bogdan May 22 '19 at 07:44
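A minimal sketch of the commitRecovered approach described in the comment above, assuming Spring Kafka 2.2.4+ and reusing the container factory from the question; the recoverer body and the max-failures value of 10 are placeholders:

SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
    // recoverer runs after retries are exhausted; log or dead-letter the failed record here (placeholder)
}, 10);
errorHandler.setCommitRecovered(true); // commit the offset of the recovered (skipped) record
factory.setErrorHandler(errorHandler);
// commitRecovered only takes effect with MANUAL_IMMEDIATE acks
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);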

After some days and after reading Gary's answers on some other posts, I finally found the solution to my problem. Maybe the question is not very descriptive, but this answer describes the behavior that I want.

In a @Configuration class I'm creating the following Spring bean:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> debtCollectorConsumerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, RepaymentEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfiguration()));
    factory.setConcurrency(kafkaConfigurationProperties.getDebtCollectorConsumerThreads());
    factory.setErrorHandler(new BlockingSeekToCurrentErrorHandler());

    ContainerProperties containerProperties = factory.getContainerProperties();
    containerProperties.setAckOnError(false);
    containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);

    factory.setRetryTemplate(retryTemplate());
    return factory;
}

private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(kafkaConfigurationProperties.getDebtCollectorConsumerRetryAttempts()));
    return retryTemplate;
}
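
The backOffPolicy() helper is not shown in the original answer; one plausible implementation, using Spring Retry's FixedBackOffPolicy and a hypothetical getDebtCollectorConsumerRetryBackoffMillis property, could look like this:

private BackOffPolicy backOffPolicy() {
    // org.springframework.retry.backoff.FixedBackOffPolicy: wait a fixed period between retry attempts
    // the property name below is hypothetical, not taken from the original answer
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(kafkaConfigurationProperties.getDebtCollectorConsumerRetryBackoffMillis());
    return backOffPolicy;
}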

And the BlockingSeekToCurrentErrorHandler class:

@Slf4j
public class BlockingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    private static final int MAX_RETRY_ATTEMPTS = Integer.MAX_VALUE;

    BlockingSeekToCurrentErrorHandler() {
        super(MAX_RETRY_ATTEMPTS);
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        try {
            if (!records.isEmpty()) {
                log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
                MetricFactory.handleDebtCollectorIncomingBlockingError(records.get(0), exception);
                super.handle(exception, records, consumer, container);
            }
        } catch (SerializationException e) {
            log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
            MetricFactory.handleDebtCollectorIncomingDeserializationError(records, e);
        }
    }
}
Dina Bogdan