
I am reading from a Kafka topic which has data in Avro format; below is my code:

 Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "<IP>:9092")
                .option("subscribe", "avro-data")
                .option("auto.offset.reset", "latest")
                .option("checkpointLocation", "/tmp")
                .load();

  Dataset<Row> ds2 =  df.select( from_avro(col("value"), schema).as("rows"),
                col("timestamp"));
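
A likely cause of the symptom below (an assumption on my part, not confirmed in the thread): if the producer used Confluent's `KafkaAvroSerializer`, each message body is not raw Avro — it starts with a 5-byte header (magic byte `0x00` plus a 4-byte big-endian schema ID), followed by the Avro binary. Spark's `from_avro` expects raw Avro, so it decodes the header bytes as field data, which yields exactly this pattern of empty strings, zeros, and nonsense floats. A minimal plain-Java sketch of that framing (no Spark; `stripHeader`/`schemaId` are illustrative helper names, not a real API):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentWireFormat {
    // Confluent wire format: [0x00 magic][4-byte big-endian schema id][Avro body]
    static final int HEADER_LEN = 5;

    // Illustrative helper: strip the framing so a plain Avro decoder can read the body.
    static byte[] stripHeader(byte[] framed) {
        if (framed.length < HEADER_LEN || framed[0] != 0x00) {
            throw new IllegalArgumentException("not Confluent-framed Avro");
        }
        return Arrays.copyOfRange(framed, HEADER_LEN, framed.length);
    }

    // Illustrative helper: read the schema registry id out of the header.
    static int schemaId(byte[] framed) {
        return ByteBuffer.wrap(framed, 1, 4).getInt(); // ByteBuffer is big-endian by default
    }

    public static void main(String[] args) {
        byte[] avroBody = {0x06, 'f', 'o', 'o'};        // e.g. the Avro encoding of the string "foo"
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LEN + avroBody.length);
        buf.put((byte) 0x00).putInt(42).put(avroBody);  // frame it with schema id 42
        byte[] framed = buf.array();

        System.out.println(schemaId(framed));                               // 42
        System.out.println(Arrays.equals(stripHeader(framed), avroBody));   // true
    }
}
```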

// Creating a DataFrame with Timestamp 

   ds2.createOrReplaceTempView("ds2");
   Dataset<Row> ds3 = spark.sql("select rows.* , timestamp from ds2 ");

// Creating a Table

   ds3.createOrReplaceTempView("table");
   Dataset<Row> result = spark.sql("select * from table");



       

       StreamingQuery query = result
                .writeStream()
                .queryName("Test query")
                .outputMode(OutputMode.Update())
                .format("console")
                .start();

I can see the data on the Kafka side, but in Spark every value comes through as zero, null, or empty. Below is my output:

+--------+-----+-------------+-----+----+--------------------+
|event_id|value|         type|check|name|           timestamp|
+--------+-----+-------------+-----+----+--------------------+
|        |    0|2.6912392E-37|false|   |2022-02-01 15:30:...|
|        |    0|2.6912392E-37|false|   |2022-02-01 15:30:...|
|        |    0|2.6912392E-37|false|   |2022-02-01 15:30:...|
|        |    0|2.6912392E-37|false|   |2022-02-01 15:30:...|
|        |    0|2.6912392E-37|false|   |2022-02-01 15:30:...|
+--------+-----+-------------+-----+----+--------------------+

Am I doing anything wrong here? I can't figure out why the values are coming back empty.

Below is the schema returned from the Schema Registry:

 String schema = "{\"type\":\"record\",\"name\":\"SparkEvent\",\"namespace\":\"com.Saprak\",\"fields\":[{\"name\":\"event_id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"value\",\"type\":\"int\"},{\"name\":\"type\",\"type\":\"float\"},{\"name\":\"check\",\"type\":\"boolean\",\"default\":true},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}],\"version\":\"1\"}";
Marcin_S
  • It's null because it's parsed wrong. Where did you define schema? What actually is the Avro schema? How was the data produced? `from_avro` does not work with any Schema Registry – OneCricketeer Feb 01 '22 at 13:43
  • @OneCricketeer I am calling the schema registry to get the schema; I have updated the question with it – Marcin_S Feb 01 '22 at 14:25
  • Does this help? https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry – OneCricketeer Feb 01 '22 at 14:28
  • Tried casting to String; not working in Java. Any recommended method from your side? – Marcin_S Feb 01 '22 at 15:29
  • The data is Avro. Why did you expect strings to work? If you're using the schema registry, the linked post has all the information you should need (and it does not cast anything to strings; it actually uses Confluent's KafkaAvroDeserializer methods). If you don't want to read that link, then go here: https://github.com/AbsaOSS/ABRiS – OneCricketeer Feb 01 '22 at 18:24
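
Besides the linked post and ABRiS (both of which talk to the registry and handle the Confluent framing for you), a lighter-weight workaround sometimes suggested elsewhere — an assumption here, not something this thread states — is to skip the 5-byte header before `from_avro`, e.g. `from_avro(expr("substring(value, 6, length(value)-5)"), schema)`. Spark SQL's `substring` is 1-based and accepts binary input, so position 6 is the first real Avro byte; note this only works if every message on the topic uses the same schema. A plain-Java sketch of that offset arithmetic (`substring1Based` is an illustrative name, not a real API):

```java
import java.util.Arrays;

public class SubstringOffset {
    // Mimics Spark SQL's 1-based substring(value, pos, len) on a byte array:
    // skipping a 5-byte Confluent header means starting at position 6.
    static byte[] substring1Based(byte[] value, int pos, int len) {
        return Arrays.copyOfRange(value, pos - 1, pos - 1 + len);
    }

    public static void main(String[] args) {
        // [0x00 magic][schema id 42][Avro body "foo"]
        byte[] framed = {0x00, 0, 0, 0, 42, 0x06, 'f', 'o', 'o'};
        byte[] body = substring1Based(framed, 6, framed.length - 5);
        System.out.println(Arrays.equals(body, new byte[]{0x06, 'f', 'o', 'o'})); // true
    }
}
```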

0 Answers