
In a Spring Batch job I developed an ItemWriter that uses a StreamBridge to write to Kafka with a transactional producer. Via a CompositeItemWriter, the step also uses a MongoItemWriter, so the output is written both to MongoDB and to Kafka. The step is configured to use the MongoTransactionManager. The commit order is: first MongoDB, then Kafka.
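For context, a minimal sketch of the writer composition described above (all class, bean, and binding names here are illustrative assumptions, not the actual code):

```java
import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.MongoItemWriter;
import org.springframework.batch.item.data.builder.MongoItemWriterBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoOperations;

@Configuration
public class WriterConfig {

    // Sketch only: MyItem, the "items" collection and the "kafkaOut-out-0"
    // binding are assumptions for illustration.
    @Bean
    public CompositeItemWriter<MyItem> compositeWriter(MongoOperations mongoOperations,
                                                       StreamBridge streamBridge) {
        MongoItemWriter<MyItem> mongoWriter = new MongoItemWriterBuilder<MyItem>()
                .template(mongoOperations)
                .collection("items")
                .build();

        // Delegating writer that publishes each item to Kafka via StreamBridge
        ItemWriter<MyItem> kafkaWriter = items -> items.forEach(
                item -> streamBridge.send("kafkaOut-out-0", item));

        CompositeItemWriter<MyItem> composite = new CompositeItemWriter<>();
        // Delegate order matters: MongoDB is written first, then Kafka
        composite.setDelegates(List.of(mongoWriter, kafkaWriter));
        return composite;
    }
}
```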

I noticed that when a ProducerFencedException is raised during the write of the last item of a chunk (probably in a different thread), the step is unable to catch this exception: it commits the MongoDB transaction, and the Kafka transaction commit that follows fails. So the MongoDB changes are persisted while the Kafka messages are rolled back, and the batch continues with the next chunk.

How can I solve this problem?

Is there a way to check the last Kafka write, or the Kafka transaction status, before the batch commits to MongoDB, so that I can raise an exception if something went wrong?

Another solution would be to invert the transaction commit order, on the assumption that the commit of the Kafka transaction is more likely to fail than that of the Mongo transaction. Is this possible?

Here is the log of the batch, where you can see the Mongo commit, the Kafka rollback, and the batch not noticing what happened.

2022-05-07 11:20:00.300 DEBUG 12964 --- [           main] o.s.batch.core.step.tasklet.TaskletStep  : Applying contribution: [StepContribution: read=0, written=10, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]
2022-05-07 11:20:00.435 DEBUG 12964 --- [           main] o.s.batch.core.step.tasklet.TaskletStep  : Saving step execution before commit: StepExecution: id=1136, version=2, name=ks_batch-file-to-kafka, status=STARTED, exitStatus=EXECUTING, readCount=20, filterCount=0, writeCount=20 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=2, rollbackCount=9, exitDescription=
2022-05-07 11:20:00.543 DEBUG 12964 --- [           main] o.s.d.mongodb.MongoTransactionManager    : Initiating transaction commit
2022-05-07 11:20:00.544 DEBUG 12964 --- [           main] o.s.d.mongodb.MongoTransactionManager    : About to commit transaction for session [ClientSessionImpl@495e1ad1 id = {"id": {"$binary": {"base64": "sZhOOchoRj2ydBGjM+KoLQ==", "subType": "04"}}}, causallyConsistent = true, txActive = true, txNumber = 11, error = d != java.lang.Boolean].
2022-05-07 11:20:00.586 DEBUG 12964 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9, transactionalId=tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
2022-05-07 11:20:00.586 DEBUG 12964 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9, transactionalId=tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9] Enqueuing transactional request EndTxnRequestData(transactionalId='tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9', producerId=3727, producerEpoch=0, committed=true)
2022-05-07 11:20:00.586 TRACE 12964 --- [-1db2c788b1a9-9] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9, transactionalId=tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9] Request EndTxnRequestData(transactionalId='tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9', producerId=3727, producerEpoch=0, committed=true) dequeued for sending
2022-05-07 11:20:00.659 TRACE 12964 --- [-1db2c788b1a9-9] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9, transactionalId=tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9] Received transactional response EndTxnResponseData(throttleTimeMs=0, errorCode=90) for request EndTxnRequestData(transactionalId='tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9', producerId=3727, producerEpoch=0, committed=true)
2022-05-07 11:20:00.659  INFO 12964 --- [-1db2c788b1a9-9] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9, transactionalId=tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
2022-05-07 11:20:00.659 DEBUG 12964 --- [-1db2c788b1a9-9] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9, transactionalId=tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9] Transition from state COMMITTING_TRANSACTION to error state FATAL_ERROR

org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.

2022-05-07 11:20:00.659 ERROR 12964 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@410e043b]

org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.

2022-05-07 11:20:00.659  WARN 12964 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@410e043b]
2022-05-07 11:20:00.659  INFO 12964 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9, transactionalId=tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9] Closing the Kafka producer with timeoutMillis = 5000 ms.
2022-05-07 11:20:00.660  INFO 12964 --- [           main] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
2022-05-07 11:20:00.660  INFO 12964 --- [           main] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2022-05-07 11:20:00.660  INFO 12964 --- [           main] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
2022-05-07 11:20:00.661  INFO 12964 --- [           main] o.a.kafka.common.utils.AppInfoParser     : App info kafka.producer for producer-tx-ks_batch-file-to-kafka-ff42f0cb-e437-4c8b-b4b1-1db2c788b1a9-9 unregistered
2022-05-07 11:20:00.661 DEBUG 12964 --- [           main] o.s.d.mongodb.MongoTransactionManager    : About to release Session [ClientSessionImpl@495e1ad1 id = {"id": {"$binary": {"base64": "sZhOOchoRj2ydBGjM+KoLQ==", "subType": "04"}}}, causallyConsistent = true, txActive = false, txNumber = 11, error = d != java.lang.Boolean] after transaction.
2022-05-07 11:20:00.661 DEBUG 12964 --- [           main] o.s.b.c.s.i.SimpleRetryExceptionHandler  : Handled non-fatal exception

org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.

2022-05-07 11:20:00.661 DEBUG 12964 --- [           main] o.s.b.c.s.c.StepContextRepeatCallback    : Preparing chunk execution for StepContext: org.springframework.batch.core.scope.context.StepContext@66682e8f
2022-05-07 11:20:00.661 DEBUG 12964 --- [           main] o.s.b.c.s.c.StepContextRepeatCallback    : Chunk execution starting: queue size=0
Gianni

1 Answer


With a CompositeItemWriter, the step also uses a MongoItemWriter, so the output is written both to MongoDB and to Kafka. The step is configured to use the MongoTransactionManager.

If you want to write items to two transactional resources (MongoDB and Kafka, in your case) and get the consistency you are looking for, you need to configure your step with a JtaTransactionManager that coordinates the MongoDB and Kafka transactions, instead of the MongoTransactionManager alone.
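A minimal sketch of that wiring, assuming a `jtaTransactionManager` bean backed by a JTA provider is available in the context (bean, type, and step names here are illustrative assumptions):

```java
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.jta.JtaTransactionManager;

@Configuration
public class StepConfig {

    @Bean
    public Step fileToKafkaStep(StepBuilderFactory stepBuilderFactory,
                                JtaTransactionManager jtaTransactionManager,
                                ItemReader<MyItem> reader,
                                CompositeItemWriter<MyItem> compositeWriter) {
        return stepBuilderFactory.get("ks_batch-file-to-kafka")
                .<MyItem, MyItem>chunk(10)
                .reader(reader)
                .writer(compositeWriter)
                // Use the JTA transaction manager instead of MongoTransactionManager,
                // so both resources are committed (or rolled back) together
                .transactionManager(jtaTransactionManager)
                .build();
    }
}
```

With this setup, a failure on the Kafka side during commit would cause the whole chunk transaction to roll back rather than leaving MongoDB committed on its own.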

The following posts might be helpful:

Mahmoud Ben Hassine