
Currently I use the AcknowledgingMessageListener to implement a Kafka consumer using spring-kafka. This implementation helps me listen on a specific topic and process messages with a manual ack. I now need to build the following capability: let us assume that, due to some environmental exception or some entry of bad data via this topic, I need to replay data on a topic from and to a specific offset. This would be a manual trigger (mostly via the execution of a Java class).

It would be ideal if I could retrieve the messages between those offsets and feed them to a replay topic so that a new consumer can process those messages, thus keeping the offsets intact on the original topic.

  1. ConsumerSeekAware interface - if this is the answer, how can I trigger this externally? Via, let's say, `mvn -Dexec`? I am not sure if this is even possible.
  2. Also, let's say that I have a crash timestamp with me; is it possible to introspect the topic to find the offset corresponding to the crash so that I can replay from that offset?
  3. Can I find offsets corresponding to some specific data so that I can replay those specific offsets?

All of these requirements are towards building a resilience layer around our Kafka capabilities. I need all of these to be managed by a separate executable class that can be triggered manually, providing the relevant data (like timestamps, etc.). This class should determine the offsets, seek to them, retrieve the corresponding messages, and post them to a separate topic. Can someone please point me in the right direction? I'm afraid I'm going around in circles.

ananth

1 Answer


> so that a new consumer can process those messages thus keeping the offsets intact on the original topic.

Just create a new listener container with a different group id (new consumer) and use a ConsumerAwareRebalanceListener (or ConsumerSeekAware) to perform the seeks when the partitions are assigned.

Here is a sample CARL (ConsumerAwareRebalanceListener) that seeks all assigned partitions based on a timestamp; a sketch of the idea follows.
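
A minimal sketch of that idea, not the linked sample verbatim: a `ConsumerAwareRebalanceListener` that uses `Consumer.offsetsForTimes()` to translate a crash timestamp into per-partition offsets and seeks there. The class name and the way the timestamp is supplied are assumptions; wire it into the replay container via `ContainerProperties.setConsumerRebalanceListener(...)`.

```java
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

// Seeks every assigned partition to the first offset whose record timestamp
// is >= failureTimestamp. The timestamp would be supplied by whatever
// triggers the replay (e.g. a command-line argument).
public class ReplayFromTimestampListener implements ConsumerAwareRebalanceListener {

    private final long failureTimestamp; // epoch millis of the crash

    public ReplayFromTimestampListener(long failureTimestamp) {
        this.failureTimestamp = failureTimestamp;
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Ask the broker which offset corresponds to the timestamp on each partition
        Map<TopicPartition, Long> query = partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> failureTimestamp));
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
        offsets.forEach((tp, oat) -> {
            if (oat != null) { // null when no record exists at/after the timestamp
                consumer.seek(tp, oat.offset());
            }
        });
    }
}
```

Note that `offsetsForTimes()` is also the answer to #2 in the question: it is the broker-side lookup from a timestamp to the corresponding offset.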

You will need some mechanism to know when the new consumer should stop consuming (at which time you can stop() the new container). Maybe set max.poll.records=1 on the new consumer so it doesn't prefetch past the failure point. A sketch of such a stoppable replay consumer follows.
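
A hedged sketch of the stop-and-republish side, assuming a container id of `replayContainer`, a destination topic `replay-topic`, and an end offset passed as a system property (`-Dend.offset=...`); all of those names are illustrative, and the offset check assumes a single-partition topic for simplicity.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class ReplayForwarder {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, String> template;

    // End of the replay range, e.g. passed as -Dend.offset=200 (assumed mechanism).
    // With multiple partitions you would track an end offset per partition.
    private final long endOffset = Long.getLong("end.offset", Long.MAX_VALUE);

    @KafkaListener(id = "replayContainer", topics = "original-topic",
            groupId = "replay-group", autoStartup = "false")
    public void onMessage(ConsumerRecord<String, String> record) {
        if (record.offset() > endOffset) {
            // Stop on a separate thread: stop() waits for the consumer thread
            // to exit, so calling it inline on that same thread would block.
            new Thread(() -> registry.getListenerContainer("replayContainer").stop()).start();
            return;
        }
        // Republish to the replay topic for the downstream consumer
        template.send("replay-topic", record.key(), record.value());
    }
}
```

The manual trigger would then be as simple as `registry.getListenerContainer("replayContainer").start()` after setting the relevant properties; combined with max.poll.records=1, the container stops close to the requested boundary.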

I am not sure what you mean by #3.

Gary Russell
  • 1. Is it possible to run the new class (implementing ConsumerAwareRebalanceListener or ConsumerSeekAware)? Or, when you say a container, is it a separate mvn spring-boot:run? 2. If I know the end offset, is there a way to shut down the container? Also, when I bring up this container, how can I provide it the start offset? 3. On #3 - let us say I want to process only those messages that have a particular ID. So I read messages starting at an offset and only process the messages having this ID. 4. Is the strategy of putting the data into a separate topic correct for actually replaying it? – ananth Nov 25 '18 at 17:14
  • 1. You can do anything you like - separate app or something in the main app. 2. Call `stop()` on the container; you can get a reference to it using its id from the `KafkaListenerEndpointRegistry` - see the reference manual. Just pass the starting offset(s) into the listener before starting it. 3. Yes; there is no random access to a kafka topic/partition. 4. I don't know what you mean by this. – Gary Russell Nov 25 '18 at 19:17
  • On that last bit - my actual strategy for replay is for this new consumer group to read off of the original topic, retrieve the messages and then put that into a new topic which will then be separately consumed. I was hoping that this would keep the original offsets intact, or am I missing something? – ananth Nov 25 '18 at 19:22
  • Thank you for all the patient answers, I will try out an implementation and let you know. Hope I can hit you up if I run into any troubles, I find this whole thing a little daunting :( – ananth Nov 25 '18 at 19:23
  • I am not sure what you mean by "keep the original offsets intact" - the new consumer won't affect the offsets of the main consumer. You don't have to do any of this yourself; starting with version 2.2 you can use a `SeekToCurrentErrorHandler` together with a `DeadLetterPublishingRecoverer` and the framework will automatically move a record that keeps failing to another topic. See [the reference manual](https://docs.spring.io/spring-kafka/reference/html/_reference.html#dead-letters). (A minimal configuration sketch follows this thread.) – Gary Russell Nov 25 '18 at 19:53
  • Unfortunately the errors are known only after an external issue has rendered that ingestion as invalid. Say I find out, post facto to processing, that records from offset x to y need reprocessing (for some business reason too). These are not meant to move to a DLQ at the time of processing; this is more a reactive safeguard being built in. I get the offsets point you mention - they are tied to the consumer and won't be affected. Will try out your suggestions and let you know. Thanks for the help :) – ananth Nov 25 '18 at 23:10
  • I am unclear as to where we can call the seek from. I save the callback from the registerSeekCallback method into a ThreadLocal. The consumer implements ConsumerSeekAware, AcknowledgingMessageListener. So do I call seek in the onMessage method? This does not work for me, as onMessage will not get triggered; I am really lost as to where I invoke the seek from. I need anytime access (when I bring up my container) - calling from the Spring Boot application doesn't work. Also, after a seek, doesn't this consumer that I have written get the message - in that case how can I call seek in onMessage? – ananth Dec 03 '18 at 20:52
  • See `onPartitionsAssigned()` - it is called before any `onMessage()` call. See the javadocs in `ConsumerSeekAware`. (A sketch of this approach follows the thread.) – Gary Russell Dec 03 '18 at 21:22
  • onPartitionsAssigned is a no go for me as I need to run this off of a command line when processing is happening. This is a new container which I bring up just to redo the records at certain offsets. If I store the callback as a plain static var (not ThreadLocal), then I am able to access it in the `SpringBootApplication` and then I am able to call seek using this variable after `SpringApplication.run(...)`. Is this correct? Also I am forced to keep the variable as static. To this spring boot application I intend to pass command line args (via java -jar ...) – ananth Dec 03 '18 at 21:28
  • It's not clear what your problem is; if it's a command-line argument, you can add it as a variable `-Dstart.offset=45` and that can be accessed by the application as a system property. This commentary is getting too long; the admins don't like a lot of comments; if you still have a question, ask a new one, showing exactly what you have tried including all the code/configuration. I can't "guess" at what you are doing. – Gary Russell Dec 03 '18 at 21:36
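
To close the loop on the seek question above: here is a hedged sketch of a listener for the replay container that implements both interfaces the thread discusses. The class name and the `start.offset` system property (e.g. `-Dstart.offset=45`, as suggested in the last comment) are assumptions. The point is that `onPartitionsAssigned()` runs on the consumer thread when the new container starts, before any `onMessage()` call, so no static or ThreadLocal callback juggling is needed.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.Acknowledgment;

public class ReplaySeekListener implements ConsumerSeekAware,
        AcknowledgingMessageListener<String, String> {

    // Replay start offset, supplied as -Dstart.offset=45 (assumed mechanism)
    private final long startOffset = Long.getLong("start.offset", 0L);

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Called on the consumer thread before any records are delivered,
        // so the seek happens exactly once when the replay container starts
        assignments.keySet().forEach(tp -> callback.seek(tp.topic(), tp.partition(), startOffset));
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // Only needed for arbitrary seeks at runtime; unused in this scenario
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @Override
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // process/republish the record, then ack manually
        ack.acknowledge();
    }
}
```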
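And for the `SeekToCurrentErrorHandler` / `DeadLetterPublishingRecoverer` combination mentioned earlier in the thread, a minimal configuration sketch against the Spring Kafka 2.2-era API (the bean wiring and the retry count of 3 are assumptions):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

// Records that keep failing are republished to <topic>.DLT by the recoverer
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // After 3 failed delivery attempts, publish the record to the dead-letter topic
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template), 3));
    return factory;
}
```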