I am trying to figure out what Kafka record header format Apicurio Registry's deserializer expects.
My Avro Schema in Apicurio Registry
I have an artifact with ID hm.motor-value. Running
curl --location 'http://apicurio-registry.svc/apis/registry/v2/groups/default/artifacts/hm.motor-value'
returns:
{
  "type": "record",
  "namespace": "com.hongbomiao",
  "name": "motor",
  "fields": [
    {
      "name": "timestamp",
      "type": "long"
    },
    {
      "name": "current",
      "type": "double"
    },
    {
      "name": "voltage",
      "type": "double"
    },
    {
      "name": "temperature",
      "type": "double"
    }
  ]
}
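For context, the artifact was registered ahead of time. A rough sketch of how that can be done with the same sttp client used in the Spark code below (the X-Registry-ArtifactId and X-Registry-ArtifactType headers and the /groups/default/artifacts endpoint are from the Apicurio Registry v2 REST API as I understand it, so treat this as an illustration rather than my exact setup):

import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest}

val backend = HttpClientSyncBackend()

// The same schema document as shown above, as a JSON string
val motorSchema =
  """{"type":"record","namespace":"com.hongbomiao","name":"motor","fields":[
    |{"name":"timestamp","type":"long"},
    |{"name":"current","type":"double"},
    |{"name":"voltage","type":"double"},
    |{"name":"temperature","type":"double"}]}""".stripMargin

// Create the artifact "hm.motor-value" in the default group
val registerRes = basicRequest
  .post(uri"http://apicurio-registry.svc:8080/apis/registry/v2/groups/default/artifacts")
  .header("X-Registry-ArtifactId", "hm.motor-value")
  .header("X-Registry-ArtifactType", "AVRO")
  .body(motorSchema)
  .contentType("application/json")
  .send(backend)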
My Spark code
Here is my Spark code that produces Kafka records in Avro format:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import org.apache.spark.sql.avro.functions.to_avro
import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    val parquetSchema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    // Fetch the Avro schema (artifact hm.motor-value) from Apicurio Registry at startup
    val backend = HttpClientSyncBackend()
    val res = basicRequest
      .get(
        uri"http://apicurio-registry.svc:8080/apis/registry/v2/groups/default/artifacts/hm.motor-value"
      )
      .send(backend)
    val kafkaRecordValueSchema = res.body.fold(identity, identity)

    // Stream Parquet files from S3, convert the timestamp to epoch milliseconds,
    // and encode each row as Avro using the fetched schema
    val df = spark.readStream
      .schema(parquetSchema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}
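A quick way to sanity-check that the Avro encoding matches the fetched schema (not part of the job, just a sketch on a static read) is to round-trip a few rows through to_avro / from_avro:

import org.apache.spark.sql.avro.functions.from_avro

// Read a few rows statically, encode with the fetched schema, then decode again
val sample = spark.read
  .schema(parquetSchema)
  .parquet(folderPath)
  .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
  .limit(5)
  .select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))

sample
  .select(from_avro(col("value"), kafkaRecordValueSchema).alias("motor"))
  .select("motor.*")
  .show(truncate = false)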
My JDBC connector config
{
  "name": "hm-motor-jdbc-sink-kafka-connector",
  "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
  "tasks.max": 10,
  "topics": "hm.motor",
  "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
  "connection.user": "xxx",
  "connection.password": "xxx",
  "insert.mode": "insert",
  "table.name.format": "motor",
  "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/registry/v2",
  "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"
}
The above setup works well. Now I am hoping to get rid of "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value" in the Kafka connector config.
Based on this doc:

TopicIdStrategy: Default strategy that uses the topic name and key or value suffix.

Since my Kafka topic name is hm.motor, I expect it to match the schema with artifact ID hm.motor-value.
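In other words, my understanding of the documented default is a naming convention like this (a toy sketch of the convention, not Apicurio's actual code):

// Toy illustration of the documented TopicIdStrategy naming convention:
// artifact ID = "<topic>-key" for record keys, "<topic>-value" for record values
def expectedArtifactId(topic: String, isKey: Boolean): String =
  if (isKey) s"$topic-key" else s"$topic-value"

expectedArtifactId("hm.motor", isKey = false) // "hm.motor-value"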
However, if I remove "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value" from my JDBC sink connector, I get this error:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:518)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException: artifactId cannot be null
at io.apicurio.registry.resolver.DefaultSchemaResolver.resolveSchemaByCoordinates(DefaultSchemaResolver.java:177)
at io.apicurio.registry.resolver.DefaultSchemaResolver.resolveSchemaByArtifactReference(DefaultSchemaResolver.java:172)
at io.apicurio.registry.serde.AbstractKafkaDeserializer.resolve(AbstractKafkaDeserializer.java:147)
at io.apicurio.registry.serde.AbstractKafkaDeserializer.readData(AbstractKafkaDeserializer.java:136)
at io.apicurio.registry.serde.AbstractKafkaDeserializer.deserialize(AbstractKafkaDeserializer.java:131)
at io.apicurio.registry.utils.converter.SerdeBasedConverter.toConnectData(SerdeBasedConverter.java:139)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:518)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
... 14 more
Any idea? Thanks!
UPDATE 1 (5/5/2023)
I found out that Apicurio Registry's deserializer expects the artifact ID in the Kafka record headers.
The Apicurio Registry serializer's Java code is here.
I tried something like this:
import org.apache.spark.sql.functions.{array, lit}

val df = spark.readStream
  .schema(parquetSchema)
  .option("maxFilesPerTrigger", 1)
  .parquet(folderPath)
  .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
  .select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))
  .withColumn(
    "headers",
    array(
      struct(
        lit("apicurio.registry.headers.value.groupId.name") as "key",
        lit("default").cast("binary") as "value"
      ),
      struct(
        lit("apicurio.registry.headers.value.artifactId.name") as "key",
        lit("hm.motor-value").cast("binary") as "value"
      )
    )
  )
This adds Kafka record headers, but I still get the same error. I think my Kafka record headers are still not correct.
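Looking more closely at the serde code, I suspect the property names I used above (apicurio.registry.headers.value.groupId.name, apicurio.registry.headers.value.artifactId.name) are configuration keys for renaming the headers, not the header names themselves. If I am reading the default headers handler correctly, the default header names look like apicurio.value.groupId, apicurio.value.artifactId, and apicurio.value.globalId, with the global ID encoded as an 8-byte long. This is the sketch I plan to try next; the header names, encodings, and the hard-coded global ID are my guesses from reading the code, not something I have confirmed:

import java.nio.ByteBuffer
import org.apache.spark.sql.functions.{array, lit, struct}

// Hypothetical global ID of the registered schema version; in practice it would
// have to be looked up from the artifact metadata in Apicurio Registry.
val globalId = 1L
val globalIdBytes = ByteBuffer.allocate(8).putLong(globalId).array()

// Replaces the "headers" column built above with what I think are the default header names
val dfWithHeaders = df
  .withColumn(
    "headers",
    array(
      struct(lit("apicurio.value.groupId") as "key", lit("default").cast("binary") as "value"),
      struct(lit("apicurio.value.artifactId") as "key", lit("hm.motor-value").cast("binary") as "value"),
      struct(lit("apicurio.value.globalId") as "key", lit(globalIdBytes) as "value")
    )
  )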