I'm implementing CDC with Kafka Connect and MongoDB, and sometimes the CDC message from MongoDB is too big for Kafka (RecordTooLargeException). I would like to handle this error correctly and send the message to a DLQ. I configured the connector with the parameters below, but the connector keeps crashing and my DLQ stays empty.
"mongo.errors.tolerance": "all",
"mongo.errors.log.enable": true,
"mongo.errors.deadletterqueue.topic.name": "topic_dlq"
Do you know if it is possible to handle this kind of error?
The error I get is:
[2022-11-28 07:17:54,892] ERROR WorkerSourceTask{id=connector_name} failed to send record to topic_name: (org.apache.kafka.connect.runtime.WorkerSourceTask:377)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1141796 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
[2022-11-28 07:17:54,903] INFO WorkerSourceTask{id=connector_name} flushing 41 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:510)
[2022-11-28 07:17:57,146] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:109)
[2022-11-28 07:17:57,147] ERROR WorkerSourceTask{id=connector_name} Unhandled exception when committing: (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:118)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:505)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:110)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.lambda$schedule$0(SourceTaskOffsetCommitter.java:84)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)