I have a Kafka topic with simple Avro-serialized data in it, and I am trying to read this data in my Spark app, which is written in Scala. When I print the Spark DataFrame to the console, I can see that there are issues with deserialization (or something else), because my output looks like this:
+----+---------------+--------------+---+
|name|favorite_number|favorite_color|age|
+----+---------------+--------------+---+
| | 0| | 0|
But the data that I actually have in Kafka looks like this (I am viewing it from the console with the command kafka-avro-console-consumer --topic test-py-topic --bootstrap-server localhost:9092 --from-beginning):
{"name":"Jane1","favorite_number":15,"favorite_color":"black","age":35} {"name":"Jane1","favorite_number":15,"favorite_color":"black","age":35}
Here is the Scala code that I use:
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col

val spark = SparkSession
  .builder
  .appName("Stream Handler")
  .config("spark.master", "local")
  .config("spark.cassandra.connection.host", "localhost")
  .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

import spark.implicits._

// Read the raw records from Kafka as a streaming DataFrame
val inputDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-2")
  .option("startingOffsets", "earliest")
  .load()

// Load the Avro schema (a JSON string) from a file
val jsonFormatSchema = new String(
  Files.readAllBytes(Paths.get("/src/resources/shemas/schema.avsc")))

// Deserialize the binary "value" column using the Avro schema
val personDF = inputDF
  .select(from_avro(col("value"), jsonFormatSchema).as("person"))
  .select("person.*")

personDF.printSchema()

val query = personDF.writeStream.outputMode("update").format("console").start()
query.awaitTermination()
Also, here is the schema that personDF.printSchema() prints in the Spark logs, which seems to be correct:
root
|-- name: string (nullable = true)
|-- favorite_number: integer (nullable = true)
|-- favorite_color: string (nullable = true)
|-- age: integer (nullable = true)
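To narrow the problem down, I can also dump the raw bytes that Spark receives from Kafka, before from_avro touches them. This is just a minimal debugging sketch on top of the inputDF above (hex is a built-in Spark SQL function; rawDF and rawQuery are names I made up here):

// Debugging sketch: show the raw Kafka value bytes as hex, to compare
// what Spark receives with what the Avro serializer actually wrote.
val rawDF = inputDF.selectExpr("CAST(key AS STRING) AS key", "hex(value) AS value_hex")

val rawQuery = rawDF.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()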
Here is my build.sbt file, so you can see which versions I am using:
name := "spark_scala"
version := "0.1"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2"
libraryDependencies += "io.spray" %% "spray-json" % "1.3.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.1.2"
libraryDependencies += "org.apache.spark" %% "spark-avro" % "3.1.2"
libraryDependencies ++= Seq(
"com.datastax.spark" %% "spark-cassandra-connector" % "3.1.0",
"com.datastax.cassandra" % "cassandra-driver-core" % "3.11.0")
libraryDependencies += "joda-time" % "joda-time" % "2.10.13"
And my Avro schema (in JSON format):
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    },
    {
      "name": "favorite_color",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int",
      "default": 18
    }
  ]
}
I am producing the data to the Kafka topic with simple Python code:
self.producer = AvroProducer(
    self.config,
    default_key_schema=key_schema,
    default_value_schema=value_schema
)
self.producer.produce(topic=self.topic, key=key, value=person.to_json())
where key is a simple int and value is a class with a bunch of params. Also, I am not using "pure" Kafka; I am using Confluent.
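One thing I suspect, since I am producing with Confluent's AvroProducer: as far as I understand, the Confluent serializer frames each message with a 5-byte header (one magic byte plus a 4-byte Schema Registry schema id) before the actual Avro payload, while Spark's from_avro expects plain Avro bytes. If that is the case, maybe the header has to be skipped before deserializing, along these lines (my own unverified sketch; substring on a binary column is 1-based, and personDF2 is a name I made up):

// Sketch under the assumption of the Confluent wire format: skip the
// 5-byte header (magic byte + schema id) before handing the remaining
// bytes to from_avro.
import org.apache.spark.sql.functions.expr

val avroPayload = expr("substring(value, 6, length(value) - 5)")

val personDF2 = inputDF
  .select(from_avro(avroPayload, jsonFormatSchema).as("person"))
  .select("person.*")

Is that the right direction, or is the problem somewhere else? I hope I didn't forget anything. Thanks in advance!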