I have a Kafka consumer that reads messages from a topic and writes them into a Hive table using Spark. When I run the code on YARN, it keeps reading the same messages multiple times. There are around 100,000 messages in the topic, but the consumer keeps re-reading the same ones, so the row count in the table ends up inflated; I only get the actual count when I do a distinct (I show the check I use after the code).
Here is the code I have written. I wonder if I am missing a setting.
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.JavaConverters._

val spark = SparkSession.builder()
  .appName("Kafka Consumer")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

// Plain Kafka consumer (not Spark structured streaming)
val kafkaConsumerProperty = new Properties()
kafkaConsumerProperty.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "---")
kafkaConsumerProperty.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
kafkaConsumerProperty.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
kafkaConsumerProperty.put(ConsumerConfig.GROUP_ID_CONFIG, "draw_attributes")
kafkaConsumerProperty.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
kafkaConsumerProperty.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

val topic = "space_orchestrator"
val kafkaConsumer = new KafkaConsumer[String, String](kafkaConsumerProperty)
kafkaConsumer.subscribe(Collections.singletonList(topic))

// Poll in a loop and write every non-empty batch to the Hive table
while (true) {
  val recordSeq = kafkaConsumer.poll(10000).asScala.toSeq.map(_.value())
  if (recordSeq.nonEmpty) {
    val newDf = spark.read.json(recordSeq.toDS)
    newDf.write.mode(SaveMode.Overwrite).saveAsTable("dmart_dev.draw_attributes")
  }
}
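The distinct check I mentioned is roughly this, just a minimal sketch run against the same Spark session and the table written above:

// Sketch of the count check: compare raw row count with distinct row count
// in the target Hive table written by the loop above.
val df = spark.table("dmart_dev.draw_attributes")
val total = df.count()              // inflated by the repeated reads
val unique = df.distinct().count()  // roughly matches the ~100,000 messages in the topic
println(s"total=$total, distinct=$unique")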