
I'm currently using consumer groups to read messages from Kafka. I have noticed, however, that if my consumer goes down and I bring it back up again, it does not resume consuming messages from where it left off. After reading the documentation here, it seems like I would have to implement this functionality myself. I know there's an auto.offset.reset config for the consumer, but that just seems to let me either consume everything from the beginning or consume only from the latest message currently on the queue. Is my understanding correct that I would have to implement this myself, or am I missing something here? It seems like a pretty basic feature that any queueing system should provide out of the box.

I'm using Kafka 0.8.1.1 with Scala 2.10.

Classified
Vivek Rao

1 Answer


Based on the link, you're trying to use SimpleConsumer. With SimpleConsumer you need to take care of low-level details, such as managing offsets, yourself. It is more difficult, but it gives you more control over how data is consumed.

If all you want is to read data without worrying much about low-level details, take a look at the high-level consumer: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Denis Makarenko
  • Thanks for the link, Denis! That is exactly what I'm using, though. Consider this scenario: 1. Shut down the consumer. 2. Add more messages to the queue. 3. Restart the consumer. The consumer does not consume the messages that were added in step 2. Instead, it waits for new messages only. – Vivek Rao Oct 13 '14 at 22:05
  • If you're using the high-level consumer, check that either auto.commit.enable = true and auto.commit.interval.ms is set to some reasonable value, or that your code calls commitOffsets() explicitly. If autocommit is enabled and your consumer exits before offsets are successfully stored in ZooKeeper, you can run into a situation like the one you described, i.e. reading starts from the beginning. – Denis Makarenko Oct 13 '14 at 22:16
  • Thanks for the suggestion! I will try it and update on the results. – Vivek Rao Oct 14 '14 at 14:47
  • Should that also work if I start the producer first? I produced a bunch of messages, then ran my consumer. It seems like the consumer waits for the next message by default. The behavior I want is: regardless of when the producer is started, if the consumer is starting for the first time (no offsets in ZooKeeper), start from the beginning; if there are offsets, continue from the stored offset. Is that possible to do through config? – Vivek Rao Oct 16 '14 at 13:55
  • Ahh, it's a combination of the configs that gets it to work. Setting auto.offset.reset = smallest starts your consumer at the beginning of the queue if ZooKeeper has no offsets for it, and setting auto.commit.enable = true makes the consumer commit its offsets from then on, so a restart resumes from the last committed offset. – Vivek Rao Oct 16 '14 at 15:31
  • For SimpleConsumer there is a method that lets you explicitly retrieve the available offsets: getOffsetsBefore(OffsetRequest request). OffsetRequest.time can be one of the following: OffsetRequest.LatestTime, for reading only new messages; OffsetRequest.EarliestTime, for reading from the earliest available message; and a third option, for reading from a specific offset. – Denis Makarenko Oct 17 '14 at 17:46
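
For anyone landing here later, the combination of settings worked out in the comments can be sketched roughly as below. This is only an illustration, not code from the original posters: the class name, the helper names, the ZooKeeper address, the group id and the 1000 ms interval are all made up for the example. The property keys are the 0.8.x high-level consumer ones (zookeeper.connect, group.id, auto.commit.enable, auto.commit.interval.ms, auto.offset.reset). The startOffset method is a simplified model of the behavior described above, not Kafka's actual implementation, and it ignores the case where a committed offset is out of range.

```java
import java.util.OptionalLong;
import java.util.Properties;

public class ConsumerResumeSketch {

    // Builds the settings discussed in the comments for the 0.8.x high-level consumer.
    // In a real application these Properties would be handed to kafka.consumer.ConsumerConfig;
    // that step is left out so the sketch runs without Kafka on the classpath.
    static Properties consumerProps(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);   // hypothetical address supplied by the caller
        props.put("group.id", groupId);              // offsets are stored per consumer group
        props.put("auto.commit.enable", "true");     // periodically store offsets in ZooKeeper
        props.put("auto.commit.interval.ms", "1000"); // assumed value; tune for your workload
        props.put("auto.offset.reset", "smallest");  // no stored offset: start from the earliest message
        return props;
    }

    // Simplified model (an assumption, not Kafka code) of where a consumer starts reading:
    // a committed offset wins; otherwise the reset policy picks earliest or latest.
    static long startOffset(OptionalLong committed, String resetPolicy, long earliest, long latest) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return "smallest".equals(resetPolicy) ? earliest : latest;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:2181", "demo-group");
        String policy = props.getProperty("auto.offset.reset");

        // First start, nothing committed yet: the reset policy decides.
        System.out.println("first start: " + startOffset(OptionalLong.empty(), policy, 0L, 42L));
        // Restart after offset 17 was committed: resume from there.
        System.out.println("restart:     " + startOffset(OptionalLong.of(17L), policy, 0L, 42L));
    }
}
```

With the default reset policy (largest) and no committed offset, the same model picks the latest offset, which matches the "waits for new messages only" behavior described in the question.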