
I have a Kafka topic with simple Avro-serialized data in it, and I am trying to read this data in my Spark app, which is written in Scala. When I print the Spark DataFrame to the console, I can see that there are issues with deserialization (or something else), because my output looks like this:

+----+---------------+--------------+---+
|name|favorite_number|favorite_color|age|
+----+---------------+--------------+---+
|    |              0|              |  0|
+----+---------------+--------------+---+

But the data that I have in Kafka is (I am viewing this data from the console with the command `kafka-avro-console-consumer --topic test-py-topic --bootstrap-server localhost:9092 --from-beginning`):

{"name":"Jane1","favorite_number":15,"favorite_color":"black","age":35} {"name":"Jane1","favorite_number":15,"favorite_color":"black","age":35}

Here is the Scala code that I use:

import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col

val spark = SparkSession
  .builder
  .appName("Stream Handler")
  .config("spark.master", "local")
  .config("spark.cassandra.connection.host", "localhost")
  .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

import spark.implicits._

// Read raw Kafka records as a streaming DataFrame
val inputDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-2")
  .option("startingOffsets", "earliest")
  .load()

// Load the Avro schema as a JSON string
val jsonFormatSchema = new String(
  Files.readAllBytes(Paths.get("/src/resources/shemas/schema.avsc")))

// Deserialize the Kafka value column and flatten the resulting struct
val personDF = inputDF
  .select(from_avro(col("value"), jsonFormatSchema).as("person"))
  .select("person.*")
personDF.printSchema()

val query = personDF.writeStream.outputMode("update").format("console").start()
query.awaitTermination()
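To double-check what the topic actually contains at the byte level, here is a quick (untested) one-off batch read that prints the leading raw bytes of a record; it uses the same `test-py-topic` as the console command above:

// Untested sketch: batch-read one record and dump its first bytes in hex.
// A leading 0x00 plus a 4-byte schema ID would be Confluent's wire format.
val rawDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-py-topic")
  .option("startingOffsets", "earliest")
  .load()

rawDF.select("value").limit(1).collect().foreach { row =>
  val bytes = row.getAs[Array[Byte]]("value")
  println(bytes.take(10).map(b => f"$b%02x").mkString(" "))
}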

Also, here is other output from the Spark logs (the printed schema), which seems to be correct:

root
 |-- name: string (nullable = true)
 |-- favorite_number: integer (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- age: integer (nullable = true)

My build.sbt file, so you can figure out which versions I am using:

name := "spark_scala"

version := "0.1"

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2"

libraryDependencies += "io.spray" %%  "spray-json" % "1.3.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.1.2"

libraryDependencies += "org.apache.spark" %% "spark-avro" % "3.1.2"

libraryDependencies ++= Seq(
  "com.datastax.spark" %% "spark-cassandra-connector" % "3.1.0",
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.11.0")

libraryDependencies += "joda-time" % "joda-time" % "2.10.13"

My Avro schema (JSON):

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    },
    {
      "name": "favorite_color",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int",
      "default": 18
    }
  ]
}

I am uploading data to the Kafka topic with simple Python code:

# Inside a class; AvroProducer comes from `confluent_kafka.avro`
self.producer = AvroProducer(
    self.config,
    default_key_schema=key_schema,
    default_value_schema=value_schema
)
self.producer.produce(topic=self.topic, key=key, value=person.to_json())

where the key is a simple int, and the value is a class instance with a bunch of params. Also, I am not using "pure" Kafka; I am using Confluent. I hope I didn't forget anything. Thanks in advance!

  • How are you producing the data? You said you have JSON records in the topic? That's not Avro... Also `from_avro` cannot deserialize Confluent Schema Registry data. Is this what you're using? – OneCricketeer Nov 06 '21 at 20:14
  • sorry, forgot to mention this. Check my updated post. Long story short: I believe I have avro format in kafka because I am using AvroProducer (Python class from `from confluent_kafka.avro import AvroProducer`). It looks like JSON because I am getting this output into console, using `kafka-avro-console-consumer --topic test-py-topic --bootstrap-server localhost:9092 --from-beginning` thanks! – Illia Nov 06 '21 at 21:20
  • Okay, then yeah, that's using Schema Registry and you need a different way to read the data. Refer linked post for multiple solutions – OneCricketeer Nov 06 '21 at 22:26
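For anyone who lands here: the workaround the linked solutions describe is to strip the Confluent wire-format header before decoding. Confluent's AvroProducer prepends 5 bytes to every message (one 0x00 magic byte plus a 4-byte Schema Registry schema ID), which plain `from_avro` does not understand, and that header is why every field decodes to an empty or zero value. A minimal untested sketch against the `inputDF` above, assuming every record carries that header:

import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.expr

// Skip the 5-byte Confluent header (substring on binary is 1-based),
// then decode the remaining bytes as plain Avro
val payload = expr("substring(value, 6, length(value) - 5)")

val personDF = inputDF
  .select(from_avro(payload, jsonFormatSchema).as("person"))
  .select("person.*")

This ignores the schema ID and assumes the local schema.avsc matches the writer schema exactly; a more robust option is a Schema-Registry-aware library such as ABRiS, which resolves the ID against the registry instead of discarding it.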
