We are using the Kafka Streams library to build a real-time notification system for incoming messages on a Kafka topic. While the streaming app is running, it processes all incoming messages on the topic in real time and sends a notification if it encounters a certain kind of predefined message.

If the streaming app goes down and is started again, we need it to process only the messages that arrive after it has initialized. This is to avoid processing old records that accumulated while the app was down. By default, the streaming app resumes from the last committed offset, so it picks up those old messages. Is there any setting in a Kafka Streams app that allows processing only the most recent messages?

Arun Y
  • Based on my understanding, by default a Kafka consumer picks up from where the consumer group offset was left (the last committed offset). It doesn't depend on time. So in my case I might need to reject records whose timestamp is older than the consumer start time. And of course that means each message needs an associated timestamp. As per https://stackoverflow.com/questions/39514167/retrieve-timestamp-based-data-from-kafka, starting with 0.10.0 one can associate a timestamp with a message. – Arun Y Oct 20 '18 at 16:37
  • The solution is detailed in this thread: https://stackoverflow.com/questions/45075147/seektoend-of-all-partitions-and-survive-automatic-rebalancing-of-kafka-consumers – Moh Jan 12 '19 at 17:30
  • If your group id is dynamic, then you will process only the latest messages each time your streaming app starts. – Arpan Sharma Oct 11 '19 at 04:17
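
The timestamp-based rejection described in the first comment could look roughly like the sketch below. It is plain Java with no Kafka dependency: Message is a hypothetical stand-in for a consumed record, and RecentOnlyFilter, isRecent and keepRecent are illustrative names, not part of any Kafka API. In a real app the timestamp would come from the record itself (for example ConsumerRecord.timestamp()), and the comparison assumes producer and consumer clocks are reasonably in sync.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch only: drops records whose timestamp is older than the app start time.
class RecentOnlyFilter {

    // Hypothetical stand-in for a consumed record: payload plus timestamp in epoch millis.
    static final class Message {
        final String value;
        final long timestampMs;

        Message(String value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Moment the app was (re)started, in epoch millis.
    private final long startedAtMs;

    RecentOnlyFilter(long startedAtMs) {
        this.startedAtMs = startedAtMs;
    }

    // A record is "recent" if it was stamped at or after the start time.
    boolean isRecent(Message m) {
        return m.timestampMs >= startedAtMs;
    }

    // Keeps only recent records, preserving their original order.
    List<Message> keepRecent(List<Message> batch) {
        return batch.stream().filter(this::isRecent).collect(Collectors.toList());
    }
}
```

A filter like this would be created once at startup, for instance with new RecentOnlyFilter(System.currentTimeMillis()), and applied to every record before the notification logic runs. The old records are still read from the topic; they are just skipped cheaply.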

2 Answers

KafkaConsumer's default value for 'auto.offset.reset' is 'latest', but you are using KafkaStreams, where the default is 'earliest'. Reference: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L634

Therefore, if you set 'auto.offset.reset' to 'latest', you should get the behavior you want.
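
As a minimal sketch of that setting, the helper below builds the configuration with plain java.util.Properties and the string keys 'bootstrap.servers', 'application.id' and 'auto.offset.reset'. The class name StreamsStartupConfig, the method build and the freshGroupPerStart flag are hypothetical. Note that 'auto.offset.reset' is only consulted when the group has no committed offset, so the optional flag appends a random suffix to the application.id to force a new consumer group on every start. That is an assumption about what you may want, and it comes at a cost: each new application.id also gets its own internal topics and local state.

```java
import java.util.Properties;
import java.util.UUID;

// Sketch only: builds Kafka Streams settings so a new group starts at the end of the topic.
class StreamsStartupConfig {

    static Properties build(String bootstrapServers, String baseAppId, boolean freshGroupPerStart) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);

        // Kafka Streams uses application.id as the consumer group id.
        // A random suffix (hypothetical choice) means no committed offsets exist yet.
        String appId = freshGroupPerStart ? baseAppId + "-" + UUID.randomUUID() : baseAppId;
        props.put("application.id", appId);

        // Applies only when the group has no committed offset for a partition.
        props.put("auto.offset.reset", "latest");
        return props;
    }
}
```

The resulting Properties object would then be passed to the KafkaStreams constructor together with the topology.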

bistros
  • Actually, I tried 'latest', but that only works if there is no committed consumer offset. If the consumer group already has a committed offset (> 0), auto.offset.reset has no effect and the client picks up from the next offset. – Arun Y Oct 20 '18 at 16:24

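Your assumption is correct. Even if you set auto.offset.reset to latest, your app already has a committed consumer offset, so the setting is ignored.

So you will have to reset the offsets to latest with the kafka-consumer-groups command, using the options --reset-offsets --to-latest --execute. Run it while the streaming app is stopped, since the tool only resets offsets for an inactive group, and pass your application.id as --group, because Kafka Streams uses it as the consumer group id.

Check the different reset scenarios: you can also reset to a particular datetime (--to-datetime), by a duration (--by-duration), from a file (--from-file), and so on.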

Saïd Bouras