I am reading from a Kafka topic that contains data in Avro format. Below is my code:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "<IP>:9092")
.option("subscribe", "avro-data")
.option("startingOffsets", "latest")
.load();
Dataset<Row> ds2 = df.select( from_avro(col("value"), schema).as("rows"),
col("timestamp"));
// Flatten the decoded Avro struct into columns and keep the Kafka timestamp
ds2.createOrReplaceTempView("ds2");
Dataset<Row> ds3 = spark.sql("select rows.*, timestamp from ds2");
// Register the flattened result as a temporary view
ds3.createOrReplaceTempView("table");
Dataset<Row> result = spark.sql("select * from table");
StreamingQuery query = result
.writeStream()
.queryName("Test query")
.outputMode(OutputMode.Update())
.format("console")
.option("checkpointLocation", "/tmp")
.start();
I am getting null values: I can see the data on the Kafka side, but in Spark all values come out as zero or null. Below is my output:
+--------+-----+-------------+-----+----+--------------------+
|event_id|value|         type|check|name|           timestamp|
+--------+-----+-------------+-----+----+--------------------+
|        |    0|2.6912392E-37|false|    |2022-02-01 15:30:...|
|        |    0|2.6912392E-37|false|    |2022-02-01 15:30:...|
|        |    0|2.6912392E-37|false|    |2022-02-01 15:30:...|
|        |    0|2.6912392E-37|false|    |2022-02-01 15:30:...|
|        |    0|2.6912392E-37|false|    |2022-02-01 15:30:...|
+--------+-----+-------------+-----+----+--------------------+
Am I doing anything wrong here? I can't figure out why the values are coming out empty.
Below is the schema returned from the schema registry:
String schema = "{\"type\":\"record\",\"name\":\"SparkEvent\",\"namespace\":\"com.Saprak\",\"fields\":[{\"name\":\"event_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"value\",\"type\":\"int\"},{\"name\":\"type\",\"type\":\"float\"},{\"name\":\"check\",\"type\":\"boolean\",\"default\":true},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}],\"version\":\"1\"}";
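One thing that may be worth checking, since the schema comes from a schema registry: if the producer used Confluent's Avro serializer (an assumption, the post does not say which serializer wrote the topic), each message value starts with a 5-byte header (one magic byte 0x00 followed by a 4-byte big-endian schema ID) ahead of the Avro body. A decoder expecting plain Avro binary would then read the fields from the wrong offset. The sketch below is plain Java with no Spark dependency; the class name `ConfluentHeader`, its method names, and the sample bytes are all made up for illustration. Note that the magic-byte test is only a heuristic, because a plain Avro record can also begin with a zero byte.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentHeader {
    // Confluent wire format: 1 magic byte (0x00) + 4-byte big-endian schema ID,
    // followed by the Avro-encoded body.
    static final int HEADER_LENGTH = 5;

    /** Heuristic: payload is longer than the header and starts with the 0x00 magic byte. */
    static boolean looksFramed(byte[] payload) {
        return payload != null && payload.length > HEADER_LENGTH && payload[0] == 0;
    }

    /** Reads the schema ID stored in bytes 1..4 (ByteBuffer is big-endian by default). */
    static int schemaId(byte[] payload) {
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    /** Returns a copy of the payload without the 5-byte header. */
    static byte[] stripHeader(byte[] payload) {
        return Arrays.copyOfRange(payload, HEADER_LENGTH, payload.length);
    }

    public static void main(String[] args) {
        // Hypothetical message: magic byte, schema ID 42, then a 2-byte Avro body.
        byte[] sample = {0, 0, 0, 0, 42, 2, 97};
        System.out.println("framed=" + looksFramed(sample));
        System.out.println("schemaId=" + schemaId(sample));
        System.out.println("bodyLength=" + stripHeader(sample).length);
    }
}
```

If a few raw `value` payloads from the topic pass this check, the five header bytes would have to be removed before the remaining bytes are handed to `from_avro`.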