I am trying to create a Debezium MySQL connector with a transformation to extract the key.

Before key transformations:

create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'before',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "transforms" = 'unwrap',
    "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope');

The resulting topic contents are:

> rowtime: 2020/05/20 16:47:23.354 Z, key: [St@5778462697648631933/8247607644536792125], value: {"id": "P195910", "price": "1511.64"}

When key.converter is set to the JSON converter, the key becomes {"id": "P195910"}.

So, I want to extract id from key and make it a string key:

Expected result:

rowtime: 2020/05/20 16:47:23.354 Z, 
key: 'P195910', 
value: {"id": "P195910", "price": "1511.64"}   

When I try to use a transformation with ExtractField or ValueToKey, I get:

DataException: Field does not exist: id

My attempt, using ValueToKey:

create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'after',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "key.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "key.converter.schemas.enable" = 'TRUE',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable" = 'TRUE',
    "transforms" = 'unwrap,createkey',
    "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope',
    "transforms.createkey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.createkey.fields" = 'id'
    );

This causes the following error in my Kafka Connect log:

Caused by: org.apache.kafka.connect.errors.DataException: Field does not exist: id
        at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:89)
        at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)
bad_coder
4it med
  • Instead of an SMT, why not specify in the Debezium configuration that you want the 'id' column as the key? "message.key.columns": ".:" – Anupreet May 22 '20 at 09:57

2 Answers

Changing the transformation type from UnwrapFromEnvelope to ExtractNewRecordState solved the issue with the Debezium MySQL CDC connector, version 1.1.0:

"transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState'
4it med

Since you're using ksqlDB here, you'll want to set your source connector to write the key as a String:

key.converter=org.apache.kafka.connect.storage.StringConverter
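For illustration, here is a sketch of how that converter setting might be combined with the ExtractNewRecordState change from the other answer, written in the same ksqlDB form as the question. It is untested and rests on a few assumptions that are not in the original posts: the Debezium key is already a struct containing id (as the question's JSON key output suggests), so Kafka Connect's ExtractField$Key transform is used to pull that field out as the plain key; the transform alias extractkey is an arbitrary name; and include.schema.changes is switched to 'false' because schema change events carry no id field in their key and would likely fail in the same transform chain.

```sql
-- Sketch only, not a verified configuration.
-- Assumptions: the 'extractkey' alias is arbitrary, and schema change events are
-- disabled so that every record passing through the transforms has an 'id' key field.
CREATE SOURCE CONNECTOR mysql WITH (
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'after',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'false',
    "key.converter" = 'org.apache.kafka.connect.storage.StringConverter',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable" = 'TRUE',
    "transforms" = 'unwrap,extractkey',
    "transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState',
    "transforms.extractkey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    "transforms.extractkey.field" = 'id'
);
```

The order of the transforms matters: unwrap flattens the change event first, and extractkey then replaces the struct key with the value of its id field, which StringConverter writes as a plain string.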

Robin Moffatt
  • Setting 'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState' instead of using the UnwrapFromEnvelope transformation solved the problem, but I don't know why! – 4it med May 20 '20 at 21:05
  • UnwrapFromEnvelope was deprecated and then removed in Debezium 1.2.0.Alpha1 – Iskuskov Alexander May 20 '20 at 23:00