I've read a lot of Gary Russell's answers and posts, but I haven't found an actual solution for a common use case: synchronizing the following sequence:
receive from topic A => save to DB via Spring Data => send to topic B
As I understand it, there is no guarantee of fully atomic processing in this case, and I need to handle message deduplication on the client side. The main issue, however, is that ChainedKafkaTransactionManager doesn't synchronize with JpaTransactionManager (see the @KafkaListener below).
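Client-side deduplication usually means making the database step idempotent. Below is a minimal sketch, assuming each record is identified by its topic, partition, and offset, and that the set of processed IDs lives in the same database transaction as the entity (in a real application, for example, a unique key on a message-ID column). The IdempotentStore class and its in-memory maps are hypothetical stand-ins for a JPA repository, not Spring Data API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory stand-in for an idempotent "save to DB" step. */
final class IdempotentStore {
    // Stand-in for the entity table.
    private final Map<String, String> entities = new HashMap<>();
    // Stand-in for a "processed message IDs" table kept in the same DB transaction.
    private final Set<String> processedIds = new HashSet<>();

    /** Builds a message ID from the record coordinates; an offset is unique within a topic partition. */
    static String messageId(String topic, int partition, long offset) {
        return topic + "-" + partition + "@" + offset;
    }

    /** Stores the payload unless the ID was seen before; returns true only on the first store. */
    boolean persistOnce(String messageId, String payload) {
        if (!processedIds.add(messageId)) {
            return false; // redelivered record: the DB write was already committed earlier
        }
        entities.put(messageId, payload);
        return true;
    }

    int size() {
        return entities.size();
    }

    public static void main(String[] args) {
        IdempotentStore store = new IdempotentStore();
        String id = messageId("A", 0, 42L);
        System.out.println(store.persistOnce(id, "payload")); // true
        System.out.println(store.persistOnce(id, "payload")); // false (redelivery)
    }
}
```

Only the database write is deduplicated here. Assuming downstream consumers use read_committed, a send to topic B from an aborted Kafka transaction is never visible to them, so repeating the send on redelivery is harmless.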
Kafka config:
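```java
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.FETCH_MIN_BYTES_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.springframework.transaction.support.AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION;

import java.util.HashMap;
import java.util.Map;
import javax.persistence.EntityManagerFactory;

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.util.backoff.FixedBackOff;

@Production
@EnableKafka
@Configuration
@EnableTransactionManagement
public class KafkaConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);

    @Bean
    public ConsumerFactory<String, byte[]> commonConsumerFactory(@Value("${kafka.broker}") String bootstrapServer) {
        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(MAX_POLL_RECORDS_CONFIG, 10);
        props.put(MAX_POLL_INTERVAL_MS_CONFIG, 17000);
        props.put(FETCH_MIN_BYTES_CONFIG, 1048576);
        props.put(FETCH_MAX_WAIT_MS_CONFIG, 1000);
        // only read records from committed producer transactions
        props.put(ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
            @Qualifier("commonConsumerFactory") ConsumerFactory<String, byte[]> consumerFactory,
            @Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager<String, byte[]> chainedKafkaTM,
            @Qualifier("kafkaTemplate") KafkaTemplate<String, byte[]> kafkaTemplate,
            @Value("${kafka.concurrency:#{T(java.lang.Runtime).getRuntime().availableProcessors()}}") Integer concurrency) {

        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setMissingTopicsFatal(false);
        // the container starts a transaction with this TM before invoking the listener
        factory.getContainerProperties().setTransactionManager(chainedKafkaTM);
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        // intended: 3 retries 1 s apart, then skip the failed record and commit its offset
        var arbp = new DefaultAfterRollbackProcessor<String, byte[]>(new FixedBackOff(1000L, 3L));
        arbp.setCommitRecovered(true);
        arbp.setKafkaTemplate(kafkaTemplate);
        factory.setAfterRollbackProcessor(arbp);
        factory.setConcurrency(concurrency);
        factory.afterPropertiesSet();
        return factory;
    }

    @Bean
    public ProducerFactory<String, byte[]> producerFactory(@Value("${kafka.broker}") String bootstrapServer) {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        configProps.put(BATCH_SIZE_CONFIG, 16384);
        configProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, byte[]>(configProps);
        // setting a transactional.id prefix makes the producers transactional
        kafkaProducerFactory.setTransactionIdPrefix("kafka-tx-");
        return kafkaProducerFactory;
    }

    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public KafkaTransactionManager<String, byte[]> kafkaTransactionManager(
            @Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        KafkaTransactionManager<String, byte[]> ktm = new KafkaTransactionManager<>(producerFactory);
        ktm.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

    @Bean
    public ChainedKafkaTransactionManager<String, byte[]> chainedKafkaTM(JpaTransactionManager jpaTransactionManager,
            KafkaTransactionManager<String, byte[]> kafkaTransactionManager) {
        // transactions begin in this order and commit in reverse order (JPA first, then Kafka)
        return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jpaTransactionManager);
    }

    @Bean(name = "transactionManager")
    public JpaTransactionManager transactionManager(EntityManagerFactory em) {
        return new JpaTransactionManager(em);
    }
}
```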
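ChainedKafkaTransactionManager extends Spring Data's ChainedTransactionManager, which starts transactions in the order given and commits them in reverse order. It is a best-effort chain, not two-phase commit. The stdlib-only model below uses hypothetical FakeTx objects in place of the Kafka and JPA transaction managers, so it is a sketch of the ordering, not Spring's implementation. It shows the window that makes deduplication necessary: the JPA commit can succeed while the later Kafka commit (which also carries the consumer offsets) fails.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of best-effort chained commits; not Spring's implementation. */
final class ChainedCommitModel {
    /** Stand-in for one resource's transaction (e.g. "kafka" or "jpa"). */
    static final class FakeTx {
        final String name;
        final boolean failOnCommit;
        boolean committed;

        FakeTx(String name, boolean failOnCommit) {
            this.name = name;
            this.failOnCommit = failOnCommit;
        }
    }

    /** Begins in list order, commits in reverse order, stops at the first failing commit. */
    static List<String> run(List<FakeTx> txs) {
        List<String> log = new ArrayList<>();
        for (FakeTx tx : txs) {
            log.add("begin " + tx.name);
        }
        for (int i = txs.size() - 1; i >= 0; i--) {
            FakeTx tx = txs.get(i);
            if (tx.failOnCommit) {
                log.add("commit failed " + tx.name);
                return log; // commits that already happened are NOT undone
            }
            tx.committed = true;
            log.add("commit " + tx.name);
        }
        return log;
    }

    public static void main(String[] args) {
        FakeTx kafka = new FakeTx("kafka", true); // e.g. broker unreachable at commit time
        FakeTx jpa = new FakeTx("jpa", false);
        // same order as new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jpaTransactionManager)
        System.out.println(run(List.of(kafka, jpa)));
        // prints [begin kafka, begin jpa, commit jpa, commit failed kafka]
        System.out.println("jpa committed=" + jpa.committed + ", kafka committed=" + kafka.committed);
    }
}
```

In that failure case the offsets were never committed, so the records are redelivered even though their rows are already in the database.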
Kafka listener:
```java
@KafkaListener(groupId = "${group.id}", idIsGroup = false, topics = "${topic.name.import}")
public void consume(List<byte[]> records, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    // batch listener: header values such as OFFSET arrive as one list entry per record
    for (byte[] record : records) {
        // causes an endless rollback loop (perhaps because this is a batch listener)
        if (true)
            throw new RuntimeException("foo");
        // Spring Data storage annotated with @Transactional("chainedKafkaTM"), because Spring Data
        // can't choose among transactionManager, chainedKafkaTM, and kafkaTransactionManager
        var result = storageService.persist(record);
        kafkaTemplate.send(result);
    }
}
```
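With a transactional batch listener, the container wraps the whole consume() call, and therefore the whole polled batch, in one transaction. A failure on any record therefore discards the work done for the earlier records of that batch, and the entire batch is redelivered. The BatchTxModel class below is a hypothetical stdlib-only sketch of that all-or-nothing behaviour, with a list standing in for the database; it is not Spring Kafka code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model: one transaction per polled batch, as with a transactional batch listener. */
final class BatchTxModel {
    // Stand-in for rows visible in the database after commit.
    final List<String> committedDb = new ArrayList<>();

    /** Processes every record in one "transaction"; returns true only if it committed. */
    boolean processBatch(List<String> batch, Predicate<String> failsOn) {
        List<String> pending = new ArrayList<>(); // uncommitted writes of this transaction
        for (String rec : batch) {
            if (failsOn.test(rec)) {
                return false; // rollback: pending writes are dropped, the whole batch is redelivered
            }
            pending.add(rec);
        }
        committedDb.addAll(pending); // commit
        return true;
    }

    public static void main(String[] args) {
        BatchTxModel model = new BatchTxModel();
        // the last record fails, so r1 and r2 are rolled back too
        System.out.println(model.processBatch(List.of("r1", "r2", "r3"), rec -> rec.equals("r3"))); // false
        System.out.println(model.committedDb); // []
    }
}
```

With the unconditional throw in the listener above, failsOn is effectively rec -> true, so no batch can ever commit on its own; only the after-rollback processor can break the cycle.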
Spring Kafka version: 2.3.3; Spring Boot version: 2.2.1.
What is the proper way to implement this use case? The Spring Kafka documentation only covers small, specific examples.
P.S. When I put @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class) on the @KafkaListener method, I get an endless rollback loop, even though FixedBackOff(1000L, 3L) is set.
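For reference, FixedBackOff(1000L, 3L) means up to 3 retries 1000 ms apart after the first failure, so a record that always fails should be delivered 4 times before recovery. The sketch below re-implements that counting with a hypothetical FixedBackOffModel class (not Spring's FixedBackOff), assuming recovery happens once the back-off execution returns its STOP sentinel (-1, the same value as BackOffExecution.STOP).

```java
/** Hypothetical re-implementation of fixed back-off counting; not Spring's FixedBackOff class. */
final class FixedBackOffModel {
    static final long STOP = -1L; // mirrors the BackOffExecution.STOP sentinel

    private final long interval;
    private final long maxAttempts;
    private long currentAttempts;

    FixedBackOffModel(long interval, long maxAttempts) {
        this.interval = interval;
        this.maxAttempts = maxAttempts;
    }

    /** Returns the delay before the next retry, or STOP once maxAttempts retries have been used. */
    long nextBackOff() {
        currentAttempts++;
        return currentAttempts <= maxAttempts ? interval : STOP;
    }

    /** Counts deliveries of an always-failing record: the first one plus one per non-STOP back-off. */
    static int deliveriesUntilRecovery(long interval, long maxAttempts) {
        FixedBackOffModel backOff = new FixedBackOffModel(interval, maxAttempts);
        int deliveries = 1; // the first delivery fails
        while (backOff.nextBackOff() != STOP) {
            deliveries++; // the record is seeked and redelivered after the interval
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println(deliveriesUntilRecovery(1000L, 3L)); // 4
    }
}
```

If the loop is endless, the back-off is evidently never being consulted for recovery, rather than being misconfigured.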
EDIT: I'm aiming for the maximum achievable synchronization between the listener, the producer, and the database, with a configurable number of retries.
EDIT: I've updated the code snippets above to follow the advised configuration. Using the after-rollback processor (ARBP) doesn't break the infinite rollback loop for me, because the first operand of the if condition below, SeekUtils.doSeeks(...), always returns false:
```java
// DefaultAfterRollbackProcessor
...
@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
        boolean recoverable) {

    if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
            getSkipPredicate((List) records, exception), LOGGER)
                && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
        ConsumerRecord<K, V> skipped = records.get(0);
        this.kafkaTemplate.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                        new OffsetAndMetadata(skipped.offset() + 1)));
    }
}
```
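The quoted process() only skips the first record (and commits its offset) when SeekUtils.doSeeks(...) returns true, and recoverable is one of its arguments. A minimal model, assuming doSeeks reports a skip only when recoverable is true and the retries for the first record are exhausted, shows why a recoverable flag that is false on every call would produce exactly this endless loop. The AfterRollbackModel class and its methods are hypothetical, not Spring Kafka API.

```java
/** Hypothetical simplification of the skip decision in the quoted process() method. */
final class AfterRollbackModel {
    /** Assumed shape of doSeeks' result: skip only if recoverable AND retries are exhausted. */
    static boolean skipFirstRecord(boolean recoverable, boolean retriesExhausted) {
        return recoverable && retriesExhausted;
    }

    /** Rollback cycles until the first record is skipped, or -1 if it is never skipped within limit. */
    static int cyclesUntilSkip(boolean recoverable, int maxRetries, int limit) {
        for (int failures = 1; failures <= limit; failures++) {
            boolean exhausted = failures > maxRetries; // initial attempt plus maxRetries retries used up
            if (skipFirstRecord(recoverable, exhausted)) {
                return failures;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(cyclesUntilSkip(true, 3, 100));  // 4
        System.out.println(cyclesUntilSkip(false, 3, 100)); // -1: never skipped
    }
}
```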
It's worth noting that there is no active transaction inside the Kafka consumer method (TransactionSynchronizationManager.isActualTransactionActive() returns false).