
My topic has many partitions, many producers, but only one consumer. The consumer can run only for a short period of time, let's say one minute. During this period, I need to make sure it will consume all the records from all the partitions that were produced before the consumer was initialized, but ALSO the records produced during the minute the consumer was running.

The problem is that I can't find a partition assignment strategy that guarantees I will get all the partitions. If I use consumer.Subscribe(topic), I only get some partitions, not all. If I use consumer.Assign(partition) during initialization, I WILL get all the partitions that exist at that point, but if a new partition is added later and receives records, I will miss those.

The only solution I have so far is to re-do the assignments periodically (every 10 seconds).
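For reference, here is roughly what that workaround looks like with Confluent.Kafka, as a sketch only: the topic name, broker address, group id and the 10-second/one-minute intervals below are placeholders.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

class PeriodicAssignDemo
{
    const string Topic = "my-topic";          // placeholder topic name
    const string Broker = "localhost:9092";   // placeholder bootstrap servers

    static void Main()
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(new ConsumerConfig
        {
            BootstrapServers = Broker,
            GroupId = "one-shot-reader",            // required by the client even when assigning manually
            AutoOffsetReset = AutoOffsetReset.Earliest
        }).Build();

        using var admin = new AdminClientBuilder(new AdminClientConfig
        {
            BootstrapServers = Broker
        }).Build();

        var assigned = new HashSet<TopicPartition>();
        var deadline = DateTime.UtcNow.AddMinutes(1);     // the one-minute consumption window
        var nextRefresh = DateTime.MinValue;

        while (DateTime.UtcNow < deadline)
        {
            // Every 10 seconds, re-read topic metadata and pick up any partitions added since.
            if (DateTime.UtcNow >= nextRefresh)
            {
                var meta = admin.GetMetadata(Topic, TimeSpan.FromSeconds(5));
                var current = meta.Topics
                    .Single(t => t.Topic == Topic)
                    .Partitions
                    .Select(p => new TopicPartition(Topic, p.PartitionId))
                    .ToList();

                var added = current.Where(tp => !assigned.Contains(tp)).ToList();
                if (added.Count > 0)
                {
                    // Assign() replaces the whole assignment, so carry over the current
                    // position of partitions we already own and start new ones at the beginning.
                    var offsets = assigned
                        .Select(tp =>
                        {
                            var pos = consumer.Position(tp);
                            return new TopicPartitionOffset(
                                tp, pos.Value == Offset.Unset.Value ? Offset.Beginning : pos);
                        })
                        .Concat(added.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning)))
                        .ToList();
                    consumer.Assign(offsets);
                    assigned.UnionWith(added);
                }
                nextRefresh = DateTime.UtcNow.AddSeconds(10);
            }

            var result = consumer.Consume(TimeSpan.FromMilliseconds(500));
            if (result != null)
                Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
        }

        consumer.Close();
    }
}
```

The metadata query is what picks up partitions created while the consumer is running; since re-assigning replaces the whole assignment set, the existing positions are carried over explicitly.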

1 Answer


> If I use consumer.Subscribe(topic), I will only get some partitions, not all

It should get all of them. If it doesn't, you most likely have an un-closed consumer instance in the same consumer group that has already been assigned the other partitions.

You can periodically run the `kafka-consumer-groups --describe` command to inspect this.

Using assignment bypasses the consumer group protocol, which is why that approach works.


There is no guarantee Kafka can provide that your consumer will read any/all data between two points in time. You'd need to track this on your own, and it may require your consumer instance to run for longer than you expect.
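One way to track it yourself is to snapshot the watermark offsets of the assigned partitions and keep polling until your position has reached every snapshotted high watermark. Here is a minimal sketch with Confluent.Kafka: `QueryWatermarkOffsets`, `Position` and `Assignment` are real client members, while the helper names and the 5-second timeout are placeholders.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

static class CatchUpCheck
{
    // Snapshot the low/high watermark offsets of every currently assigned partition.
    public static Dictionary<TopicPartition, WatermarkOffsets> SnapshotWatermarks(
        IConsumer<Ignore, string> consumer) =>
        consumer.Assignment.ToDictionary(
            tp => tp,
            tp => consumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(5)));

    // True once the consumer's position has reached the snapshotted high watermark on
    // every partition; partitions that were empty at snapshot time count as done.
    public static bool CaughtUp(
        IConsumer<Ignore, string> consumer,
        Dictionary<TopicPartition, WatermarkOffsets> targets) =>
        targets.All(kv =>
        {
            if (kv.Value.High.Value == kv.Value.Low.Value)
                return true;                          // nothing was there to read
            var pos = consumer.Position(kv.Key);      // Unset (-1001) until the first consume
            return pos.Value >= kv.Value.High.Value;
        });
}
```

At the end of your one-minute window you'd take a fresh snapshot and keep consuming until `CaughtUp` returns true, which is exactly the case where the consumer may need to run a bit longer than the minute.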

OneCricketeer
  • I ran kafka-consumer-groups --describe and noticed many consumer ids associated with my topic, each with a different partition, which seems to be consistent with your prediction. However, I don't really understand how this is possible, since I'm the only one consuming this topic; presumably each time I relaunch my application a different consumer id is created. I wonder how I can prevent this from happening, i.e. the part where my consumer is left un-closed. – Utilitaire CCV Jul 21 '22 at 18:46
  • You can hard-code a `client.id` in your consumer config. But otherwise, you might want to consider adding `Runtime.addShutdownHook` to call `consumer.close()` assuming you are using Java, or maybe a [ProcessExit handle](https://stackoverflow.com/a/11291396/2308683) in dotnet (see the sketch after these comments) – OneCricketeer Jul 21 '22 at 18:49
  • Other option is to `kafka-consumer-groups --reset-offsets` (or use a new group ID), then work on figuring out when you should be closing the consumer instance when you're done with it in the app – OneCricketeer Jul 21 '22 at 18:54
  • I didn't know about the client.id. Thank you, I think it should solve my problem! – Utilitaire CCV Jul 21 '22 at 19:32
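A minimal dotnet sketch of that suggestion, assuming Confluent.Kafka; `BuildConsumer`, the topic, broker address and group id are placeholders.

```csharp
using System;
using Confluent.Kafka;

class Program
{
    static void Main()
    {
        IConsumer<Ignore, string> consumer = BuildConsumer();

        var closed = false;
        void CloseOnce()
        {
            if (closed) return;
            closed = true;
            consumer.Close();   // leaves the group cleanly so its partitions are released
        }

        // Close on normal process exit ...
        AppDomain.CurrentDomain.ProcessExit += (s, e) => CloseOnce();

        // ... and on Ctrl+C, so the broker doesn't keep a dead member's partitions
        // assigned until session.timeout.ms expires.
        Console.CancelKeyPress += (s, e) =>
        {
            CloseOnce();
            e.Cancel = false;   // let the process terminate after cleanup
        };

        consumer.Subscribe("my-topic");   // placeholder topic name
        // ... normal consume loop here ...
    }

    // Placeholder for however the real application builds its consumer.
    static IConsumer<Ignore, string> BuildConsumer() =>
        new ConsumerBuilder<Ignore, string>(new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",   // placeholder broker
            GroupId = "one-shot-reader"            // placeholder group id
        }).Build();
}
```

Closing the consumer makes it leave the group immediately, so a relaunched instance can be assigned all partitions instead of competing with stale members that the broker hasn't timed out yet.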