I have created a Kafka consumer using the Apache Flink API, written in Scala. Whenever I send messages to a topic, the consumer duly receives them. However, when I restart the consumer, instead of receiving only new or unconsumed messages, it consumes the latest message that was sent to that topic again.
Here's what I am doing:
Running the producer:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
Running the consumer:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val env = StreamExecutionEnvironment.getExecutionEnvironment
val st = env
  .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
env.enableCheckpointing(5000)
st.print()
env.execute()
- Passing some messages
- Stopping the consumer
- Running the consumer again prints the last message I sent. I want it to print only new messages.