I'm new to Kafka and have been trying to implement a consumer. Below is my scenario:
- Start the consumer application.
- Produce messages from the producer; these messages are consumed by the consumer.
- Stop the consumer and produce messages again. When I restart the consumer, the messages that were published while it was stopped are not read.
Although setting auto.offset.reset=earliest
will consume messages, it consumes every message ever published to the topic. I want to consume only the messages that were published while the consumer was down.
var options = new KafkaOptions(new Uri(kafkaUri));
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions(kafkaTopic, router));

// Fetch the latest offset of each partition and start consuming from there,
// so everything already in the topic is skipped.
var offset = consumer.GetTopicOffsetAsync(kafkaTopic, 100000).Result;
var t = from x in offset select new OffsetPosition(x.PartitionId, x.Offsets.Max());
consumer.SetOffsetPosition(t.ToArray());

foreach (Message msg in consumer.Consume())
{
    string kafkaResponse = System.Text.Encoding.UTF8.GetString(msg.Value);
    Console.WriteLine("PickList Json : " + kafkaResponse);

    // Commit the offset of each consumed message for the consumer group.
    // offsetCommitRequest and _kafkaPublishService are defined elsewhere in the class.
    offsetCommitRequest.Offset = msg.Meta.Offset;
    offsetCommitRequest.PartitionId = msg.Meta.PartitionId;
    offsetCommitRequest.Topic = kafkaTopic;
    offsetCommitRequest.Metadata = "CommitOffset";
    var offsetCommitResponse = await _kafkaPublishService.SetOffsetvalue(kafkaUri, kafkaTopic, consumerGroup, offsetCommitRequest);
}
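
For what it's worth, "resume from where the consumer left off, including messages produced while it was down" is exactly what a consumer group with committed offsets gives you: on startup the broker hands back the last committed offset, and auto.offset.reset only applies when no committed offset exists yet. Below is a minimal sketch of that approach using the Confluent.Kafka client rather than the kafka-net library in my code above; the bootstrap server, group id, and topic name are placeholders:

```csharp
using System;
using Confluent.Kafka;

class MissedMessagesConsumer
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",   // placeholder broker address
            GroupId = "picklist-consumer-group",   // placeholder group id
            // Latest: a brand-new group starts at the end of the topic;
            // on every later restart the group resumes from its committed offset,
            // so only messages produced while the consumer was down are read.
            AutoOffsetReset = AutoOffsetReset.Latest,
            EnableAutoCommit = false               // commit manually after processing
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("picklist-topic");      // placeholder topic name

        while (true)
        {
            var result = consumer.Consume();
            Console.WriteLine("PickList Json : " + result.Message.Value);
            consumer.Commit(result);               // record progress for the group
        }
    }
}
```

With this setup there is no need to compute and set start offsets by hand; the broker stores the committed offsets per group and partition, and the client seeks to them automatically on restart.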