
I am using a JDBC source connector and recently encountered a data duplication issue in Kafka. After some debugging, I found out that the producer was throwing the following error:

 WorkerSourceTask{id=connect-name-0} Failed to flush, timed out while waiting for producer to flush outstanding 6590 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:509)

After following this question, I changed the flush timeout, and the error is gone for now. But I still don't understand why the data was duplicated. Here is my understanding. Assume that the source database has 20k rows, the connector starts at offset 0, the batch size is 1000, the poll interval is 2 hours, the flush interval is 1 minute, and the flush timeout is 5 seconds.

  1. The connector/task executes the query and starts fetching rows from the DB in batches of 1000.
  2. The buffer memory is 32 MB, so the task buffers the data and tries to flush it every minute. Assume 15k rows are buffered.
  3. Because the data is large, the task fails to flush it and commit the offset within 5 seconds.
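To make the numbers above concrete, here is a rough sketch of the settings I am assuming, grouped by the layer I believe each one belongs to. The class and method names are made up for illustration; the keys are the JDBC source connector, Connect worker, and producer settings as I understand them, so treat the grouping as an assumption rather than a verified setup.

```java
import java.util.Properties;

// Hypothetical helper that only collects the values from the scenario above.
public class ScenarioConfigSketch {

    // Connector-level settings (JDBC source connector).
    static Properties connector() {
        Properties p = new Properties();
        p.setProperty("batch.max.rows", "1000");                                 // batch size
        p.setProperty("poll.interval.ms", String.valueOf(2 * 60 * 60 * 1000));   // 2 hours
        return p;
    }

    // Worker-level settings that control the periodic offset commit.
    static Properties worker() {
        Properties p = new Properties();
        p.setProperty("offset.flush.interval.ms", "60000");  // flush every minute
        p.setProperty("offset.flush.timeout.ms", "5000");    // give up after 5 seconds
        return p;
    }

    // Producer-level setting for the in-memory record buffer.
    static Properties producer() {
        Properties p = new Properties();
        p.setProperty("buffer.memory", String.valueOf(32 * 1024 * 1024)); // 32 MB
        return p;
    }

    public static void main(String[] args) {
        System.out.println("connector: " + connector());
        System.out.println("worker:    " + worker());
        System.out.println("producer:  " + producer());
    }
}
```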

Now there are 3 scenarios:

  1. If the flush is atomic, no offsets are committed, so there is no chance of duplication when the task restarts before the next commit cycle.
  2. The connector pushes data to the topic but fails to commit the offset because of the flush timeout. So out of 15k rows, 10k were written to the topic, but the offset is still 0.
  3. If the flush times out, it rolls back both the data written and the offset commit.

Which scenario is applicable in this case?
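To show what I mean by scenario 2, here is a toy model in plain Java. It does not use any Kafka classes and is not how Connect is implemented; `AtLeastOnceSketch`, `runTask`, and the in-memory "topic" are hypothetical stand-ins. It only assumes that records already sent stay in the topic and that the source offset advances only when the flush-and-commit step succeeds.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

// Toy model: the "topic" is a list of row ids, the committed offset is an int.
public class AtLeastOnceSketch {
    final List<Integer> topic = new ArrayList<>();
    int committedOffset = 0;

    // Reads `count` rows starting at the last committed offset and writes them
    // to the topic. The offset moves forward only if the commit succeeds.
    void runTask(int count, boolean commitSucceeds) {
        int start = committedOffset;
        for (int row = start; row < start + count; row++) {
            topic.add(row); // written records are never rolled back in this model
        }
        if (commitSucceeds) {
            committedOffset = start + count;
        }
    }

    // Number of records in the topic that repeat an earlier row id.
    int duplicates() {
        return topic.size() - new HashSet<>(topic).size();
    }

    public static void main(String[] args) {
        AtLeastOnceSketch sketch = new AtLeastOnceSketch();
        sketch.runTask(10_000, false); // flush timed out: 10k rows sent, offset still 0
        sketch.runTask(20_000, true);  // after a restart the task reads again from offset 0
        System.out.println("records in topic: " + sketch.topic.size());
        System.out.println("duplicates:       " + sketch.duplicates());
        System.out.println("committed offset: " + sketch.committedOffset);
    }
}
```

In this model the first 10k rows end up in the topic twice, which matches the kind of duplication I am seeing.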

OneCricketeer
aditya81070
  • I think you're forgetting that the Kafka producer can retry, regardless of whether the flush is atomic. If you don't want duplicates, look at the `enable.idempotence` and `max.in.flight.requests.per.connection` producer configs – OneCricketeer Feb 16 '23 at 14:39
  • Yeah, I agree that the producer can retry, and it is retrying. But my question is: if the flush is not atomic and the producer retries the complete batch, will it duplicate data? – aditya81070 Feb 16 '23 at 16:10
  • flush is a blocking method. It should be atomic within the context of one thread. – OneCricketeer Feb 16 '23 at 20:24
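For reference, the producer settings mentioned in the comments could look roughly like the sketch below. The class name is hypothetical, and the keys are written as plain strings so the snippet needs no Kafka dependency. As far as I understand, an idempotent producer only deduplicates its own retries within one producer session, so this would not by itself cover rows that are re-read after a task restart. How the keys are passed to Connect (for example with a `producer.` prefix in the worker config) is an assumption to check against your version.

```java
import java.util.Properties;

// Hypothetical helper collecting the producer settings from the comments above.
public class IdempotentProducerSketch {

    static Properties idempotentProducerProps() {
        Properties p = new Properties();
        p.setProperty("enable.idempotence", "true");                     // broker drops retried duplicates
        p.setProperty("acks", "all");                                    // required for idempotence
        p.setProperty("max.in.flight.requests.per.connection", "5");     // must be 5 or lower with idempotence
        return p;
    }

    public static void main(String[] args) {
        // Print each key/value pair, one per line.
        idempotentProducerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```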
