13

I want to synchronize a Kafka transaction with a repository transaction:

@Transactional
public void syncTransaction() {
  myRepository.save(someObject);
  kafkaTemplate.send("someTopic", someEvent);
}

Since the merge (https://github.com/spring-projects/spring-kafka/issues/373) and according to the docs, this is possible. Nevertheless, I have trouble understanding and implementing that feature. Looking at the example in https://docs.spring.io/spring-kafka/reference/html/#transaction-synchronization, I have to create a MessageListenerContainer to listen to my own events. Do I still have to send my events using the KafkaTemplate? Does the MessageListenerContainer prevent the sending to the broker?

And if I understand correctly, the KafkaTemplate and the KafkaTransactionManager have to use the same ProducerFactory, on which I have to enable transactions by setting a transactionIdPrefix. And in my example I have to set the transaction manager of the MessageListenerContainer to the DataSourceTransactionManager. Is that correct?
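
Something like the following is what I have in mind (just a sketch; the bootstrap server, the serializers, and the "tx-" prefix are placeholders):

@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    DefaultKafkaProducerFactory<String, Object> pf = new DefaultKafkaProducerFactory<>(props);
    pf.setTransactionIdPrefix("tx-"); // enables Kafka transactions on this factory
    return pf;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    // the same factory the KafkaTransactionManager would be built from
    return new KafkaTemplate<>(producerFactory());
}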

From my perspective it looks weird that I send an event via the KafkaTemplate, listen to my own event, and then forward the event using the KafkaTemplate again.

It would really help me to get an example of a simple synchronization of a Kafka transaction with a repository transaction, together with an explanation.

beat
Eike Behrends

4 Answers

10

If the listener container is provisioned with a KafkaTransactionManager, the container will create a producer which will be used by any downstream KafkaTemplate, and the container will send the offsets to the transaction for you.
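
For example, the wiring could look something like this (a sketch; the bean names and generics are illustrative):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory,
        KafkaTransactionManager<String, String> kafkaTransactionManager) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // The container starts a Kafka transaction for each listener invocation and
    // sends the consumed offsets to that transaction before committing it.
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    return factory;
}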

If the container has some other transaction manager, the container can't send the offsets since it doesn't have access to the producer (or template).

Another solution is to annotate your method with @Transactional (with the datasource TM) and configure the container with a Kafka TM.

That way, your DB tx will commit just before the thread returns to the container, which will then send the offsets to the Kafka transaction and commit it.
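
A sketch of that arrangement (the listener, repository, and topic names are made up, and "dstm" stands for the DataSourceTransactionManager bean):

@KafkaListener(topics = "someTopic")
@Transactional("dstm") // DB tx commits first, just before the thread returns to the container
public void listen(SomeEvent event) {
    myRepository.save(toEntity(event));      // joins the DB transaction
    kafkaTemplate.send("otherTopic", event); // joins the container's Kafka transaction
}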

See the framework test cases for examples.

Gary Russell
  • "That way, your DB tx will commit just before the thread returns to the container, which will then send the offsets to the Kafka transaction and commit it." => But are the DB commit and the Kafka commit really one atomic operation? It sounds like two transactions performed one after another, not like one transaction. – Eike Behrends Nov 20 '17 at 09:26
  • Yes, but that is also true with transaction synchronization - it's called the "Best Efforts 1PC Pattern" in [Dr. Dave Syer's excellent JavaWorld article "Distributed transactions in Spring, with and without XA"](https://www.javaworld.com/article/2077963/open-source-tools/distributed-transactions-in-spring--with-and-without-xa.html). Kafka doesn't support XA, and you have to deal with the possibility that the DB tx might commit while the Kafka tx rolls back. – Gary Russell Nov 20 '17 at 14:29
  • Is it possible to commit both the DB and Kafka atomically by using the ChainedTransactionManager? I have a task whose status is saved in the database, and at the same time I need to put the DB record id onto a Kafka topic and consume it with a spring-kafka consumer. Is it good practice to save a record in the DB, put its id onto Kafka, and then have the consumer fetch records based on that id? – aeranginkaman Oct 23 '20 at 05:58
  • It's not atomic; it's "Best Efforts 1 phase commit" - see the article I referenced in the comment above. Also see https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#transactions - it's best to not ask new questions in comments, especially when the answer is 3 years old. Things change over time. – Gary Russell Oct 23 '20 at 13:40
7

@Eike Behrends, to have a DB + Kafka transaction you can use the ChainedTransactionManager and define it this way:

@Bean
public KafkaTransactionManager kafkaTransactionManager() {
    KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
}


@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
    return new JpaTransactionManager(em);
}

@Bean(name = "chainedTransactionManager")
public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager jpaTransactionManager,
                                                           KafkaTransactionManager kafkaTransactionManager) {
    return new ChainedTransactionManager(kafkaTransactionManager, jpaTransactionManager);
}

You need to annotate your transactional DB + Kafka methods with @Transactional("chainedTransactionManager").
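
A service method using it could then look like this (a sketch; the repository, the mapping, and the topic name are placeholders):

@Transactional("chainedTransactionManager")
public void saveAndSend(SomeEvent event) {
    myRepository.save(toEntity(event));     // JPA tx, committed first (last in the chain)
    kafkaTemplate.send("someTopic", event); // Kafka tx, committed after the JPA commit
}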

(you can see the issue on spring-kafka project : https://github.com/spring-projects/spring-kafka/issues/433 )

You say:

From my perspective it looks weird that I send an event via kafkaTemplate, listen to my own event and forward the event using the kafkaTemplate again.

Have you tried this? If so, can you provide an example, please?

nader.h
0

To achieve your goal you should use a different, "eventually consistent" approach such as CDC (Change Data Capture). There are no atomic transactions (aka XA transactions) between Kafka writes and any other system such as a database. It is a complete paradigm shift when you have distributed services (some call them microservices) that, in your case, probably communicate by producing to and consuming from Kafka topics.

Vassilis
-2

TL;DR: just use upsert / merge.

I stumbled on this old topic, and after so many years people still struggle with it.

I just want to share the simplest and most natural approach for dealing with systems like Kafka.

The real reason people come here for an answer is the old approach of distributed transactions. Most want to synchronize non-transactional Kafka (Kafka names its functionality "transactions", but they are actually "special") with some ACID database.

If your service is working within an idempotent environment, everything downstream should be idempotent too.

Just make sure your operations on the underlying storage are idempotent; the simplest approach is upsert / merge (depending on the storage).
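
As an illustration, an idempotent write with plain JDBC could look like this (a sketch assuming PostgreSQL and Spring's JdbcTemplate; the tasks table and its columns are made up):

public void upsertTask(JdbcTemplate jdbcTemplate, long id, String status) {
    // Replaying the same Kafka record produces the same row state - no duplicates.
    jdbcTemplate.update(
        "INSERT INTO tasks (id, status) VALUES (?, ?) " +
        "ON CONFLICT (id) DO UPDATE SET status = EXCLUDED.status",
        id, status);
}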

P.S. CDC is a thing, but it requires much more effort and is unnecessary in most typical cases.

MORE: If you want to dig into why Kafka "transactions" are special, here are good starting points (explained in the context of EOS, exactly-once semantics):

EDIT
It is very interesting that this answer got downvotes... Just check this issue, its comments, and the related issues: https://github.com/spring-projects/spring-data-commons/issues/2232 - that's why one would not want to use the ChainedTransactionManager for business-critical transactions (it cannot act as a real 2PC by design).

uptoyou