
I have read a ton of Gary Russell's answers and posts, but haven't found an actual solution for the common use case of synchronizing the sequence below:

receive from topic A => save to DB via Spring Data => send to topic B

As I understand it, there is no guarantee of fully atomic processing in this case and I need to deal with message deduplication on the client side, but the main issue is that the ChainedKafkaTransactionManager does not synchronize with the JpaTransactionManager (see the @KafkaListener below).

Kafka config:

@Production
@EnableKafka
@Configuration
@EnableTransactionManagement
public class KafkaConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);

    @Bean
    public ConsumerFactory<String, byte[]> commonConsumerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(MAX_POLL_RECORDS_CONFIG, 10);
        props.put(MAX_POLL_INTERVAL_MS_CONFIG, 17000);
        props.put(FETCH_MIN_BYTES_CONFIG, 1048576);
        props.put(FETCH_MAX_WAIT_MS_CONFIG, 1000);
        props.put(ISOLATION_LEVEL_CONFIG, "read_committed");

        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
            @Qualifier("commonConsumerFactory") ConsumerFactory<String, byte[]> consumerFactory,
            @Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager chainedKafkaTM,
            @Qualifier("kafkaTemplate") KafkaTemplate<String, byte[]> kafkaTemplate,
            @Value("${kafka.concurrency:#{T(java.lang.Runtime).getRuntime().availableProcessors()}}") Integer concurrency
    ) {

        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setMissingTopicsFatal(false);
        factory.getContainerProperties().setTransactionManager(chainedKafkaTM);

        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        var arbp = new DefaultAfterRollbackProcessor<String, byte[]>(new FixedBackOff(1000L, 3));
        arbp.setCommitRecovered(true);
        arbp.setKafkaTemplate(kafkaTemplate);

        factory.setAfterRollbackProcessor(arbp);
        factory.setConcurrency(concurrency);

        factory.afterPropertiesSet();

        return factory;
    }

    @Bean
    public ProducerFactory<String, byte[]> producerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String, Object> configProps = new HashMap<>();

        configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        configProps.put(BATCH_SIZE_CONFIG, 16384);
        configProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);

        configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, byte[]>(configProps);
        kafkaProducerFactory.setTransactionIdPrefix("kafka-tx-");

        return kafkaProducerFactory;
    }

    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public KafkaTransactionManager kafkaTransactionManager(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory);
        ktm.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

    @Bean
    public ChainedKafkaTransactionManager chainedKafkaTM(JpaTransactionManager jpaTransactionManager,
                                                         KafkaTransactionManager kafkaTransactionManager) {
        return new ChainedKafkaTransactionManager(kafkaTransactionManager, jpaTransactionManager);
    }

    @Bean(name = "transactionManager")
    public JpaTransactionManager transactionManager(EntityManagerFactory em) {
        return new JpaTransactionManager(em);
    }
}

Kafka listener:

@KafkaListener(groupId = "${group.id}", idIsGroup = false, topics = "${topic.name.import}")
public void consume(List<byte[]> records, @Header(KafkaHeaders.OFFSET) Long offset) {
    for (byte[] record : records) {
        // causes an infinite rollback cycle (perhaps due to the batch listener)
        if (true)
            throw new RuntimeException("foo");

        // Spring Data persistence; the service is annotated with @Transactional("chainedKafkaTM"), since Spring Data cannot choose among transactionManager, chainedKafkaTM and kafkaTransactionManager on its own
        var result = storageService.persist(record);

        kafkaTemplate.send(result);
    }
}

Spring Kafka version: 2.3.3, Spring Boot version: 2.2.1

What is the proper way to implement such a use case? The Spring Kafka documentation is limited to small, specific examples.

P.S. When I use @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class) on the @KafkaListener method, I face an endless cyclic rollback, even though FixedBackOff(1000L, 3L) is set.

EDIT: I am planning to achieve the maximum affordable synchronization between the listener, the producer and the database, with a configurable number of retries.

EDIT: The code snippets above have been edited according to the advised configuration. Using the ARBP doesn't solve the infinite rollback cycle for me, since the first statement's predicate is always false (SeekUtils.doSeeks):

DefaultAfterRollbackProcessor
...
    @Override
    public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
            boolean recoverable) {

        if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
                getSkipPredicate((List) records, exception), LOGGER)
                    && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
            ConsumerRecord<K, V> skipped = records.get(0);
            this.kafkaTemplate.sendOffsetsToTransaction(
                    Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                            new OffsetAndMetadata(skipped.offset() + 1)));
        }
    }

It is worth noting that there is no active transaction in the Kafka consumer method (TransactionSynchronizationManager.isActualTransactionActive() returns false).
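
For reference, a minimal sketch of how that check might be logged (the log statement is my addition, not part of the original listener):

// assumption: placed at the top of the @KafkaListener consume() method shown above
log.info("Actual transaction active: {}",
        TransactionSynchronizationManager.isActualTransactionActive());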

uptoyou

1 Answer


What makes you think it's not synchronized? You really don't need @Transactional since the container will start both transactions.

You shouldn't use a SeekToCurrentErrorHandler with transactions because that occurs within the transaction. Configure the after rollback processor instead. The default ARBP uses a FixedBackOff(0L, 9) (10 attempts).

This works fine for me and stops after 4 delivery attempts:

@SpringBootApplication
public class So58804826Application {

    public static void main(String[] args) {
        SpringApplication.run(So58804826Application.class, args);
    }

    @Bean
    public JpaTransactionManager transactionManager() {
        return new JpaTransactionManager();
    }


    @Bean
    public ChainedKafkaTransactionManager<?, ?> chainedTxM(JpaTransactionManager jpa,
            KafkaTransactionManager<?, ?> kafka) {

        kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(kafka, jpa);
    }

    @Autowired
    private Saver saver;

    @KafkaListener(id = "so58804826", topics = "so58804826")
    public void listen(String in) {
        System.out.println("Storing: " + in);
        this.saver.save(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58804826")
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
//          template.executeInTransaction(t -> t.send("so58804826", "foo"));
        };
    }

}

@Component
class ContainerFactoryConfigurer {

    ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
            ChainedKafkaTransactionManager<?, ?> tm) {

        factory.getContainerProperties().setTransactionManager(tm);
        factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(1000L, 3)));
    }

}

@Component
class Saver {

    @Autowired
    private MyEntityRepo repo;

    private final AtomicInteger ids = new AtomicInteger();

    @Transactional("chainedTxM")
    public void save(String in) {
        this.repo.save(new MyEntity(in, this.ids.incrementAndGet()));
        throw new RuntimeException("foo");
    }

}

I see "Participating in existing transaction" from both TxMs.

and with @Transactional("transactionManager"), I just get it from the JPATm, as one would expect.

EDIT

There is no concept of "recovery" for a batch listener - the framework has no idea which record in the batch needs to be skipped. In 2.3, we added a new feature for batch listeners when using MANUAL ack modes.

See Committing Offsets.

Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.

When using a batch listener, you can specify the index within the batch where the failure occurred. When nack() is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next poll(). This is an improvement over the SeekToCurrentBatchErrorHandler, which can only seek the entire batch for redelivery.

However, the failed record will still be replayed indefinitely.

You could keep track of the record that keeps failing and nack index + 1 to skip over it.
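
For illustration, here is a minimal sketch (not part of the original answer) of a batch listener using nack(index, sleep); it assumes the listener container is configured with AckMode.MANUAL and a hypothetical process() method doing the per-record work:

@KafkaListener(id = "batchNackExample", topics = "topicA")
public void listen(List<ConsumerRecord<String, byte[]>> records, Acknowledgment ack) {
    for (int i = 0; i < records.size(); i++) {
        try {
            process(records.get(i)); // hypothetical per-record work (DB save, send to topic B)
        }
        catch (Exception ex) {
            // commit offsets for records before index i, seek so that record i and the rest
            // of the batch are redelivered on the next poll(), then return normally;
            // nack(i + 1, ...) would also commit record i's offset, i.e. skip over it
            ack.nack(i, 1000L);
            return;
        }
    }
    ack.acknowledge(); // whole batch processed successfully
}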

However, since your JPA tx has rolled back, this won't work for you.

With batch listeners, you must handle problems with batches in your listener code.

Gary Russell
  • Wow, thanks a ton! I'll check it tonight! But in my example the record gets persisted before the RuntimeException is thrown. What do you mean by "You really don't need @Transactional" - I don't need @Transactional on the repository or on the consumer? Oh, sorry, now I see: I don't need @Transactional on the consumer/listener, but I need to configure Spring Data with the chained TM, not just the plain JpaTransactionManager, am I right? – uptoyou Nov 11 '19 at 19:32
  • No; I am saying the `@Transactional("chainedTxM")` is redundant because the listener container starts the transactions before calling the listener. It doesn't hurt (because with either one we get `Participating in existing transaction`) but it's unnecessary. – Gary Russell Nov 11 '19 at 20:40
  • I checked the setup and hit two issues: 1. Spring Data can't decide between the 3 TMs (kafka, chained, jpa), so it doesn't start; I need to set @Transactional("chainedTM") on my repository class. 2. If a runtime exception is thrown at the beginning of the listen method (before any template or repository usage), it cycles infinitely and the ARBP doesn't work. – uptoyou Nov 11 '19 at 21:27
  • In case of the infinite cycle, it always seeks to the same offset regardless of the ARBP: INFO o.a.k.clients.consumer.KafkaConsumer - .... Seeking to offset 1... Please look at the second EDIT above. – uptoyou Nov 11 '19 at 21:41
  • It makes no sense; my example works fine with or without the `@Transactional`. Post a DEBUG log someplace that shows the behavior you are seeing. Compare your code to mine. – Gary Russell Nov 12 '19 at 18:24
  • Do I need to ask a new question specifically for the cyclic rollback? The configuration is exactly the same as here (the code snippets were edited according to your advice) and the ARBP BackOff doesn't work for the batch listener. – uptoyou Nov 12 '19 at 23:20
  • Sorry, I didn't notice (or forgot) you are using a batch listener. The framework can't really help there since we don't know where in the batch the failure occurred. See the edit to my answer. – Gary Russell Nov 13 '19 at 00:03
  • Thanks a ton! Last question - is the nack mechanism intended for use within a transactional concurrent Kafka listener container? – uptoyou Nov 13 '19 at 00:21
  • Yes, the transaction will commit after sending the pre-nack-index offset(s) to the transaction. From the container's point of view, it's as if the listener successfully processed a partial batch (it must return normally after nacking) and the next batch will start at the record at the nack index. – Gary Russell Nov 13 '19 at 00:30
  • Is it a proper solution to implement a custom BatchErrorHandler instead, to overcome this case: "However, since your JPA tx has rolled back, this won't work for you"? However, I suspect there might be a problem with 'seekToLastOffset' in the framework's KafkaListener logic if the BatchErrorHandler executes without an exception. – uptoyou Nov 13 '19 at 08:00
  • A batch error handler can't help you; again, the framework can't make any assumptions about the batch, it doesn't know which record failed so it always replays the whole batch. The only other option would be to discard the whole batch; the framework doesn't support that option out of the box; you would need a custom `AfterRollbackProcessor` to do that. – Gary Russell Nov 13 '19 at 14:12
  • Apologies for being a party crasher; I am also trying to achieve the same thing. As suggested by @GaryRussell, I am using non-batch mode and the exact same process/config, but I am seeing that the data is saved to Kafka and not saved to the DB when an exception is thrown, and the offset is also not committed, which goes in a loop. – Sagar Kharab Mar 11 '21 at 11:34
  • Ask a new question showing your code and configuration. We can’t guess what is wrong with it. Consumers must have isolation.level=read_committed to not see rolled back published records. – Gary Russell Mar 11 '21 at 13:16