2

I have a Kafka consumer which reads messages from a topic and writes it in to a hive table using spark. When I run the code on the Yarn, it keeps reading the same messages multiple times. I have around 100,000 messages in the topic. But, my consumer keeps on reading the same multiple times. I get the actual count when I do a distinct.

Here is the code that I have written. I wonder if I am missing any setting.

 val spark = SparkSession.builder()
      .appName("Kafka Consumer")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    val kafkaConsumerProperty = new Properties()
    kafkaConsumerProperty.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "---")
    kafkaConsumerProperty.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaConsumerProperty.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaConsumerProperty.put(ConsumerConfig.GROUP_ID_CONFIG, "draw_attributes")
    kafkaConsumerProperty.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    kafkaConsumerProperty.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    val topic = "space_orchestrator"
    val kafkaConsumer = new KafkaConsumer[String,String](kafkaConsumerProperty)
    kafkaConsumer.subscribe(Collections.singletonList(topic))
    
    while(true){

      val recordSeq = kafkaConsumer.poll(10000).toSeq.map( x => x.value())
      if(!recordSeq.isEmpty)
        {
          val newDf = spark.read.json(recordSeq.toDS)
          newDf.write.mode(SaveMode.Overwrite).saveAsTable("dmart_dev.draw_attributes")
        }
    }
yAsH
  • 3,367
  • 8
  • 36
  • 67
  • Out of interest, how quickly does your consumer read the 100,000 messages? If it's less than the auto-commit interval (default 5 seconds, IIRC) and you stop polling before the 5 seconds has elapsed without explicitly closing the consumer, the offsets will never commit (see answer to https://stackoverflow.com/questions/38230862/need-clarification-about-kafka-auto-commit-and-auto-commit-interval-ms) – Levi Ramsey Jul 22 '20 at 04:08
  • That behavior is one of many reasons I always manually commit offsets. – Levi Ramsey Jul 22 '20 at 04:09
  • @LeviRamsey: I tried manual commit. But, I am still seeing duplicate records. – yAsH Jul 22 '20 at 17:52
  • When you say duplicates, do you mean reading the same offset twice or the same message at multiple offsets. In general, Kafka doesn't protect against the latter (what protection it does provide often hurts performance too much to be truly useful). – Levi Ramsey Jul 22 '20 at 18:46
  • I think it is reading the same offset twice. – yAsH Jul 22 '20 at 20:25
  • You can determine that with `kafkaConsumer.poll(100000).toSeq.map(x => (x.partition -> x.offset) -> x.value)`, something like `recordSeq.foreach { case ((partition, offset), _) => println(s"$partition:$offset") }` and `val newDf = spark.read.json(recordSeq.map(_._2).toDS)` – Levi Ramsey Jul 22 '20 at 23:29

1 Answers1

1

As an alternative, try setting offsets manually. For this, auto commit should be disabled (enable.auto.commit = false). For manual committing KafkaConsumers offers two methods, namely commitSync() and commitAsync(). As the name indicates, commitSync() is a blocking call, that does return after offsets got committed successfully, while commitAsync() returns immediately.

user8363477
  • 655
  • 4
  • 14
  • 24