I am doing a POC for consuming Avro messages from Kafka and deserializing them with io.confluent.kafka.serializers.KafkaAvroDeserializer and a Schema Registry URL.
Here is the consumer config:
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, property.getProperty("bootstrap.servers"))
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer])
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
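The deserializer also needs the Schema Registry endpoint in the config; a minimal sketch, assuming the URL is kept in the same properties file under a key like "schema.registry.url" (the key name is illustrative):

```scala
// Assumed property key; KafkaAvroDeserializer requires schema.registry.url
props.put("schema.registry.url", property.getProperty("schema.registry.url"))
```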
I want to parse the JSON output into a file or a DataFrame. Since I am reading from multiple topics, I want to create a different file or DataFrame for each topic.
Consumer code:
println("Creating Kafka consumer!!!")
val consumer = new KafkaConsumer(props)
// split the property value, assuming comma-separated topic names
val topics: List[String] = property.getProperty("kafka.consumer.topic.names").split(",").toList
consumer.subscribe(topics.asJava)
val records = consumer.poll(Duration.ofMinutes(4)).asScala
//val samplelist = records.toList
try {
  var sampleSet: GenericRecord = null
  //while (true) {
  for (record <- records) {
    println(s"*****************************")
    println(s"${record.value()} ")
    //sampleSet = record.value()
    /*val writer = new BufferedWriter(new FileWriter(new File("s.json"), true))
    writer.write(record.value().toString)
    writer.close()*/
  }
  //}
} finally consumer.close()
Output:
***********************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
************************************
(the same record is printed for each remaining message; a record from a different table follows)
**************************
{"HML_C_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
I used a FileWriter to write the records into a .json file, but I am not confident whether this approach is correct.
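A minimal sketch of how the file-per-topic idea could look, assuming one JSON-lines file named after each topic (the helper name and file naming are illustrative, not a settled design):

```scala
import java.io.{BufferedWriter, File, FileWriter}

// Illustrative helper: append one record's JSON to a file named after its topic.
def appendToTopicFile(topic: String, json: String): Unit = {
  val writer = new BufferedWriter(new FileWriter(new File(s"$topic.json"), true))
  try writer.write(json + System.lineSeparator())
  finally writer.close()
}

// Inside the poll loop, record.topic() selects the target file:
// for (record <- records) appendToTopicFile(record.topic(), record.value().toString)
```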
I want to store it in multiple DataFrames, like:

HML_A_DATA : DataFrame =

| HML_ID | CREAT_DATETIME | ... | COV_ORDR_TYP_ID | COB_STRT_DT |
|---|---|---|---|---|
| \u0002cf· | 1269064388106000 | ... | P | null |
| \u0002cf· | 1269064388106000 | ... | P | null |
| ... | ... | ... | ... | ... |
for different topics that I am consuming from.
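For the DataFrame route, a hedged sketch of what I have in mind, assuming an existing SparkSession and that grouping the polled records by record.topic() is acceptable (spark.read.json over a Dataset[String] is available in Spark 2.4):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("avro-poc").master("local[*]").getOrCreate()
import spark.implicits._

// Group the polled ConsumerRecords by topic, keeping each value as a JSON string
val byTopic: Map[String, Seq[String]] =
  records.toSeq.groupBy(_.topic()).mapValues(_.map(_.value().toString))

// One DataFrame per topic, keyed by topic name
val dfs: Map[String, DataFrame] =
  byTopic.mapValues(jsons => spark.read.json(spark.createDataset(jsons)))
```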
Spark version: 2.4.4, Scala version: 2.11.12