I create a new consumer with this configuration:
c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": addresses,
    "group.id":          "my_group",
    "auto.offset.reset": "earliest",
})
if err != nil {
    panic(err)
}
topic := "testTopic"
if err = c.SubscribeTopics([]string{topic}, nil); err != nil {
    panic(err)
}
Then I produce two events with the following code and consume only one of them:
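events := []map[string]string{
    {
        "name": "Foo",
    },
    {
        "name": "Bar",
    },
}
// p.ProduceEvent is my own wrapper around the producer.
if err = p.ProduceEvent(events[0]); err != nil {
    panic(err)
}
if err = p.ProduceEvent(events[1]); err != nil {
    panic(err)
}
// Read exactly one of the two produced messages.
res, err := c.ReadMessage(100 * time.Second)
if err != nil {
    panic(err)
}
time.Sleep(20 * time.Second)
c.Close()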
When I describe the group with
watch /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group --describe
the results at each step are:
I can't understand why the lag is zero at the end! I consumed only one event. It seems weird to me that Close
would change the committed offset. Any clue is appreciated.