
I'm using the Debezium SQL Server connector to track changes on a production database. The topic is created and CDC is working like a charm, but when I try to use the JdbcSinkConnector to dump the data into another SQL Server database, I get the following error:

com.microsoft.sqlserver.jdbc.SQLServerException: One or more values is out of range of values for the datetime2 SQL Server data type

On the source database the SQL data type is datetime2(7). The value in the Kafka event is 1549461754650000000, the schema type is INT64, and the schema name is io.debezium.time.NanoTimestamp.

I can't find a way to tell the TimestampConverter that the value is expressed not in milliseconds or microseconds but in nanoseconds (it would not work with microseconds anyway, since the SMT only understands milliseconds).
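As a sanity check, the value itself is fine: it is epoch nanoseconds, and dividing by 1,000,000 yields a valid epoch-milliseconds timestamp (plain Java, just to illustrate the arithmetic; not part of any connector config):

import java.time.Instant;

public class NanoTimestampCheck {
    public static void main(String[] args) {
        // raw value from the Kafka event (io.debezium.time.NanoTimestamp = epoch nanoseconds)
        long nanos = 1_549_461_754_650_000_000L;
        long millis = nanos / 1_000_000L;                 // 1549461754650
        System.out.println(Instant.ofEpochMilli(millis)); // 2019-02-06T14:02:34.650Z
    }
}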

Here is my sink connector configuration:

{
    "name": "cdc.swip.bi.ods.sink.contract",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "swip.swip_core.contract",
        "connection.url": "jdbc:sqlserver://someip:1234;database=DB",
        "connection.user": "loloolololo",
        "connection.password": "muahahahahaha",
        "dialect.name": "SqlServerDatabaseDialect",
        "auto.create": "false",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schemas.enable": "true",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "transforms": "unwrap,created_date,modified_date",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.created_date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.created_date.target.type": "Timestamp",
        "transforms.created_date.field": "created_date",
        "transforms.modified_date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.modified_date.target.type": "Timestamp",
        "transforms.modified_date.field": "modified_date",
        "insert.mode": "insert",
        "delete.enabled": "false",
        "pk.fields": "id",
        "pk.mode": "record_value",
        "schema.registry.url": "http://localhost:8081",
        "table.name.format": "ODS.swip.contract"
    }
}
nicolasL

2 Answers


This is a missing feature in the SQL Server connector, tracked as DBZ-1419.

You can work around the problem by writing your own SMT that performs the field conversion on the sink side, before the record is processed by the JDBC connector.
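A minimal sketch of such an SMT, assuming the record value is a flat Connect Struct (as it is after UnwrapFromEnvelope); the class and package names are hypothetical. It rewrites every field whose schema is named io.debezium.time.NanoTimestamp from INT64 epoch nanoseconds into a Connect Timestamp, which the JDBC sink can bind to a datetime2 column:

package com.example.smt; // hypothetical package

import java.util.Date;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.transforms.Transformation;

public class NanoTimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String NANO_TS = "io.debezium.time.NanoTimestamp";

    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) {
            return record; // tombstones and non-struct values pass through untouched
        }
        Struct value = (Struct) record.value();
        Schema newSchema = convertSchema(value.schema());
        Struct newValue = new Struct(newSchema);
        for (Field field : value.schema().fields()) {
            Object v = value.get(field);
            if (v != null && NANO_TS.equals(field.schema().name())) {
                v = new Date((Long) v / 1_000_000L); // epoch nanos -> epoch millis
            }
            newValue.put(field.name(), v);
        }
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), newSchema, newValue, record.timestamp());
    }

    // Copy the value schema, swapping NanoTimestamp fields for Connect Timestamps.
    // Top-level fields only; nested structs are left alone in this sketch.
    private Schema convertSchema(Schema schema) {
        SchemaBuilder builder = SchemaBuilder.struct().name(schema.name());
        for (Field field : schema.fields()) {
            if (NANO_TS.equals(field.schema().name())) {
                builder.field(field.name(), field.schema().isOptional()
                        ? Timestamp.builder().optional().build()
                        : Timestamp.SCHEMA);
            } else {
                builder.field(field.name(), field.schema());
            }
        }
        return builder.build();
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // nothing configurable in this sketch
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}

You would then drop the TimestampConverter entries from the sink configuration and register this SMT instead, e.g.:

        "transforms": "unwrap,nanots",
        "transforms.nanots.type": "com.example.smt.NanoTimestampConverter"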

Jiri Pechanec
  • I had a look at connect-transforms. I'm about to write a DebeziumTemporalConverter to apply the needed transformations. [link](https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java) – nicolasL Aug 01 '19 at 09:34
  • Anyway, datetimeoffset is also a real pain in the a**, since the Debezium SQL Server connector changes the offset, passing an ISO 8601 string with a GMT reference. Even though the time is adjusted accordingly, the original timezone offset is lost. – nicolasL Aug 01 '19 at 09:41

I forgot to post the answer. The property "time.precision.mode": "connect" does the trick: with it, the connector represents temporal values using Kafka Connect's built-in types, so timestamps are emitted as epoch milliseconds (org.apache.kafka.connect.data.Timestamp) instead of nanoseconds. Note that this sacrifices the sub-millisecond precision of datetime2(7).

https://debezium.io/documentation/reference/connectors/sqlserver.html#sqlserver-property-time-precision-mode

{
    "name":"debezium-connector-sqlserver",
    "config": {
        "connector.class":"io.debezium.connector.sqlserver.SqlServerConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable":"true",
        "value.converter.schemas.enable":"true",
        "database.hostname":"someHost",
        "database.port":"somePort",
        "database.user":"someUser",
        "database.password":"somePassword",
        "database.dbname":"someDb",
        "database.server.name":"xxx.xxx",
        "database.history.kafka.topic":"xxx.xxx.history",
        "time.precision.mode":"connect",
        "database.history.kafka.bootstrap.servers":"example.com:9092"
    }
}
nicolasL