I have a simple Spark app that generates Kafka messages:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    val parquet_schema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    val df = spark.readStream
      .schema(parquet_schema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*")).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}

I have an Avro schema in Apicurio Registry, created by

curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm-iot' \
--data '{
    "type": "record",
    "namespace": "com.hongbomiao",
    "name": "hm.motor",
    "fields": [
        {
            "name": "timestamp",
            "type": "long"
        },
        {
            "name": "current",
            "type": "double"
        },
        {
            "name": "voltage",
            "type": "double"
        },
        {
            "name": "temperature",
            "type": "double"
        }
    ]
}'


I am trying to use Apicurio Registry's Confluent-compatible REST API endpoint. Currently I retrieve the schema by its content ID, 26:

curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/schemas/ids/26' \
  --header 'Content-type: application/json; artifactType=AVRO' \
  --header 'X-Registry-ArtifactId: hm-iot'

which prints

{
    "schema": "{\n    \"type\": \"record\",\n    \"namespace\": \"com.hongbomiao\",\n    \"name\": \"hm.motor\",\n    \"fields\": [\n        {\n            \"name\": \"timestamp\",\n            \"type\": \"long\"\n        },\n        {\n            \"name\": \"current\",\n            \"type\": \"double\"\n        },\n        {\n            \"name\": \"voltage\",\n            \"type\": \"double\"\n        },\n        {\n            \"name\": \"temperature\",\n            \"type\": \"double\"\n        }\n    ]\n}",
    "references": []
}

which looks good.

Based on Aiven's JDBC connector doc, I wrote my JDBC sink connector config:

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "hm.motor",
        "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
        "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
        "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",

        "insert.mode": "upsert",

        "table.name.format": "motor",

        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6",

        "transforms": "convertTimestamp",
        "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTimestamp.field": "timestamp",
        "transforms.convertTimestamp.target.type": "Timestamp"
    }
}

However, I got this error in my Kafka Connect log

2023-05-01 19:01:11,291 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] WorkerSinkTask{id=hm-motor-jdbc-sink-kafka-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-hm-motor-jdbc-sink-kafka-connector-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hm.motor to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -1330532454
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:253)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:372)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:203)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    ... 18 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: No content with id/hash 'contentId--1330532454' was found.; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:853)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:826)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:311)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:433)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:361)
    ... 21 more

It tries to get content ID -1330532454, but obviously I don't have that; mine is 26. How does the JDBC sink connector look up the corresponding Avro schema?

I am not sure how the mapping works. I thought it would look for a schema called hm.motor based on the Kafka topic name, but it turns out that is not the case.

Thanks!


UPDATE 1

Thanks @Ftisiot!

I found the documentation for the Kafka serializers and deserializers:

The Kafka serializers and deserializers default to using <topicName>-key and <topicName>-value as the corresponding subject name while registering or retrieving the schema.

Also, value.converter.value.subject.name.strategy defaults to io.confluent.kafka.serializers.subject.TopicNameStrategy.

I have updated my Avro schema name to hm.motor-value, but I still get the same error.


  • Please clarify how you are producing data. The negative value would indicate your Avro is encoded incorrectly, which means your producer code is the issue, not the Connector / Converter / Registry. Plus, the producer should auto-register any schema, rather than you needing to manually register – OneCricketeer May 01 '23 at 22:18
  • @OneCricketeer thanks, I added how I generate the data to my question and cleaned it up a little. Ah, I think I see what you mean about auto-registering the schema, but in that case, who provides the initial schema in the Avro workflow? I thought I needed to provide it. – Hongbo Miao May 02 '23 at 01:58
  • The producer always defines the schema, in my experience. In other words, if "someone else" registers the schema and the producer then uses a different schema, that wouldn't make sense. The below-mentioned `toConfluentAvro` function should register a schema based on the Spark DataFrame schema, but it's been a few years since I did the research into it – OneCricketeer May 02 '23 at 22:12

3 Answers


I believe the default schema name is the concatenation of the topic name and either -value or -key, depending on the part of the message you are decoding.

Therefore, in your case, I would try with the schema name hm.motor-value.
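
As a quick check (my suggestion, not part of the original answer), you can list the subjects known to the Confluent-compatible API and confirm that hm.motor-value shows up:

curl 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/subjects'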

In this video you can check the automatically generated schema names when encoding from JSON to Avro using Flink.

Disclaimer: I work for Aiven, and we should update the docs to reflect this.

– Ftisiot

Forget about Connect for a minute. You should debug your topic with kafka-avro-console-consumer first. You'll get the same error there since your producer needs to properly encode the data.
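
For example (my invocation; the broker and registry hosts are taken from the question), you can point the console consumer's deserializer at the ccompat endpoint:

kafka-avro-console-consumer \
  --bootstrap-server hm-kafka-kafka-bootstrap.hm-kafka.svc:9092 \
  --topic hm.motor \
  --from-beginning \
  --property schema.registry.url=http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6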

Spark's built-in to_avro doesn't produce Confluent-encoded Avro.

See toConfluentAvro function of this library - https://github.com/AbsaOSS/ABRiS

More details about the internals https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
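
The key detail from the wire format: a Confluent Avro message starts with a magic byte 0x00, then a big-endian 4-byte schema ID, and only then the Avro payload. Plain Avro output has no such header, so the deserializer ends up reading bytes of your record body as the "schema ID", which is where an arbitrary value like -1330532454 comes from. A minimal header check in Scala (my illustration, not part of the original answer):

import java.nio.ByteBuffer

// Confluent wire format: [0x00 magic byte][4-byte big-endian schema ID][Avro payload]
def confluentSchemaId(value: Array[Byte]): Option[Int] =
  if (value.length > 5 && value(0) == 0x00)
    Some(ByteBuffer.wrap(value, 1, 4).getInt) // the registry ID the consumer will fetch
  else
    None // no header: plain Avro bytes, which a Confluent deserializer misreads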

Regarding your schema issues: name refers to a fully-qualified Java class name, as defined by the Avro specification, and bears no relation to the Registry subject when using TopicNameStrategy.

What is this subject name?

It's the path parameter in the API call POST /subjects/:name/versions used by the serializer's and deserializer's internal HTTP clients.

Also, as mentioned before, Kafka Connect is not necessary here. Spark can write directly to JDBC databases; the source of the data can be Parquet or Kafka. A sketch of that alternative follows.
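
This is my sketch, not the answerer's code: it reuses the connection details from the question and writes the decoded columns (the DataFrame before the to_avro step). Structured streaming has no built-in JDBC sink, so it goes through foreachBatch; the credential variables are assumed to be loaded elsewhere.

// Write each micro-batch to TimescaleDB via plain JDBC, no Kafka Connect involved.
def writeBatchToJdbc(batch: org.apache.spark.sql.DataFrame, batchId: Long): Unit = {
  batch.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db")
    .option("dbtable", "motor")
    .option("user", timescaledbUser)         // assumed, loaded from config
    .option("password", timescaledbPassword) // assumed, loaded from config
    .mode("append")
    .save()
}

val query = df.writeStream // df: the decoded columns, before to_avro
  .foreachBatch(writeBatchToJdbc _)
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()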

– OneCricketeer

Thanks for everyone's help, I finally figured it out! I will try to summarize what I learned.

1. Generating Kafka messages in Avro format

There are actually two major flavors of Avro data:

  • Confluent Avro
  • "Vanilla" Avro

1.1 [Succeeded] Generating Confluent Avro data in Spark

Confluent Avro is not "vanilla" Avro: it prepends a magic byte and a 4-byte schema ID to the Avro payload, which causes some inconvenience for Spark and other tools.

As @OneCricketeer pointed out, the ABRiS library helps generate Confluent-Avro-format Kafka messages (toConfluentAvro).

First, register the Avro schema in the Confluent Schema Registry:

curl --location 'http://confluent-schema-registry.svc:8081/subjects/hm.motor-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data '{
    "schema": "{\"type\": \"record\", \"name\": \"motor\", \"fields\":[{ \"name\": \"timestamp\", \"type\": \"long\"},{ \"name\": \"current\", \"type\": \"double\"},{ \"name\": \"voltage\", \"type\": \"double\"},{ \"name\": \"temperature\", \"type\": \"double\"}]}"
}'

Then the Spark app uses ABRiS's to_avro with a ToAvroConfig:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    val parquetSchema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    val toAvroConfig: ToAvroConfig =
      AbrisConfig.toConfluentAvro.downloadSchemaByLatestVersion
        .andTopicNameStrategy("hm.motor")
        .usingSchemaRegistry(
          "http://confluent-schema-registry.svc:8081"
        )

    val df = spark.readStream
      .schema(parquetSchema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*"), toAvroConfig).as("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}

build.sbt

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
resolvers += "confluent" at "https://packages.confluent.io/maven/"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-avro" % "3.4.0" % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.463" % "provided",
  "za.co.absa" %% "abris" % "6.3.0"
)
ThisBuild / assemblyMergeStrategy := {
  // https://stackoverflow.com/a/67937671/2000548
  case PathList("module-info.class") => MergeStrategy.discard
  case x if x.endsWith("/module-info.class") => MergeStrategy.discard
  // https://stackoverflow.com/a/76129963/2000548
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}
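
To build and run it, something like the following should work (my usual commands, not from the original answer; the jar name follows sbt-assembly's default <name>-assembly-<version>.jar pattern):

sbt assembly
spark-submit \
  --class IngestFromS3ToKafka \
  target/scala-2.12/IngestFromS3ToKafka-assembly-1.0.jar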

1.2 [Succeeded] Generating "standard" / "vanilla" Apache Avro data in Spark

First, I registered my Avro schema by

curl --location 'http://apicurio-registry.svc:8080/apis/registry/v2/groups/default/artifacts' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm.motor-value' \
--data '{
    "type": "record",
    "namespace": "com.hongbomiao",
    "name": "motor",
    "fields": [
        {
            "name": "timestamp",
            "type": "long"
        },
        {
            "name": "current",
            "type": "double"
        },
        {
            "name": "voltage",
            "type": "double"
        },
        {
            "name": "temperature",
            "type": "double"
        }
    ]
}'


In Spark, it is very straightforward to use with the native org.apache.spark.sql.avro.functions.to_avro.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import org.apache.spark.sql.avro.functions.to_avro
import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    // For `parquetSchema` below, you can
    //  1. hard-code it like the current code
    //  2. read it from one file: `val parquetSchema = spark.read.parquet("s3a://hongbomiao-bucket/iot/motor.parquet").schema`
    //  3. maybe also derive it from the Avro schema; I will try that in the future!
    val parquetSchema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    val backend = HttpClientSyncBackend()
    val response = basicRequest
      .get(
        uri"http://apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts/hm.motor-value"
      )
      .send(backend)
    val kafkaRecordValueSchema = response.body.fold(identity, identity)

    val df = spark.readStream
      .schema(parquetSchema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}

build.sbt

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-avro" % "3.3.2" % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.461" % "provided",
  "com.softwaremill.sttp.client3" %% "core" % "3.8.15"
)

I got many ideas from this article.
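
To verify the produced messages, here is a quick read-back sketch (my addition; it reuses the kafkaRecordValueSchema string fetched above and the imports from the app) with Spark's native from_avro:

import org.apache.spark.sql.avro.functions.from_avro

// Decode the vanilla-Avro "value" column back into columns using the same schema string.
val decoded = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092")
  .option("subscribe", "hm.motor")
  .load()
  .select(from_avro(col("value"), kafkaRecordValueSchema).as("motor"))
  .select("motor.*")

decoded.writeStream.format("console").start().awaitTermination()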

2. Reading Avro-format Kafka messages in the JDBC sink connector and writing to the database

2.1 Kafka messages in Confluent Avro

2.1.1 [Succeeded] Using io.confluent.connect.avro.AvroConverter with Confluent Schema Registry

Here we use the Confluent Schema Registry REST API:

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "hm.motor",
        "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
        "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
        "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",
        "insert.mode": "upsert",
        "table.name.format": "motor",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://confluent-schema-registry.svc:8081",
        "transforms": "convertTimestamp",
        "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTimestamp.field": "timestamp",
        "transforms.convertTimestamp.target.type": "Timestamp"
    }
}
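
To create the connector, POST this JSON to Kafka Connect's standard /connectors endpoint (a usage sketch; the Connect REST host below is an assumption):

curl --location 'http://hm-kafka-connect.hm-kafka.svc:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data @hm-motor-jdbc-sink-kafka-connector.json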

2.1.2 Using io.confluent.connect.avro.AvroConverter with Apicurio Schema Registry

Here we use Apicurio Registry's Confluent-compatible REST API:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://apicurio-registry.svc:8080/apis/ccompat/v6",

(I didn't further test this direction)

2.1.3 Using io.apicurio.registry.utils.converter.AvroConverter with Apicurio Schema Registry

Here we use Apicurio Registry's Confluent-compatible REST API:

"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/ccompat/v6",
"value.converter.apicurio.registry.as-confluent": true,

(I didn't further test this direction)

2.2 [Succeeded] Kafka messages in "vanilla" Apache Avro

Here we use io.apicurio.registry.utils.converter.AvroConverter.

My JDBC connector config:

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "hm.motor",
        "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
        "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
        "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",
        "insert.mode": "upsert",
        "table.name.format": "motor",
        "transforms": "convertTimestamp",
        "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTimestamp.field": "timestamp",
        "transforms.convertTimestamp.target.type": "Timestamp",

        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/registry/v2"
        "value.converter.apicurio.registry.fallback.group-id": "hm-group",
        "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"
    }
}

Maybe in the future I can figure out a way to get rid of the value.converter.apicurio.registry.fallback fields. (They appear to be needed because a "vanilla" Avro message carries no schema ID, so the converter cannot resolve the artifact from the message bytes themselves.)

More information about io.apicurio.registry.utils.converter.AvroConverter can be found here.

– Hongbo Miao