I wonder if I could get some help understanding transactions in Kafka, and in particular how to use transactional.id. Here's the context:
- My Kafka application follows the pattern: consume a message from an input topic, process it, publish to an output topic.
- I am not using the Kafka Streams API.
- I have multiple consumers in a single consumer group and each consumer is in its own polling thread.
- There is a thread pool with worker threads that do the message processing and publishing to the output topic. At the moment, each thread has its own producer instance.
- I am using the transactions API to ensure that the commit of the consumer offset and the publishing to the output topic happen atomically (a simplified sketch of this loop follows below).
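For concreteness, here's roughly what one read-process-write cycle looks like in my code, collapsed into a single thread for brevity. processRecord() and "output-topic" are placeholders, and this assumes a client recent enough that sendOffsetsToTransaction takes a ConsumerGroupMetadata (Kafka 2.5+):

```java
// A simplified sketch of one polling thread's read-process-write cycle.
// In my real app the processing and producing happen on worker threads.
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class ReadProcessWriteLoop {

    void run(KafkaConsumer<String, String> consumer,
             KafkaProducer<String, String> producer) {
        producer.initTransactions(); // once per producer instance

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) {
                continue;
            }

            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            try {
                for (ConsumerRecord<String, String> record : records) {
                    String result = processRecord(record); // placeholder
                    producer.send(new ProducerRecord<>("output-topic", record.key(), result));
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }
                // The offset commit rides inside the same transaction as the
                // output records, so both become visible or neither does.
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            } catch (KafkaException e) {
                // Simplified: a real version must also rewind the consumer to
                // the last committed offsets, and must close (not abort) the
                // producer on fatal errors such as ProducerFencedException.
                producer.abortTransaction();
            }
        }
    }

    private String processRecord(ConsumerRecord<String, String> record) {
        return record.value(); // stand-in for the real processing
    }
}
```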
My assumptions to date have included:
- If my process crashed mid-transaction, then nothing from that transaction would have been published and the consumer offset would not have moved. So upon restart, I would simply start the transaction again from the original consume offset.
- For the producer transactional.id, all that mattered was that it was unique. I could therefore generate a timestamp-based id at start-up (roughly as sketched below).
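What my start-up code effectively does today is something like the following; the "my-app" prefix, workerIndex, and the bootstrap address are placeholders:

```java
// A transactional.id that is unique per producer instance but derived from
// the start-up timestamp, so it changes on every restart.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFactory {

    static KafkaProducer<String, String> newTransactionalProducer(int workerIndex) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Unique, but not stable across restarts:
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                "my-app-" + System.currentTimeMillis() + "-" + workerIndex);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        return producer;
    }
}
```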
Then I read the following blog: https://www.confluent.io/blog/transactions-apache-kafka/. In particular, the section "How to pick a transactional.id" seems to imply that I need to guarantee a fixed producer instance per input partition. It says: "The key to fencing out zombies properly is to ensure that the input topics and partitions in the read-process-write cycle is always the same for a given transactional.id." It then gives the following problem example: "For instance, in a distributed stream processing application, suppose topic-partition tp0 was originally processed by transactional.id T0. If, at some point later, it could be mapped to another producer with transactional.id T1, there would be no fencing between T0 and T1. So it is possible for messages from tp0 to be reprocessed, violating the exactly once processing guarantee."
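If I'm reading that right, it wants the transactional.id derived deterministically from the input topic-partition rather than from start-up time, something like this (my interpretation, not code from the blog; the "my-app" prefix and the helper name are mine):

```java
// Make the transactional.id a pure function of the input topic-partition,
// so that whichever process currently owns tp0 always uses the same id and
// therefore fences out any previous (zombie) owner of tp0.
import org.apache.kafka.common.TopicPartition;

public class TransactionalIds {

    static String forInputPartition(TopicPartition tp) {
        // e.g. partition 0 of "input-topic" always maps to
        // "my-app-input-topic-0", no matter which process owns it
        return "my-app-" + tp.topic() + "-" + tp.partition();
    }
}
```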
I can't quite understand why this is the case. To my mind, I shouldn't care which producer handles messages from any given partition, as long as transactions are atomic. I've been struggling with this for a day, and I wonder if someone could tell me what I've missed here. So: why can't I assign work to any producer instance with any transactional.id setting, as long as it is unique? And why do they say that messages can leak through the fencing provided by transactions if you do this?