
I am trying to figure out what Kafka record header format Apicurio Registry's deserializer expects.

My Avro Schema in Apicurio Registry

I have an artifact with ID hm.motor-value.

curl --location 'http://apicurio-registry.svc/apis/registry/v2/groups/default/artifacts/hm.motor-value'

returns:

{
    "type": "record",
    "namespace": "com.hongbomiao",
    "name": "motor",
    "fields": [
        {
            "name": "timestamp",
            "type": "long"
        },
        {
            "name": "current",
            "type": "double"
        },
        {
            "name": "voltage",
            "type": "double"
        },
        {
            "name": "temperature",
            "type": "double"
        }
    ]
}
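For reference, I believe the artifact's metadata, including the global ID that the registry assigned to this schema version, can be fetched from the corresponding /meta endpoint (this endpoint and the response shape are my assumption from the v2 REST API, not something I have verified here):

curl --location 'http://apicurio-registry.svc/apis/registry/v2/groups/default/artifacts/hm.motor-value/meta'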

My Spark code

Here is my Spark code that generates Kafka records in Avro format:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import org.apache.spark.sql.avro.functions.to_avro
import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"
    val parquetSchema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    // Fetch the Avro schema (the artifact content) from Apicurio Registry
    val backend = HttpClientSyncBackend()
    val res = basicRequest
      .get(
        uri"http://apicurio-registry.svc:8080/apis/registry/v2/groups/default/artifacts/hm.motor-value"
      )
      .send(backend)
    val kafkaRecordValueSchema = res.body.fold(identity, identity)

    val df = spark.readStream
      .schema(parquetSchema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      // Scale timestamp by 1000 and cast to long to match the Avro schema's long "timestamp" field
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      // Serialize all columns into a single Avro binary "value" column using the registry schema
      .select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}

My JDBC connector config

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "tasks.max": 10,
    "topics": "hm.motor",
    "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
    "connection.user": "xxx",
    "connection.password": "xxx",
    "insert.mode": "insert",
    "table.name.format": "motor",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/registry/v2",
    "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"
}

The code above works well. Now I am hoping to get rid of "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value" in the Kafka connector config.

Based on this doc,

TopicIdStrategy: Default strategy that uses the topic name and key or value suffix.

and since my Kafka topic name is hm.motor, I expect it to match the schema with artifact ID hm.motor-value.
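In other words, topic hm.motor should resolve to artifact ID hm.motor-value for the record value (and hm.motor-key for the record key). For reference, I believe this default corresponds to a converter setting roughly like the following (the config key and class name are my assumption and not verified):

"value.converter.apicurio.registry.artifact-resolver-strategy": "io.apicurio.registry.serde.strategy.TopicIdStrategy"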

However, if I remove "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value" from my JDBC sink connector, I get this error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException: artifactId cannot be null
    at io.apicurio.registry.resolver.DefaultSchemaResolver.resolveSchemaByCoordinates(DefaultSchemaResolver.java:177)
    at io.apicurio.registry.resolver.DefaultSchemaResolver.resolveSchemaByArtifactReference(DefaultSchemaResolver.java:172)
    at io.apicurio.registry.serde.AbstractKafkaDeserializer.resolve(AbstractKafkaDeserializer.java:147)
    at io.apicurio.registry.serde.AbstractKafkaDeserializer.readData(AbstractKafkaDeserializer.java:136)
    at io.apicurio.registry.serde.AbstractKafkaDeserializer.deserialize(AbstractKafkaDeserializer.java:131)
    at io.apicurio.registry.utils.converter.SerdeBasedConverter.toConnectData(SerdeBasedConverter.java:139)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 14 more

Any idea? Thanks!


UPDATE 1 (5/5/2023)

I found out that Apicurio Registry's deserializer expects the artifact ID in the Kafka record headers.

The Apicurio Registry serializer's Java code is here.

I tried something like this:

    val df = spark.readStream
      .schema(parquetSchema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))
      .withColumn(
        "headers",
        array(
          struct(
            lit("apicurio.registry.headers.value.groupId.name") as "key",
            lit("default").cast("binary") as "value"
          ),
          struct(
            lit("apicurio.registry.headers.value.artifactId.name") as "key",
            lit("hm.motor-value").cast("binary") as "value"
          )
        )
      )

This adds Kafka record headers; however, I still get the same error. I think my Kafka record headers are still not correct.
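In case it is the header key names themselves that are wrong, here is a sketch of what I suspect the deserializer's default header names are, based on my reading of the serde code (the exact keys apicurio.value.groupId and apicurio.value.artifactId are my assumption and unverified; the apicurio.registry.headers.value.*.name strings I used above look like config properties that rename these headers rather than the header keys themselves):

    // Sketch only (unverified): same pipeline as above, but with my guess at
    // the deserializer's default header keys. array and lit come from
    // org.apache.spark.sql.functions.
    val df = spark.readStream
      .schema(parquetSchema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))
      .withColumn(
        "headers",
        array(
          struct(
            lit("apicurio.value.groupId") as "key",
            lit("default").cast("binary") as "value"
          ),
          struct(
            lit("apicurio.value.artifactId") as "key",
            lit("hm.motor-value").cast("binary") as "value"
          )
        )
      )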

  • As answered before, Spark `to_avro` does not correctly serialize the data in the way you need to use any Registry – OneCricketeer May 05 '23 at 13:42
  • Thanks @OneCricketeer if I understand correctly, as currently I don't use Confluent registry and deserializer, I don't need ABRiS's `toConfluentAvro` any more. This makes the code clean. For `toSimpleAvro`, I think [ABRiS is reusing Spark's serializer](https://github.com/AbsaOSS/ABRiS/pull/241) plus Kafka record header (?). I might just miss a Kafka record header with artifact ID now. I have another discussion opened at [here](https://github.com/Apicurio/apicurio-registry/discussions/3322). I also tried ABRiS, I will post what issues I met trying ABRiS in another question later. Appreciate! – Hongbo Miao May 05 '23 at 17:17
  • The headers are not required. Look at the line with `out.write(MAGIC_BYTE);` and `writeId` on the following line. The `to_avro` call doesn't do this, as there is no interaction with the registry to actually compute any ID. That's what I was referring to. – OneCricketeer May 05 '23 at 21:49
  • Thanks @OneCricketeer, I think you are right about this header thing. I actually finally made the Confluent registry work with Confluent Avro too using ABRiS; I posted the detailed solution in the `1.1` and `2.1.1` sections of [this answer](https://stackoverflow.com/a/76168195/2000548). Really appreciate it! – Hongbo Miao May 05 '23 at 22:01
