
I have a Kafka cluster deployed with Strimzi, and Apicurio Registry as the Kafka schema registry.

I am hoping to use AvroConverter in the JDBC sink connector to sink data from Kafka to TimescaleDB.

Here is my Kafka Connect Dockerfile:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

My Kafka Connect:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-kafka-iot-kafka-connect
  namespace: hm-kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest
  replicas: 1
  bootstrapServers: hm-kafka-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: hm-kafka-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: hm-iot-db-credentials-volume
        secret:
          secretName: hm-iot-db-credentials
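For context, the FileConfigProvider above resolves the `${file:...}` placeholders in the connector config against a properties file mounted from the hm-iot-db-credentials secret. A minimal sketch of that file (the key names here are assumptions; they must match whatever keys the placeholders reference):

```shell
# Hypothetical iot-db-credentials.properties; the key names must match the
# ${file:...:key} placeholders used in the KafkaConnector config.
cat > iot-db-credentials.properties <<'EOF'
timescaledb_user=my_timescaledb_user
timescaledb_password=my_timescaledb_password
EOF
```

The secret itself could then be created with, e.g., `kubectl create secret generic hm-iot-db-credentials --namespace=hm-kafka --from-file=iot-db-credentials.properties`.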

My JDBC sink connector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: hm-motor-jdbc-sink-kafka-connector
  namespace: hm-kafka
  labels:
    strimzi.io/cluster: hm-kafka-iot-kafka-connect
spec:
  class: io.aiven.connect.jdbc.JdbcSinkConnector
  tasksMax: 32
  config:
    connector.class: io.aiven.connect.jdbc.JdbcSinkConnector
    tasks.max: 32
    topics: hm.motor
    connection.url: jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db
    connection.user: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}"
    connection.password: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}"
    insert.mode: multi
    batch.size: 100000

    # table
    table.name.format: motor

    # timestamp
    transforms: convertTimestamp
    transforms.convertTimestamp.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.convertTimestamp.field: timestamp
    transforms.convertTimestamp.target.type: Timestamp

    # value
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
    value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
    value.converter.apicurio.registry.as-confluent: true

(Note: the config related to apicurio.registry most likely has issues too.)
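On those apicurio.registry keys: the global-id strategy-class style is from Apicurio Registry 1.x; in 2.x the serde config keys changed and auto-registration became a boolean flag. A hedged sketch of what the value section might look like instead (worth verifying against the Apicurio serdes documentation):

```yaml
    # value (sketch; Apicurio Registry 2.x style keys)
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
    # 2.x replaces the 1.x global-id strategy class with a boolean flag:
    value.converter.apicurio.registry.auto-register: true
    value.converter.apicurio.registry.as-confluent: true
```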

However, I met this error (let's call it Error 1):

Error 1

2023-05-01 07:23:23,849 ERROR [hm-motor-jdbc-sink-kafka-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'hm-motor-jdbc-sink-kafka-connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [StartAndStopExecutor-connect-1-1]
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: hm-motor-jdbc-sink-kafka-connector
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$34(DistributedHerder.java:1800)
  at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:320)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1821)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getConnectorStartingCallable$36(DistributedHerder.java:1827)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.apicurio.registry.utils.converter.AvroConverter for configuration value.converter: Class io.apicurio.registry.utils.converter.AvroConverter could not be found.
  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744)
  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
  at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:232)
  at org.apache.kafka.connect.runtime.SinkConnectorConfig.<init>(SinkConnectorConfig.java:85)
  at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:299)
  ... 6 more

Attempt 1 (to fix Error 1, succeeded)

Based on the error, I added apicurio-registry-utils-converter to my Kafka Connect Dockerfile:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip \

  # apicurio-registry-utils-converter
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
  && mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

Now it can successfully locate io.apicurio.registry.utils.converter.AvroConverter; however, I got a new error (let's call it Error 2).

Error 2

2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] TaskConfig values: 
  task.class = class io.aiven.connect.jdbc.sink.JdbcSinkTask
 (org.apache.kafka.connect.runtime.TaskConfig) [StartAndStopExecutor-connect-1-5]
2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] Instantiated task hm-motor-jdbc-sink-kafka-connector-0 with version null of type io.aiven.connect.jdbc.sink.JdbcSinkTask (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
2023-05-01 06:58:11,129 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] Failed to start task hm-motor-jdbc-sink-kafka-connector-0 (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
java.lang.NoClassDefFoundError: io/apicurio/registry/serde/avro/AvroKafkaSerializer
  at io.apicurio.registry.utils.converter.AvroConverter.configure(AvroConverter.java:69)
  at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:324)
  at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:618)
  at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1723)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1773)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: io.apicurio.registry.serde.avro.AvroKafkaSerializer
  ... 10 more

Attempt 2 (to fix Error 2, failed)

Based on the error, I added apicurio-registry-serdes-avro-serde to my Kafka Connect Dockerfile:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip \

  # apicurio-registry-utils-converter
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
  && mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/ \

  # apicurio-registry-serdes-avro-serde
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-serdes-avro-serde
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-serdes-avro-serde/2.4.2.Final/apicurio-registry-serdes-avro-serde-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/ \
  && mv apicurio-registry-serdes-avro-serde-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

But this time, Error 2 was still there.

apicurio-registry-serdes-avro-serde does not seem to be the correct dependency to fix Error 2. What would be the correct one? Thanks!

Attempt 3 (Different direction)

I have followed @OneCricketeer's suggestion and switched to kafka-connect-avro-converter, using Apicurio Registry's Confluent-compatible REST API endpoint /apis/ccompat/v6/.

Here is my Kafka Connect Dockerfile using io.confluent.connect.avro.AvroConverter:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # kafka-connect-avro-converter
  # https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
  && wget --no-verbose --output-document=kafka-connect-avro-converter.zip https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip \
  && mkdir -p /opt/kafka/plugins/kafka-connect-avro-converter/ \
  && unzip kafka-connect-avro-converter.zip -d /opt/kafka/plugins/kafka-connect-avro-converter/ \
  && rm -f kafka-connect-avro-converter.zip \

  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
  && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && rm -f jdbc-connector-for-apache-kafka.tar
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

Regarding the corresponding JDBC sink connector config, I have asked a separate question: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -xxx

UPDATE: I found that the Confluent Avro format is different from vanilla Apache Avro, which causes some inconvenience for Spark and other tools. So these are two different directions. Besides the Confluent direction, I will continue looking for a solution in this direction too.
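For context on that difference: with the Confluent converter, each Kafka record value is the Avro-encoded body prefixed by a 5-byte header (magic byte 0x00 plus a 4-byte big-endian schema ID pointing at the registry), whereas plain Apache Avro data has no such prefix. A small sketch fabricating such a header (schema ID 7 and the payload are made up):

```shell
# Confluent wire format: 1 magic byte (0x00) + 4-byte big-endian schema ID,
# followed by the Avro-encoded body. Fabricate one with schema ID 7:
printf '\000\000\000\000\007avro-encoded-body' > record.bin

# Inspect the 5-byte header; plain Avro data would start directly with the body.
od -An -tx1 -N5 record.bin
# prints: 00 00 00 00 07
```

Generic Avro tooling that is unaware of this header will try to decode those 5 bytes as part of the Avro body and fail, which is the inconvenience mentioned above.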

Hongbo Miao
  • Have you tried inspecting those zips and jars you're downloading for that class name? Note that USER 1001 cannot read files created by root user – OneCricketeer May 01 '23 at 13:20
  • Good idea! I uncompressed **apicurio-registry-serdes-avro-serde-2.4.2.Final.jar** which does have `io.apicurio.registry.serde.avro.AvroKafkaSerializer`. Strange it does not work. Maybe related with `CLASSPATH` you mentioned in the answer. – Hongbo Miao May 01 '23 at 17:30

2 Answers


The issue is that earlier I added the dependency apicurio-registry-utils-converter.

However, the correct one is apicurio-registry-distro-connect-converter.

So here is my final Kafka Connect Dockerfile to use io.apicurio.registry.utils.converter.AvroConverter:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # apicurio-registry-distro-connect-converter
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-distro-connect-converter
  && wget --no-verbose --output-document=apicurio-registry-distro-connect-converter.tar.gz https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/2.4.2.Final/apicurio-registry-distro-connect-converter-2.4.2.Final.tar.gz \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
  && tar -x -f apicurio-registry-distro-connect-converter.tar.gz -C /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
  && rm -f apicurio-registry-distro-connect-converter.tar.gz \

  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
  && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && rm -f jdbc-connector-for-apache-kafka.tar
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

For comparison purposes, here is the Dockerfile using io.confluent.connect.avro.AvroConverter:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # kafka-connect-avro-converter
  # https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
  && wget --no-verbose --output-document=kafka-connect-avro-converter.zip https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip \
  && mkdir -p /opt/kafka/plugins/kafka-connect-avro-converter/ \
  && unzip kafka-connect-avro-converter.zip -d /opt/kafka/plugins/kafka-connect-avro-converter/ \
  && rm -f kafka-connect-avro-converter.zip \

  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
  && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && rm -f jdbc-connector-for-apache-kafka.tar
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001
Hongbo Miao

The apicurio-registry-serdes-avro-serde dependency is correct for that class, but it should already be part of the Avro converter package.

However, (de)serializer classes are not Connect "plugins" in the same way converters are. You need to export the CLASSPATH environment variable to include the extra directories where you're putting JAR files.

I'd also suggest not using multi-stage builds here, unless wget and unzip aren't available in the Strimzi image. Plus, Apicurio Registry is compatible with the Confluent converter, so I'd recommend installing that (and the Aiven connector) using Strimzi's build plugins feature anyway.
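For instance, the spec.build approach would look roughly like this (a sketch; the pushSecret name is a placeholder, and Strimzi builds and pushes the image itself instead of you maintaining a Dockerfile):

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-kafka-iot-kafka-connect
  namespace: hm-kafka
spec:
  # ... bootstrapServers, tls, config as before ...
  build:
    output:
      type: docker
      # Strimzi pushes the built image here
      image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest
      pushSecret: ghcr-push-secret
    plugins:
      - name: jdbc-connector-for-apache-kafka
        artifacts:
          - type: zip
            url: https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip
      - name: kafka-connect-avro-converter
        artifacts:
          - type: zip
            url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip
```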

OneCricketeer
  • 1. You are right, `unzip` is not available in the `quay.io/strimzi/kafka` image, which is why I use a multi-stage build. 2. Hmm, I am not aware of a `Strimzi Confluent Hub`. I think it does not exist. 3. I am wondering how to export `CLASSPATH` in this scenario. If I figure it out, I will report back, thanks! – Hongbo Miao May 01 '23 at 17:43
  • See [`spec.build.plugins`](https://strimzi.io/docs/operators/latest/full/deploying.html#creating-new-image-using-kafka-connect-build-str), otherwise [building your own image](https://strimzi.io/docs/operators/latest/full/deploying.html#creating-new-image-from-base-str) – OneCricketeer May 01 '23 at 22:01
  • @HongboMiao Based on your new post, you have fixed this? – OneCricketeer May 01 '23 at 22:20
  • I am using [building your own image](https://strimzi.io/docs/operators/latest/full/deploying.html#creating-new-image-from-base-str), whose Dockerfile I showed in my question. Not fixed yet; if I figure it out, I will update. Thanks! – Hongbo Miao May 03 '23 at 05:36