
I am using the JDBC sink connector and have a bad message in the topic. I know why the message is bad (it is failing due to a FK constraint violation because of an issue with a producer). The error being reported by the worker task is:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))
com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    ... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))
com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))

What I want to happen is for this bad message to be skipped. So I have tried setting "errors.tolerance": "all". The full config for the sink connector is as follows:

{
    "name": "reading-sink2",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 4,
        "topics": "READING_MYSQL",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "connection.url": "jdbc:mysql://localhost:3306/sensorium?user=app&password=tQpRMCzHlAeu6kQIBk4U",
        "auto.create": true,
        "table.name.format": "reading",
        "errors.tolerance": "all"
    }
}

But the same error is still being logged: the message is not skipped, and subsequent messages are not processed.

Why is errors.tolerance: all not working as expected?


2 Answers

3

The errors.tolerance property applies to errors that occur while converting messages (to/from the Kafka Connect schema) or transforming them (applying Single Message Transforms).

You can't skip/swallow exceptions that are thrown during SinkTask::put(Collection<SinkRecord> records) or SourceTask::poll().

In your case the exception is thrown in SinkTask::put(...):

io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)


You can read more about this in the following blog post on the Confluent site: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues
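For reference, here is a sketch (not from the question) of the error-handling properties that errors.tolerance does govern in a sink connector config: converter and SMT failures would be tolerated, logged, and routed to a dead letter queue. The topic name reading-sink2-dlq is just an illustrative choice; a put() failure such as the FK violation above would still fail the task.

{
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "errors.deadletterqueue.topic.name": "reading-sink2-dlq",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable": true
}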

Bartosz Wardziński
  • thanks very much for your answer, really appreciate it. It's not the answer I was hoping for though; this feels like a poor design decision in the JDBC sink connector. Surely the error handling should apply to any error during the processing of a message; I can't understand why it wouldn't. What options are there for recovering the pipeline, other than fudging the db to resolve the issue and then deleting the bad records, or writing my own sink connector? – Sam Shiles Apr 01 '19 at 06:38
  • Error handling applies to **ALL** connectors, not only the JDBC sink; it is Kafka Connect behaviour. As a workaround you can skip _poison_ messages by moving the offset. It is difficult to consider that a way of handling errors, but a single message can be handled like that. Under the hood Kafka Connect uses KafkaConsumer and KafkaProducer, and the name of the consumer group is based on the connector name. – Bartosz Wardziński Apr 01 '19 at 07:15
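(Illustrating the comment above, a sketch for locating that consumer group: the broker address is borrowed from the answer below, and connect-reading-sink2 assumes the default connect-<connector name> group naming for the connector in the question.)

kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --list

kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-reading-sink2 \
    --describe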
1

You can manually skip bad records using the kafka-consumer-groups tool:

kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-sink_postgres_foo_00 \
    --reset-offsets \
    --topic foo \
    --to-offset 2 \
    --execute

For more info see here.
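To make that workaround concrete for the connector in the question, a hedged sketch follows: connect-reading-sink2 assumes the default connect-<connector name> group naming, port 8083 assumes the default Connect REST listener, and the offset 42 is a made-up value one past the bad record. The reset only executes while the group has no active members, so the connector usually has to be deleted (or stopped, on newer Connect versions) first and re-created afterwards.

# Inspect the group to find the topic/partition and the stuck offset
kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-reading-sink2 \
    --describe

# Remove the connector so the consumer group has no active members
# (default Connect REST listener assumed; re-create the connector afterwards)
curl -s -X DELETE http://localhost:8083/connectors/reading-sink2

# Skip the poison record by moving the offset one past it (42 is hypothetical)
kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-reading-sink2 \
    --reset-offsets \
    --topic READING_MYSQL:0 \
    --to-offset 42 \
    --execute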

I've logged an improvement suggestion for the sink, feel free to upvote: https://github.com/confluentinc/kafka-connect-jdbc/issues/721

Robin Moffatt