0

Let's say I have a @KafkaListener class with a @KafkaHandler method inside that processes any received messages and does some DB operations.

I want to have fine-grained control over how and when to commit (or rollback) the database changes (i.e. manually manage the DB transaction) in this class. The consumed message offset can be committed regardless of the DB transaction result.

Here is a simplified version of what I have:

@Service
@RequiredArgsConstructor
@KafkaListener(
    topics = "${kafka.topic.foo}",
    groupId = "${spring.kafka.consumer.group-id-foo}",
    containerFactory = "kafkaListenerContainerFactoryFoo")
public class FooMessageConsumer {

  // ...
  private final EntityManager entityManager;

  @KafkaHandler
  public void handleMessage(FooMessage msg) {
    // ...
    handleDBOperations(msg);
    // ...
  }

  void handleDBOperations(msg) {
    try {
      entityManager.getTransaction().begin();

      // ...

      entityManager.getTransaction().commit();
    } catch (Exception e) {
      log.error(e.getLocalizedMessage(), e);
      entityManager.getTransaction().rollback();
    }
  }

}

When a message is received and entityManager.getTransaction().begin(); is invoked, this results in an exception:

java.lang.IllegalStateException: Not allowed to create transaction on shared EntityManager - use Spring transactions or EJB CMT instead

Why am I not allowed to create a transaction here?

And if I remove EntityManager and add @Transactional annotations to methods with DB operations (though this is not exactly what I want), then it results in another exception:

TransactionRequiredException Executing an update/delete query

It seems like it completely ignores the annotation. Is this somehow related to Kafka consumer having its own transaction management?

In short, what am I doing wrong here and how can I manage DB transactions in a @KafkaHandler method?

Any help is appreciated. Thanks in advance.

emrekgn
  • 624
  • 9
  • 25

1 Answers1

1

Try using Springs TransactionTemplate: https://docs.spring.io/spring-framework/docs/3.0.0.M4/reference/html/ch10s06.html

If your use case is (that) simple, Springs declarative transaction management should also let you achieve the behavior you ask for: https://docs.spring.io/spring-framework/docs/3.0.0.M3/reference/html/ch11s05.html

fladdimir
  • 1,230
  • 1
  • 5
  • 12
  • Yeap, using `TransactionTemplate` solved it. But, let me ask, why couldn't I just use `EntityManager` transaction or even a `@Transactional` annotation? – emrekgn Dec 24 '21 at 07:31
  • there is an existing question on why asking the entity manager this way does not work: https://stackoverflow.com/a/42901969 ; @Transactional is realized internally by spring via an aop proxy, so it can only work when calling annotated public methods of an injected spring bean (did you try it that way?): https://docs.spring.io/spring-framework/docs/4.2.x/spring-framework-reference/html/transaction.html#tx-decl-explained – fladdimir Dec 24 '21 at 13:06
  • I was not aware of the scope of the entity manager, that was very helpful. However,I still couldn't understand why `@Transactional` is not working when it's already a public method of a Spring-managed object. I also learned about Kafka TM which might be the reason why DB TM didn't work (hence the `TransactionRequiredException`) in my case, even though the annotation was there. I'll try to explicitly specify the TM and update the post to include possible solutions but thanks! [1](https://docs.spring.io/spring-kafka/reference/html/#ex-jdbc-sync) [2](https://stackoverflow.com/a/47355822/5492826) – emrekgn Dec 27 '21 at 15:01