-1

I'm new to kafka and have been trying to implement a consumer. Below is my scenario

  1. start consumer application
  2. produce messages from producer. these messages are consumed by consumer
  3. stop the consumer and produce messages again. when I start the consumer, the messages that were published while the consumer was stopped are not being read

Although auto.offset.commit=earliest will consume messages, it consumes all the messages published to the topic. I want to consume only those messages that were published when the consumer was down.

var options = new KafkaOptions(new Uri(kafkaUri));
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions(kafkaTopic, router));
var offset = consumer.GetTopicOffsetAsync(kafkaTopic, 100000).Result;
var t = from x in offset select new OffsetPosition(x.PartitionId, x.Offsets.Max());
consumer.SetOffsetPosition(t.ToArray());

foreach (Message msg in consumer.Consume()) {
    string kafkaResponse = System.Text.Encoding.UTF8.GetString(msg.Value);

    Console.WriteLine("PickList Json : " + kafkaResponse);

    offsetCommitRequest.Offset = msg.Meta.Offset;
    offsetCommitRequest.PartitionId = msg.Meta.PartitionId;
    offsetCommitRequest.Topic = kafkaTopic;
    offsetCommitRequest.Metadata = "CommitOffset";
    var offsetCommitResponse = await _kafkaPublishService.SetOffsetvalue(kafkaUri, kafkaTopic, consumerGroup, offsetCommitRequest);
}
Mickael Maison
  • 25,067
  • 7
  • 71
  • 68
  • Possible duplicate of [Not clear about the meaning of auto.offset.reset and enable.auto.commit in Kafka](https://stackoverflow.com/questions/44927687/not-clear-about-the-meaning-of-auto-offset-reset-and-enable-auto-commit-in-kafka) – Praveen Rewar Aug 14 '19 at 08:40
  • @PraveenRewar, the link doesnt answer my question. I want to read records that was published during the time when the consumer was down and subsequently restarted. My auto.offset.commit is set to false since I manually commit after processing the message. – sunil kumar Aug 14 '19 at 08:52
  • I don't understand how the consumer restart affects the situation here. Kafka will remember the last message that your consumer read, now even if your consumer goes down and more messages got published in the topic Kafka will still store the offset from where the messages are supposed to be consumed by a specific consumer. – Praveen Rewar Aug 14 '19 at 08:56

1 Answers1

0

You should just restart your consumer without setting any manual offset. Means you should not do that :

var offset = consumer.GetTopicOffsetAsync(kafkaTopic, 100000).Result;
var t = from x in offset select new OffsetPosition(x.PartitionId, x.Offsets.Max());
consumer.SetOffsetPosition(t.ToArray());

And your consumer will catch up from where it has last committed

Yannick

Yannick
  • 1,240
  • 2
  • 13
  • 25