3

I have created a Kafka consumer in Apache Flink API written in Scala. Whenever I pass some messages from a topic, it duly is receiving them. However, when I restart the consumer, instead of receiving the new or unconsumed messages, it consumes the latest message that was sent to that topic.

Here's what I am doing:

  1. Running the producer:

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
    
  2. Running the consumer:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
        .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
    env.enableCheckpointing(5000)
    st.print()
    env.execute()
    
  3. Passing some messages

  4. Stopping the consumer
  5. Running the consumer again prints the last message I sent. I want it to print only new messages.
Piyush Shrivastava
  • 1,046
  • 2
  • 16
  • 43

1 Answers1

6

You are running a Kafka consumer with a checkpoint interval of 5 seconds. So every 5 seconds, Flink is creating a copy of your operator's state (the offsets) for recovery.

Once the checkpoint is completed, it will let the operator know that the checkpoint is finished. On that notification, the Kafka consumer commits the offsets to Zookeeper. So roughly every 5 seconds, we are writing the offsets of the last checkpoint into ZK.

When you start the Flink job again, it will find the offsets in ZK and go on from there. Depending on the timing, all messages received after the commit to ZK will be send again.

You can not avoid this behavior because the .print() "operator" is not part of the checkpointing. Its meant as a debugging utility. However a data sink which participates in the checkpointing (for example the rolling file sink) will ensure that no duplicates are written to the file system.

Robert Metzger
  • 4,452
  • 23
  • 50
  • could you tell the path in zookeeper where offset information will be stored? – vishnu viswanath Jun 22 '16 at 18:35
  • Check out this page: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper – Robert Metzger Jun 23 '16 at 08:54
  • thanks @rmetzger, i ran a flink job and was expecting the offsets to be stored in the consumers path in the zookeeper, but under the consumers, i could only see console-consumer-65334. I am not sure what is going on, could you give some suggestion why this could happen? – vishnu viswanath Jun 23 '16 at 18:04
  • Are you using Kafka 0.8 or 0.9? 0.9 isn't committing to ZK by default. Do you have checkpointing enabled? If no, we only commit the offset with an interval of 60 seconds. So if you cancelled the job earlier, you won't see it in ZK. – Robert Metzger Jun 23 '16 at 20:21
  • i am using 0.9, i don't have checkpointing enabled yet, but I did run the job for more than 20 minutes once. – vishnu viswanath Jun 23 '16 at 20:22
  • Okay, so Kafka won't write into ZK at all. – Robert Metzger Jun 24 '16 at 08:47
  • Ok. Thank you. I will enable checkpointing and test. But I thought you said even if checkpointing is not enabled the offset is committed every 60 seconds and I didn't find any offsets even after running > 60s. – vishnu viswanath Jun 27 '16 at 16:50
  • I think the answer does not fix the problem. I have the same problem (same message gets consumed after start up), even though I have checkpoints enabled. I'm using Kafka 0.10 and Flink only knows about my Kafka broker (new client) and not ZK, if that matters. I'm using the BucketingSink, same problem with RollingSink. – static-max Sep 13 '16 at 16:29