The flow is initially triggered via an API call:

1. Service A produces m1 to topic1 (non-transactional send)

2. Service B consumes topic1 and does some processing
(begin tx)

3. Service B produces m2 to topic2
(commit tx)

4. Service A consumes topic2
(begin tx)
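The visibility rules behind these steps can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `TopicModel` and `FlowDemo` are hypothetical stand-ins, not Kafka APIs, and the model ignores partitions, offsets, and the fact that a real broker keeps aborted records in the log and filters them for `read_committed` consumers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a topic; not a Kafka API. */
final class TopicModel {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    // Non-transactional send: visible to readers immediately.
    void send(String msg) { committed.add(msg); }

    // Transactional send: held back until the transaction is committed.
    void sendInTx(String msg) { pending.add(msg); }

    void commitTx() { committed.addAll(pending); pending.clear(); }

    void abortTx() { pending.clear(); }

    // What a read_committed consumer would see in this simplified model.
    List<String> readCommitted() { return new ArrayList<>(committed); }
}

final class FlowDemo {
    /** Runs steps 1 to 4 once and returns what Service A can read from topic2. */
    static List<String> run(boolean commit) {
        TopicModel topic1 = new TopicModel();
        TopicModel topic2 = new TopicModel();

        topic1.send("m1");                              // step 1: Service A, non-transactional
        for (String m : topic1.readCommitted()) {       // step 2: Service B consumes topic1
            topic2.sendInTx("m2 derived from " + m);    // step 3: Service B produces inside a tx
        }
        if (commit) {
            topic2.commitTx();
        } else {
            topic2.abortTx();
        }
        return topic2.readCommitted();                  // step 4: Service A consumes topic2
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // m2 is visible after the commit
        System.out.println(run(false));  // nothing is visible after an abort
    }
}
```

In this model, m2 only reaches Service A once Service B's transaction has been committed, which is the behaviour the `read_committed` isolation level is meant to give.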

Here is my producer config:

final Map<String, Object> props = Maps.newConcurrentMap();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-tx-1");

Here is my consumer config:

final Map<String, Object> props = Maps.newHashMap();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
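For reference, the transaction-related settings above can also be written with the literal property names that the `ProducerConfig` and `ConsumerConfig` constants stand for. This is a minimal sketch using only `java.util.Properties`; the class name `TxKafkaProps` and the sample values in `main` (such as `localhost:9092` and `my-group`) are placeholders, and the serializer settings are left out. As far as the Kafka documentation describes it, `transactional.id` is expected to be unique per producer instance, so a fixed value like "producer-tx-1" may need revisiting if several instances run in parallel.

```java
import java.util.Properties;

/** Builds the transaction-related settings from the post using literal property names. */
final class TxKafkaProps {

    // bootstrapServers and transactionalId are caller-supplied placeholders.
    static Properties producerProps(String bootstrapServers, String transactionalId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);  // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        p.setProperty("enable.idempotence", "true");           // ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
        p.setProperty("acks", "all");                          // ProducerConfig.ACKS_CONFIG
        p.setProperty("transactional.id", transactionalId);    // ProducerConfig.TRANSACTIONAL_ID_CONFIG
        return p;
    }

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);  // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        p.setProperty("group.id", groupId);                    // ConsumerConfig.GROUP_ID_CONFIG
        p.setProperty("enable.auto.commit", "false");          // ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
        p.setProperty("isolation.level", "read_committed");    // ConsumerConfig.ISOLATION_LEVEL_CONFIG
        return p;
    }

    public static void main(String[] args) {
        // Sample values only; replace with the real broker address, id and group.
        System.out.println(producerProps("localhost:9092", "producer-tx-1"));
        System.out.println(consumerProps("localhost:9092", "my-group"));
    }
}
```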

I read these sample scenarios.

I tried to follow them, but I'm running into some issues:

Here is my producer code:

@Override
public Mono<SenderResult<Void>> buy(Message msg) {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    return kafkaProducerTemplate.send(mytopic, msg);
}

Here is my consumer code:

@Override
public void run(ApplicationArguments arg0) throws Exception {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate = kafkaConfig.getKafkaConsumerTemplate(mytopic, Message.class);

    final Flux<ConsumerRecord<String, Message>> flux = kafkaConsumerTemplate.receiveExactlyOnce(kafkaProducerTemplate.transactionManager())
            .concatMap(receiverRecordFlux -> receiverRecordFlux );

    flux.subscribe(record -> {
        final Message message = record.value();

        System.out.printf("received message: timestamp=%s key=%s value=%s\n",
                dateFormat.format(new Date(record.timestamp())),
                record.key(),
                message);

        transactionService.processAndSendToNextTopic(message)
                .doOnSuccess(aVoid -> kafkaProducerTemplate.transactionManager().commit())
                .subscribe();
    });
}

I always get the following error when producing and consuming messages on a single-partition topic, even in the happy path:

Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

Can someone please show me how to correctly begin a transaction, and when to commit and close a transaction?

Note that this all works fine if I don't use transactions, i.e. receive() instead of receiveExactlyOnce().

user1955934
  • It looks like your transaction has not been committed yet when a new one is started. Could you share more of your code, especially transactionService.processAndSendToNextTopic(message)? By the way, are the configurations you're showing for both Service A and Service B? Service B's consumer does not need the read_committed isolation level, since it reads non-transactional messages. – Yannick Aug 06 '19 at 05:49
  • Yes. After Service B consumes from topic1, it calls processAndSend, which does some database operations and then uses Service B's producer to send a message to topic2. – user1955934 Aug 06 '19 at 09:26
  • Yannick, do you have a simple example of using the reactive Kafka template with transactions? – user1955934 Jan 15 '20 at 16:35

1 Answer

You may have to set the transaction synchronization of your KafkaTransactionManager to SYNCHRONIZATION_ALWAYS. Please check whether that helps.

See also

Transaction Synchronization in Spring Kafka

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/transaction/KafkaTransactionManager.html