31

I'm using a Kafka source in Spark Structured Streaming to receive Confluent-encoded Avro records. I intend to use Confluent Schema Registry, but integrating it with Spark Structured Streaming seems to be impossible.

I have seen this question, but I was unable to get it working with the Confluent Schema Registry: Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)

Souhaib Guitouni
  • Thanks @VinothChinnasamy but your link is about classic spark streaming, I'm talking about spark STRUCTURED streaming – Souhaib Guitouni Feb 20 '18 at 12:39
  • you need to respect kafka spark integration : https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html – G.Saleh Feb 28 '18 at 10:44
  • @G.Saleh thank you but you misunderstand the question. – Souhaib Guitouni Mar 07 '18 at 17:46
  • Please upvote the confluence issue about it : https://github.com/confluentinc/schema-registry/issues/755 – Souhaib Guitouni Mar 07 '18 at 17:46
  • Possible duplicate of [reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)](https://stackoverflow.com/questions/40705926/reading-avro-messages-from-kafka-with-spark-2-0-2-structured-streaming) – OneCricketeer Mar 13 '18 at 19:14
  • @cricket_007 no it's not the same thing – Souhaib Guitouni Mar 13 '18 at 19:23
  • Why not? See `schemaRegistryUrl` in this answer? https://stackoverflow.com/a/41146808/2308683 – OneCricketeer Mar 13 '18 at 19:28
  • That question is not about the schema registry. The answer for it was not clear, but with your edits it's much clearer, thank you! – Souhaib Guitouni Mar 14 '18 at 09:26
  • Check out this project - https://github.com/hortonworks-spark/spark-schema-registry This allows you to integrate Hortonwork's Schema registry (https://github.com/hortonworks/registry) with Spark. It may also be possible to plug this into Confluent Schema registry (since the Hortonworks Schema registry is compatible with the Confluent one), but you will need to explore it further. – arunmahadevan Feb 07 '19 at 22:34
  • I completely agree with @SouhaibGuitouni This is an issue that should not be happening in Confluent. – Ivan Jun 03 '22 at 19:12

11 Answers

23

It took me a couple of months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization, so you must deserialize the data manually. In Spark, create the Confluent REST service object to get the schema, then convert the schema string in the response into an Avro schema using the Avro parser. Next, read the Kafka topic as normal and map over the binary-typed "value" column with the Confluent KafkaAvroDeserializer. I strongly suggest getting into the source code for these classes, because there is a lot going on here; for brevity I'll leave out many details.

//Used Confluent version 3.2.2 to write this.
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema

import scala.collection.JavaConverters._  //needed for props.asJava below

case class DeserializedFromKafkaRecord(key: String, value: String)

val schemaRegistryURL = "http://127.0.0.1:8081"

val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"

//create RestService object
val restService = new RestService(schemaRegistryURL)

//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)

//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)

//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null

//Create structured streaming DF to read from the topic.
//Assumes `spark` is the active SparkSession; its implicits provide the Encoder for the map below.
import spark.implicits._

val rawTopicMessageDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20)  //remove for prod
  .load()

//Instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map{
  row =>
    if (keyDeserializer == null) {
      keyDeserializer = new KafkaAvroDeserializer
      keyDeserializer.configure(props.asJava, true)  //isKey = true
    }
    if (valueDeserializer == null) {
      valueDeserializer = new KafkaAvroDeserializer
      valueDeserializer.configure(props.asJava, false) //isKey = false
    }

    //Pass the record bytes along with the Avro reader schema.
    //The topic name is actually unused in the deserializer's source code, just required by the signature. Weird right?
    val deserializedKeyString = keyDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("key"), keySchema).toString
    val deserializedValueString = valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString

    DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
}

val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", false)
    .start()
tstites
  • Can you elaborate on the comment `topic name is actually unused in the source code, just required by the signature. Weird right?` – Jeremy Mar 09 '18 at 16:49
  • It seems the signature for the deserialize method calls for a string, but it is unused in the function body. [KafkaAvroDeserializer.java](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializer.java) – tstites Mar 09 '18 at 18:27
  • Hi, I am trying to implement the same code. I am getting an exception at keyDeserializer.deserialize(topicName, row.key, keySchema).toString , saying keySchema is org.apache.avro.Schema where as required is Array[Byte]. Checked the source code it looks like it expects Array[Byte] https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializer.java. Something i am missing here ? – Vibhor Nigam Mar 08 '19 at 18:55
  • @tstites, im not able to find io.confluent.kafka.schemaregistry.client.rest.RestService this package in any confluent repositories, can you give location of this jar or mvn repository for this package? – Karthikeyan Jun 14 '19 at 12:12
  • @Karthikeyan https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java is part of `io.confluent:kafka-schema-registry-client` And the repo is here https://docs.confluent.io/current/clients/install.html#installation-maven – OneCricketeer Jul 26 '19 at 17:05
  • @VibhorNigam, getting the same row.value error as you , did you fix it – scalacode Jun 23 '20 at 10:24
  • @scalacode unfortunately no. i got another source with same data so moved on from this issue. – Vibhor Nigam Jun 24 '20 at 11:14
11

Disclaimer

This code was only tested on a local master, and has been reported to run into serializer issues in a clustered environment. There is an alternative solution (steps 7-9, with Scala code in step 10) that extracts the schema ids into a column, looks up each unique id, and then uses schema broadcast variables, which will work better at scale.
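For reference, here is a minimal sketch of that schema-id extraction step (not the linked solution itself). It assumes `rawDf` is the raw Kafka DataFrame and relies on the Confluent wire format, where byte 1 is the magic 0x00 and bytes 2-5 are the schema id as a big-endian integer:

import org.apache.spark.sql.functions.expr

//Hypothetical sketch: pull the 4-byte schema id into its own column so each
//distinct id can be resolved once against the registry (and the schemas broadcast),
//instead of configuring a deserializer inside every task.
val withSchemaId = rawDf
  .withColumn("schema_id", expr("conv(hex(substring(value, 2, 4)), 16, 10)").cast("int"))
  .withColumn("avro_payload", expr("substring(value, 6, length(value) - 5)"))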

There is also an external library, AbsaOSS/ABRiS, that addresses using the Registry with Spark.


Since the other answer that was mostly useful was removed, I wanted to re-add it with some refactoring and comments.

Here are the dependencies needed. This code was tested with Confluent 5.x and Spark 2.4.

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>${confluent.version}</version>
        <exclusions>
            <!-- Conflicts with Spark's version -->
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
 
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

And here is the Scala implementation (only tested locally on master=local[*])

First section, define the imports, some fields, and a few helper methods to get schemas

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

object App {

  private var schemaRegistryClient: SchemaRegistryClient = _

  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false) = {
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  }

  def avroSchemaToSparkSchema(avroSchema: String) = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

 // ... continues below

Then define a simple main method that parses the CMD args to get Kafka details

  def main(args: Array[String]): Unit = {
    val cmd: CommandLine = parseArg(args)

    val master = cmd.getOptionValue("master", "local[*]")
    val spark = SparkSession.builder()
      .appName(App.getClass.getName)
      .master(master)
      .getOrCreate()

    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)

    spark.stop()
  }


  // ... still continues

Then, the important method that consumes the Kafka topic and deserializes it

  private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
    import spark.implicits._

    // Setup the Avro deserialization UDF
    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient) 
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      kafkaAvroDeserializer.deserialize(bytes)
    )

    // Load the raw Kafka topic (byte stream)
    val rawDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    // Deserialize byte stream into strings (Avro fields become JSON)
    import org.apache.spark.sql.functions._
    val jsonDf = rawDf.select(
      // 'key.cast(DataTypes.StringType),  // string keys are simplest to use
      callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
      callUDF("deserialize", 'value).as("value")
      // excluding topic, partition, offset, timestamp, etc
    )

    // Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
    val dfValueSchema = {
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    }

    // Apply structured schema to JSON stream
    val parsedDf = jsonDf.select(
      'key, // keys are usually plain strings
      // values are JSONified Avro records
      from_json('value, dfValueSchema.dataType).alias("value")
    ).select(
      'key,
      $"value.*" // flatten out the value
    )

    // parsedDf.printSchema()

    // Sample schema output
    // root
    // |-- key: string (nullable = true)
    // |-- header: struct (nullable = true)   // Not a Kafka record "header". This is part of our value schema
    // |    |-- time: long (nullable = true)
    // |    ...

    // TODO: Do something interesting with this stream
    parsedDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

 // still continues

The command line parser allows for passing in bootstrap servers, schema registry, topic name, and Spark master.

  private def parseArg(args: Array[String]): CommandLine = {
    import org.apache.commons.cli._

    val options = new Options

    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)

    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)

    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)

    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)

    val parser = new BasicParser
    parser.parse(options, args)
  }

  // still continues

For the UDF above to work, there needs to be a deserializer that takes the DataFrame of bytes to one containing deserialized Avro.

  // Simple wrapper around Confluent deserializer
  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      // TODO: configure the deserializer for authentication 
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val value = super.deserialize(bytes)
      value match {
        case str: String =>
          str
        case _ =>
          val genericRecord = value.asInstanceOf[GenericRecord]
          genericRecord.toString
      }
    }
  }

} // end 'object App'

Put each of these blocks together, and it works in IntelliJ after adding -b localhost:9092 -s http://localhost:8081 -t myTopic to Run Configurations > Program Arguments

OneCricketeer
  • It's not working in standlone cluster mode..throws Failed to execute user defined function(anonfun$consumeAvro$1: (binary) => string) – Learnis Jun 20 '20 at 17:41
  • yeah @OneCricketeer. It's not working in yarn as well. Getting a null exception while deserializing line kafkaAvroDeserializer.deserialize(bytes). I try to handle with Try{}.getOrelse(), but if I do that I'm not getting data in data frame. Is it solvable ? any workaround for this ? – Learnis Jun 21 '20 at 03:37
  • Or any working solutions in this stackoverflow post ? for cluster mode – Learnis Jun 21 '20 at 03:45
  • @Learnis If you get an NPE in deserialize, then the bytes are probably null. I believe the deserializer is correctly initialized in the closure – OneCricketeer Jun 22 '20 at 16:24
  • Hi @OneCricketeer, why the key and value has a single quote in front of it? also, I had to add extra dependency ``` "org.apache.spark" %% "spark-sql"``` (sbt syntax), for the compiler to be happy for SparkSession etc. and then it has below error ```type mismatch; [error] found : Symbol [error] required: org.apache.spark.sql.Column [error] callUDF("deserialize", 'key).as("key"), // but sometimes they are avro ``` – soMuchToLearnAndShare Sep 10 '20 at 21:10
  • @Minnie The single quote creates a [Symbol object](https://stackoverflow.com/questions/918590/what-does-a-single-apostrophe-mean-in-scala). The way to get a column would be `Column("key")` or `$"key"`, but that was more typing – OneCricketeer Sep 10 '20 at 21:46
  • Hi @OneCricketeer, when run in inteliJ, it has exception ```Exception in thread "main" java.net.ConnectException: Connection refused (Connection refused) ... ... at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272)```. basically i could not connect to the SchemaRegistry. Note Kafka Confluent 5.5.1 is running. I tried this command ```nc -vz localhost 8081``` to verify, and yes, it has error ```nc: connect to localhost port 8081 (tcp) failed: Connection refused```. Any suggestions on this? Thanks a lot. – soMuchToLearnAndShare Sep 11 '20 at 08:15
  • @Minnie Sound like you need to start a Schema Registry? – OneCricketeer Sep 11 '20 at 16:37
10

This is an example of my code integrating Spark Structured Streaming with Kafka and the Schema Registry (code in Scala):

import org.apache.spark.sql.SparkSession
import io.confluent.kafka.schemaregistry.client.rest.RestService // <artifactId>kafka-schema-registry</artifactId>
import org.apache.spark.sql.avro.from_avro // <artifactId>spark-avro_${scala.compat.version}</artifactId>
import org.apache.spark.sql.functions.col

object KafkaConsumerAvro {

  def main(args: Array[String]): Unit = {

    val KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
    val SCHEMA_REGISTRY_URL = "http://localhost:8081"
    val TOPIC = "transactions"

    val spark: SparkSession = SparkSession.builder().appName("KafkaConsumerAvro").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
      .option("subscribe", TOPIC)
      .option("startingOffsets", "earliest") // from starting
      .load()

//     Prints Kafka schema with columns (topic, offset, partition e.t.c)
    df.printSchema()

//    Create REST service to access schema registry and retrieve topic schema (latest)
    val restService = new RestService(SCHEMA_REGISTRY_URL)
    val valueRestResponseSchema = restService.getLatestVersion(TOPIC + "-value")
    val jsonSchema = valueRestResponseSchema.getSchema

    val transactionDF = df.select(
      col("key").cast("string"), // cast to string from binary value
      from_avro(col("value"), jsonSchema).as("transaction"), // convert from avro value
      col("topic"),
      col("offset"),
      col("timestamp"),
      col("timestampType"))
    transactionDF.printSchema()

//    Stream data to console for testing
    transactionDF.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }

}

When reading from the Kafka topic, we have this kind of schema:

key: binary | value: binary | topic: string | partition: integer | offset: long | timestamp: timestamp | timestampType: integer |

As we can see, key and value are binary, so we need to cast the key to a string; the value is Avro-formatted, so we can decode it by calling the from_avro function.

In addition to the Spark and Kafka dependencies, we need these dependencies:

<!-- READ AND WRITE AVRO DATA -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_${scala.compat.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
<!-- INTEGRATION WITH SCHEMA REGISTRY -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry</artifactId>
  <version>${confluent.version}</version>
</dependency>
davidretana
  • Could you please explain how we can pass schema registry credentials in you program in case needed? – Leibnitz Nov 23 '19 at 06:40
  • I don't need to authenticate against schema registry, but i've found this information: (https://docs.confluent.io/current/schema-registry/security/index.html), and in this link you can configure Schema Registry authorization for communicating with the RBAC Kafka cluster. (https://docs.confluent.io/current/schema-registry/security/rbac-schema-registry.html) – davidretana Nov 25 '19 at 07:08
  • To pass schema registry credentials, see this answer: https://stackoverflow.com/a/58930199/6002794 – Antón R. Yuste Mar 26 '20 at 17:00
  • Will this work in standalone cluster or yarn mode ? – Learnis Jun 21 '20 at 03:40
10

Another very simple alternative for PySpark (without full Schema Registry support such as schema registration, compatibility checks, etc.) could be:

import requests

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import *

# variables
topic = "my-topic"
schemaregistry = "http://localhost:8081"
kafka_brokers = "kafka1:9092,kafka2:9092"

# retrieve the latest schema
response = requests.get('{}/subjects/{}-value/versions/latest/schema'.format(schemaregistry, topic))

# error check
response.raise_for_status()

# extract the schema from the response
schema = response.text

# run the query
# The magic goes here: skip the first 5 bytes, reserved by the Schema Registry
# wire format (1 magic byte + 4-byte schema id), before handing the value to from_avro.
# Note: output mode is "append", since "complete" requires a streaming aggregation.
query = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", topic) \
    .load() \
    .selectExpr("substring(value, 6) as avro_value") \
    .select(from_avro(col("avro_value"), schema).alias("data")) \
    .select(col("data.my_field")) \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()
dudssource
8

This library (AbsaOSS/ABRiS) will do the job for you. It connects to the Confluent Schema Registry through Spark Structured Streaming.

For Confluent, it copes with the schema id that is sent along with the payload.

In the README you will find a code snippet of how to do it.
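As a rough sketch of what that looks like with the newer (5.x-style) ABRiS API (the same pattern appears in a later answer below; the exact builder methods depend on your ABRiS version, so treat this as an approximation):

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

//Sketch only: assumes `rawDf` is a Kafka-sourced DataFrame with a binary "value" column
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("my-topic")
  .usingSchemaRegistry("http://localhost:8081")

val decoded = rawDf.select(from_avro(col("value"), abrisConfig).as("data"))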

DISCLOSURE: I work for ABSA and I developed this library.

Felipe Martins Melo
  • description in this lib seems not correct for example in decripton there is 2.0.0 version but in maven i saw only 1.0.0 – Mikhail Jul 26 '18 at 14:26
  • also i can not build the project. i have an error: [ERROR] E:\projects\dvsts\ABRiS\src\test\scala\za\co\absa\abris\avro\read\confluent\ScalaConfluentKafkaAvroDeserializerSpec.scala:113: error: class MockedSchemaRegistryClient needs to be abstract, since: [ERROR] it has 8 unimplemented members. – Mikhail Jul 26 '18 at 17:14
  • @Mikhail, the new version was updated yesterday, and probably when you checked Maven Central it had not yet been synchronized. You can find it here: https://mvnrepository.com/artifact/za.co.absa/abris/2.0.0 – Felipe Martins Melo Jul 27 '18 at 17:02
  • Would be nice to see an example usage here on this answer – OneCricketeer Dec 15 '18 at 19:12
  • @cricket_007, does this library work with spark Java api, as I cannot able to get `fromavro` method after all the imports. could you please ? – Pyd May 03 '19 at 07:51
  • @Vignesh I have not used it. But yes, any Scala library can be imported into Java code because it's all still running in the same JVM. – OneCricketeer May 03 '19 at 18:33
6

Databricks now provides this functionality, but you have to pay for it :-(

dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

See: https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html for more info
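The snippet above shows the write path; the Databricks docs linked above also describe a matching from_avro overload for reading that takes the subject name and registry address. A hedged sketch (runnable only on Databricks; `dataDF` here stands for the Kafka source DataFrame, and the $ syntax assumes spark.implicits._ is imported):

import org.apache.spark.sql.avro.functions.from_avro

//Databricks-only sketch: decode key and value using the schemas registered
//under the "t-key" and "t-value" subjects
val decoded = dataDF.select(
  from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
  from_avro($"value", "t-value", schemaRegistryAddr).as("value"))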

A good free alternative is ABRiS (https://github.com/AbsaOSS/ABRiS). The only downside we can see is that you need to provide a file with your Avro schema at runtime so the framework can enforce that schema on your dataframe before it publishes it to the Kafka topic.

randal25
  • Only Databricks supports the registry, not Apache Spark itself – OneCricketeer May 03 '19 at 18:32
  • Does Databricks support Schema Registry of Confluent? Or another type of schema registry. In case you can use Databricks, someone knows how to pass the schema registry credentials.I say this because the examples I find do not comment on it. – xav Nov 14 '19 at 15:44
  • @xav Yes. Databricks partners with Confluent to support that Avro + Schema Registry functionality – OneCricketeer Jun 20 '20 at 22:07
5

Based on @cricket_007's answer, I created the following solution, which runs in our cluster environment and adds the following new features:

  • You need to add broadcast variables to transfer some values into the map operations in a cluster environment. Neither Schema.Parser nor KafkaAvroDeserializer can be serialized by Spark, which is why you need to initialize them inside the map operations.
  • My structured streaming job uses the foreachBatch output sink.
  • I applied org.apache.spark.sql.avro.SchemaConverters to convert the Avro schema into a Spark StructType, so that it can be used in the from_json column function to parse the Kafka topic fields (key and value).

First, you need to load some packages:

SCALA_VERSION="2.11"
SPARK_VERSION="2.4.4"
CONFLUENT_VERSION="5.2.2"

jars=(
  "org.apache.spark:spark-sql-kafka-0-10_${SCALA_VERSION}:${SPARK_VERSION}"    ## format("kafka")
  "org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION}"    ## SchemaConverters
  "io.confluent:kafka-schema-registry:${CONFLUENT_VERSION}"   ## import io.confluent.kafka.schemaregistry.client.rest.RestService
  "io.confluent:kafka-avro-serializer:${CONFLUENT_VERSION}"   ## import io.confluent.kafka.serializers.KafkaAvroDeserializer
)

./bin/spark-shell --packages "$(IFS=,; echo "${jars[*]}")"

Here is the complete code I tested in spark-shell:

import org.apache.avro.Schema
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.schemaregistry.client.rest.RestService

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters

import scala.collection.JavaConverters._
import java.time.LocalDateTime

spark.sparkContext.setLogLevel("Error")

val brokerServers = "xxx.yyy.zzz:9092"
val topicName = "mytopic" 
val schemaRegistryURL = "http://xxx.yyy.zzz:8081"

val restService = new RestService(schemaRegistryURL)

val exParser = new Schema.Parser
//-- For both key and value
val schemaNames = Seq("key", "value")
val schemaStrings = schemaNames.map(i => (i -> restService.getLatestVersion(s"$topicName-$i").getSchema)).toMap
val tempStructMap = schemaStrings.transform((k,v) => SchemaConverters.toSqlType(exParser.parse(v)).dataType)
val schemaStruct = new StructType().add("key", tempStructMap("key")).add("value", tempStructMap("value"))
//-- For key only 
// val schemaStrings = restService.getLatestVersion(s"$topicName-key").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType
//-- For value only 
// val schemaStrings = restService.getLatestVersion(s"$topicName-value").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType


val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()
  .writeStream
  .outputMode("append")
  //.option("checkpointLocation", s"cos://$bucket.service/checkpoints/$tableName")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {

    val bcTopicName = sc.broadcast(topicName)
    val bcSchemaRegistryURL = sc.broadcast(schemaRegistryURL)
    val bcSchemaStrings = sc.broadcast(schemaStrings)
    
    val rstDF = batchDF.map {
      row =>
      
        val props = Map("schema.registry.url" -> bcSchemaRegistryURL.value)
        //-- For both key and value
        val isKeys =  Map("key" -> true, "value" -> false)
        val deserializers = isKeys.transform{ (k,v) => 
            val des = new KafkaAvroDeserializer
            des.configure(props.asJava, v)
            des 
        }
        //-- For key only 
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, true)
        //-- For value only 
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, false)
        

        val inParser = new Schema.Parser
        //-- For both key and value
        val values = bcSchemaStrings.value.transform( (k,v) => 
            deserializers(k).deserialize(bcTopicName.value, row.getAs[Array[Byte]](k), inParser.parse(v)).toString)
        s"""{"key": ${values("key")}, "value": ${values("value")} }"""
        //-- For key only 
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("key"), inParser.parse(bcSchemaStrings.value)).toString
        //-- For value only 
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("value"), inParser.parse(bcSchemaStrings.value)).toString  
      }
      .select(from_json(col("value"), schemaStruct).as("root"))
      .select("root.*")

    println(s"${LocalDateTime.now} --- Batch $batchId: ${rstDF.count} rows")
    rstDF.printSchema
    rstDF.show(false)    

  })
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start()

query.awaitTermination()
timothyzhang
3

Summarizing some of the answers above and adding some of my own experience, these are the options at the time of writing:

  1. The third-party ABRiS library. This is what we used initially, but it doesn't seem to support a permissive mode where you can drop malformed records. It will crash the stream when it encounters a malformed message. If you can guarantee message validity that is okay, but it was an issue for us as it kept trying to parse the malformed message after a stream restart.
  2. A custom UDF which parses the Avro data, as outlined by OneCricketeer's answer. Gives the most flexibility but also requires the most custom code.
  3. Using Databricks' from_avro variant, which allows you to simply pass the URL, and it will find the right schema and parse it for you. Works really well, but is only available in their environment, thus hard to test in a codebase.
  4. Using Spark's built-in from_avro function. This function allows you to pass a JSON schema and parse it from there. The only fix you have to apply is that in Confluent's wire format there is one magic byte and 4 schema-id bytes before the actual Avro binary data starts, as also pointed out in dudssource's answer. You can parse it like this in Scala:
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.{col, substring}
import scala.collection.JavaConverters._

val restService             = new RestService(espConfig.schemaRegistryUrl)
val valueRestResponseSchema = restService.getVersion(espConfig.fullTopicName + "-value", schemaVersion)
val jsonSchema              = valueRestResponseSchema.getSchema

streamDf
  .withColumn("binary_data", substring(col("value"), 6, Int.MaxValue))
  .withColumn("parsed_data", from_avro(col("binary_data"), jsonSchema, Map("mode" -> "PERMISSIVE").asJava))

RvdV
2

For anyone who wants to do this in PySpark: the library Felipe referenced worked nicely on the JVM for me, so I wrote a small wrapper function that integrates it in Python. It looks very hacky, because a lot of types that are implicit in Scala have to be specified explicitly in py4j. It has been working nicely so far, though, even in Spark 2.4.1.

from pyspark.sql import DataFrame


def expand_avro(spark_context, sql_context, data_frame, schema_registry_url, topic):
    j = spark_context._gateway.jvm
    dataframe_deserializer = j.za.co.absa.abris.avro.AvroSerDe.DataframeDeserializer(data_frame._jdf)
    naming_strategy = getattr(
        getattr(j.za.co.absa.abris.avro.read.confluent.SchemaManager,
                "SchemaStorageNamingStrategies$"), "MODULE$").TOPIC_NAME()
    conf = getattr(getattr(j.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.url", schema_registry_url))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.topic", topic))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.id", "latest"))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.naming.strategy", naming_strategy))
    schema_path = j.scala.Option.apply(None)
    conf = j.scala.Option.apply(conf)
    policy = getattr(j.za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies, "RETAIN_SELECTED_COLUMN_ONLY$")()
    data_frame = dataframe_deserializer.fromConfluentAvro("value", schema_path, conf, policy)
    data_frame = DataFrame(data_frame, sql_context)
    return data_frame

For that to work, you have to add the library to the spark packages, e.g.

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' \
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1,' \
    'org.apache.spark:spark-avro_2.11:2.4.1,' \
    'za.co.absa:abris_2.11:2.2.2 ' \
    '--repositories https://packages.confluent.io/maven/ ' \
    'pyspark-shell'
Falco Winkler
0

@RvdV Great summary. I was trying the ABRiS library and consuming CDC records generated by Debezium.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig}

val abrisConfig: FromAvroConfig = (AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(topicName)
  .usingSchemaRegistry(schemaRegistryURL))

val df=(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load())

val deserializedAvro = (df
  .select(from_avro(col("value"), abrisConfig)
          .as("data"))
  .select(col("data.after.*")))
deserializedAvro.printSchema()

val query = (deserializedAvro
  .writeStream
  .format("console")
  .outputMode("append")
  .option("checkpointLocation", s"s3://$bucketName/checkpoints/$tableName")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start())

I added a column while the streaming job was running. I was expecting it to print the new column that I added, but it did not. Does it not dynamically refresh the schema from the version information in the payload?

Akshaya
0

The problem is that if you use io.confluent.kafka.serializers.KafkaAvroSerializer when producing messages, the message bytes are not plain Avro but [magic_byte, schema_id (4-byte integer), avro_bytes], so from_avro does not work on them directly.

You can see this here

https://github.com/confluentinc/schema-registry/blob/3e7eca9e0ce07c9167c301ccc7c1a2e8248c26a7/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java#LL135C17-L135C27

Dropping the magic byte and schema id like this works:

select(from_avro(expr("substring(value, 6)"), schemaJson))

But it's very inelegant.
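For completeness, a minimal sketch of that workaround, assuming `rawDf` is the Kafka source DataFrame, `schemaJson` is the schema string fetched from the registry (e.g. via RestService as in earlier answers), and Spark 3.x's built-in from_avro:

import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.{col, expr}

//Strip the 1-byte magic + 4-byte schema id, then decode the remaining Avro payload
val decoded = rawDf
  .select(from_avro(expr("substring(value, 6, length(value) - 5)"), schemaJson).as("data"))
  .select(col("data.*"))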

Jelmer