
I'm not certain what I could be missing.

  1. I have set up a Kafka broker with Zookeeper and a distributed Kafka Connect cluster.
  2. For schema management, I have set up an Apicurio Schema Registry instance.
  3. I also have KSQLDB set up.

I can confirm that the following works as expected:

  1. My source JDBC connector successfully pushed table data into the topic stss.market.info.public.ice_symbols

Problem:

Inside the KSQLDB server, I have successfully created a table from the topic stss.market.info.public.ice_symbols

enter image description here

Here is the detail of the table created

enter image description here

The problem is that a push query against this table returns no data. Deserialization fails because the lookup of the Avro schema in the Apicurio Registry is unsuccessful.
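For reference, a push query of the kind described here would look roughly like the sketch below. The table name ICE_SYMBOLS is a placeholder, since the actual name appears only in the screenshot.

```sql
-- ICE_SYMBOLS is a hypothetical name; substitute the table created from the topic.
-- EMIT CHANGES makes this a push query that streams rows as they arrive.
SELECT * FROM ICE_SYMBOLS EMIT CHANGES;
```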

The Apicurio Registry logs reveal that KSQLDB calls the Apicurio Registry to fetch the deserialization schema using a schema ID of 0 instead of 5, which is the ID of the schema I have registered in the registry.

enter image description here

The KSQLDB server logs also show the 404 HTTP response seen in the Apicurio logs, as shown in the image below.

enter image description here

Expectation:

I expect KSQLDB queries against the table to perform the schema lookup with an ID of 5, not 0. I'm guessing I'm missing some configuration.

Here is the schema registered in the Apicurio Registry:

enter image description here

enter image description here

Here is my source connector configuration as well. It has the appropriate schema lookup strategy configured, although I don't believe KSQLDB requires this when deserializing its table data. This configuration should only be relevant to capturing the table data, validating it, and storing it in the topic stss.market.info.public.ice_symbols.

{
  "name": "new.connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "172.17.203.10",
    "database.port": "6000",
    "database.user": "postgres",
    "database.password": "123",
    "database.dbname": "stss_market_info",
    "database.server.name": "stss.market.info",
    "table.include.list": "public.ice_symbols",
    "message.key.columns": "public.ice_symbols:name",
    "snapshot.mode": "always",
    "transforms": "unwrap,extractKey",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "name",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://local-server:8080/apis/registry/v2",
    "value.converter.apicurio.registry.auto-register": true,
    "value.converter.apicurio.registry.find-latest": true,
    "value.apicurio.registry.as-confluent": true, 
    "name": "new.connector",
    "value.converter.schema.registry.url": "http://local-server:8080/apis/registry/v2"
  }
}

Thanks in advance for any assistance.

Felix Otoo

1 Answer


You can specify the "VALUE_SCHEMA_ID=5" property in the WITH clause when you create a stream/table.
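As a rough sketch of what that could look like for the topic in the question: the table name, the key column and its type, and the KEY_FORMAT value below are assumptions for illustration, not details taken from the question. With VALUE_SCHEMA_ID set, the value columns are left undeclared so that ksqlDB can take them from the registered schema. Whether this alone resolves the lookup with ID 0 is not confirmed here.

```sql
-- Sketch only: ICE_SYMBOLS, the key column type, and KEY_FORMAT are assumptions.
-- Value columns are omitted; they are expected to come from schema ID 5.
CREATE TABLE ICE_SYMBOLS (
  name VARCHAR PRIMARY KEY
) WITH (
  KAFKA_TOPIC = 'stss.market.info.public.ice_symbols',
  KEY_FORMAT = 'KAFKA',
  VALUE_FORMAT = 'AVRO',
  VALUE_SCHEMA_ID = 5
);
```

After creating the table, running DESCRIBE ICE_SYMBOLS; should show whether the columns were picked up from the intended schema.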

Bonnie Varghese