I'm implementing CDC with Kafka Connect and MongoDB, and sometimes the CDC message from MongoDB is too big for Kafka (RecordTooLargeException). I would like to handle this error correctly and send the message to a DLQ. I configured the connector with these parameters, but the connector keeps crashing and my DLQ is empty.

 "mongo.errors.tolerance": "all",
 "mongo.errors.log.enable": true,
 "mongo.errors.deadletterqueue.topic.name":"topic_dlq"

Do you know if it is possible to handle this kind of error?

The error I get is:

[2022-11-28 07:17:54,892] ERROR WorkerSourceTask{id=connector_name} failed to send record to topic_name:  (org.apache.kafka.connect.runtime.WorkerSourceTask:377)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1141796 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
[2022-11-28 07:17:54,903] INFO WorkerSourceTask{id=connector_name} flushing 41 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:510)
[2022-11-28 07:17:57,146] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:109)
[2022-11-28 07:17:57,147] ERROR WorkerSourceTask{id=connector_name} Unhandled exception when committing:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:118)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:505)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:110)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.lambda$schedule$0(SourceTaskOffsetCommitter.java:84)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Devidb
  • If the record is too large for the original topic, why would it be possible to send it to the DLQ? You need to edit the broker config so that it can accept those records at all – OneCricketeer Nov 29 '22 at 03:23
  • Yes, you are right. I was thinking of sending an error with an id, not the entire message, but maybe that is a bit complicated. Actually, my question was mostly about the worker crashing. Is it possible to just log the error and continue? – Devidb Nov 29 '22 at 09:04
  • I also tried the generic (non-Mongo-specific) error-handling config, but the source connector keeps crashing: "errors.tolerance": "all", "errors.log.enable": true – Devidb Nov 29 '22 at 09:15
  • I'm not sure about the `already flushing` error, but the error tolerance setting is only applicable to errors within the Connect framework itself, AFAIK. It doesn't cover everything, such as RecordTooLarge. In my experience, that will always halt the client until the buffer sizes are expanded. Besides, if that's the data in your database, then certainly you do want that data in Kafka? So your producer requests should be as large as your max Mongo document, and your broker should accept it. Ref https://stackoverflow.com/questions/21020347 – OneCricketeer Nov 29 '22 at 12:07
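
Following the suggestion in the last comment, a minimal sketch of raising the producer limit for this connector only might look like the fragment below. It is an assumption-laden example, not a verified fix for this setup: the value 20971520 (20 MiB) is purely illustrative and should be sized to the largest change event you expect, and the per-connector `producer.override.` prefix only takes effect if the Connect worker allows client overrides (`connector.client.config.override.policy=All` in the worker properties, which is the default only from Kafka 3.0 onward). The broker or topic must also accept records of that size, via `message.max.bytes` on the broker or `max.message.bytes` on the topic.

```json
{
  "producer.override.max.request.size": "20971520"
}
```

If overrides are not allowed on the worker, the same limit can instead be set for all connectors with `producer.max.request.size` in the worker properties. Neither setting sends oversized records to a DLQ; they only make the producer accept them.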
