1

I am doin a POC for consuming Avro messages from kafka and deserializing it using io.confluent.kafka.serializers.KafkaAvroDeserializer with schema registry URL.

Here is the consumer config,

props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, property.getProperty("bootstrap.servers"))
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer])
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

I want to parse the Json output into a File or a dataframe. I am reading from multiple topics so I for each topic I want to create a different file or dataframe.

Consumer code:

println("Creating Kafka consumer!!!")
val consumer = new KafkaConsumer(props)
val topics: List[String] = List(property.getProperty("kafka.consumer.topic.names"))

consumer.subscribe(topics.asJava)

  
val records = consumer.poll(Duration.ofMinutes(4)).asScala
//val samplelist = records.toList
    
try {
  var sampleSet : GenericRecord = null
  //while(true){
      for(record <- records){
        println(s"*****************************")
        println(s"${record.value()} ")
        //sampleSet = record.value()
        /*val writer = new BufferedWriter(new FileWriter(new File("s.json"),true))
        writer.write(record.value().toString)
        writer.close()*/
        
      }
    //}
}
finally consumer.close()

Output:

***********************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
************************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
****************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
*******************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
**************************
{"HML_C_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}

I used file writer to write it into a .json file, but I am not very confident whether this approach is correct or wrong.

I want to store it in multiple dataframes like

HML_A_DATA : Dataframe =

HML_ID CREATE_DATETIME ----------- COV_ORDR_TYP_ID COB_STRT_DT
\u0002cf· 1269064388106000 ---------- P null
\u0002cf· 1269064388106000 ---------- P null
---- ---------- -------- ------ --------

for different topics that I am consuming from.

spark Version = 2.4.4 scala Version = 2.11.12

Buzz97
  • 33
  • 5
  • Your requirement is bit unclear. Kindly validate and let us know the end result how it should be achieved for your business case. Your subject says AVRO deserialization but your concern is about File I/O operation. Kindly clarify. But you are using generic record correctly (instead of a specific code binding type) , because you are reading from multiple topics. – ChristDist Jul 06 '22 at 09:01
  • Hi @ChristDist, I have updated the subject line. As far as the requirement, I have successfully parsed the avro messages but, I want to store the parsed Avro messages in a dataframe. – Buzz97 Jul 06 '22 at 10:50
  • Have you looked at [this question](https://stackoverflow.com/questions/39049648/use-schema-to-convert-avro-messages-with-spark-to-dataframe)? – sierikov Jul 06 '22 at 19:38
  • Unclear what you mean by "dataframe" here since you're not using Spark., but also you could just use `kafka-avro-console-consumer... >> file.json` to deserialize Avro into json records, then write the output to a file – OneCricketeer Jul 15 '22 at 14:27
  • I had initially gone with the approach you suggested, but requirement is to read it as a dataframe. One more Update I can not use the Structured and Direct Streams to read from kafka as a Dataframe or RDD, as we are using group.id Authentication. I will add the scala and spark versions I am using. – Buzz97 Jul 18 '22 at 10:00

0 Answers0