7

I am using the KafkaMessageListenerContainer to consume from a Kafka topic. My application logic for processing each record depends on other microservices. I currently commit the offset manually after each record is processed.

But if the application logic fails, I need to seek to the failed offset and keep reprocessing it until it succeeds. For that, I need to perform a manual seek to the last offset at run time.

Is this possible with the KafkaMessageListenerContainer yet?

sash
  • You are committing the offset after the application logic. So if you don't commit the offset in the case when the application logic fails, the offset won't move ahead and you will be processing the same message again. Does this solve your problem? – yaswanth Mar 24 '17 at 12:04
  • @yaswanth No, I don't think it works like that. I had the same assumption as you until I started testing it. I set ENABLE_AUTO_COMMIT_CONFIG to false and the container's ack mode to AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE, with an AcknowledgingMessageListener. I sent record 1 followed by record 2 to the topic. For record 1 the app logic fails (I don't acknowledge it); at this point I expected the next consumer poll to return record 1 again, but I get record 2. If this can be fixed with config params, please let me know! – sash Mar 24 '17 at 16:13
  • You are right! I was under a false assumption until now. – yaswanth Mar 25 '17 at 06:22
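
The behaviour described in the comments above follows from a running consumer keeping its own in-memory read position, separate from the committed offset. The following is a toy model of that idea, not Kafka client code: `ToyConsumer` and all of its methods are hypothetical names made up for illustration, and a single partition is assumed.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: these classes are hypothetical and are NOT part of the Kafka client API.
class OffsetModelSketch {

    /** A single-partition consumer reduced to the two numbers that matter here. */
    static class ToyConsumer {
        private final List<String> log;  // records in the partition; list index == offset
        private long position = 0;       // next offset poll() will return (in memory only)
        private long committed = 0;      // offset a restarted consumer would resume from

        ToyConsumer(List<String> log) {
            this.log = log;
        }

        /** Returns the record at the current position and advances, or null at end of log. */
        String poll() {
            if (position >= log.size()) {
                return null;
            }
            return log.get((int) position++);
        }

        /** Marks the record at the given offset as done: the next one to read is offset + 1. */
        void commit(long offset) {
            committed = offset + 1;
        }

        /** Moves the in-memory position; this is the only way to re-read within a running consumer. */
        void seek(long offset) {
            position = offset;
        }

        /** Models a restart or rebalance: the position is reset to the committed offset. */
        void restart() {
            position = committed;
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer(Arrays.asList("record 1", "record 2"));
        String first = consumer.poll();   // processing fails, so nothing is committed
        String second = consumer.poll();  // the position advanced anyway
        consumer.seek(0);                 // an explicit seek brings the first record back
        String again = consumer.poll();
        System.out.println(first + " | " + second + " | " + again);
    }
}
```

In this model, skipping the commit only affects where a restarted consumer resumes; it does not rewind a consumer that is still running.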

2 Answers

9

See Seeking to a Specific Offset.

In order to seek, your listener must implement ConsumerSeekAware which has the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer) you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.
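
As a rough illustration of the pattern described above (keep the callback in a ThreadLocal, then seek back to the failed record's offset instead of acknowledging it), here is a minimal sketch. To keep it compilable without spring-kafka on the classpath, it uses stand-in types: `SeekCallback`, `Ack`, `Rec` and `RetryingListener` are hypothetical names, and only the `seek(topic, partition, offset)` shape is assumed to match the real `ConsumerSeekCallback`. In a real application the listener would implement `ConsumerSeekAware` and `AcknowledgingMessageListener` instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Stand-in types (hypothetical) so the sketch compiles without spring-kafka.
class SeekOnFailureSketch {

    /** Plays the role of the seek callback handed to registerSeekCallback. */
    interface SeekCallback {
        void seek(String topic, int partition, long offset);
    }

    /** Plays the role of the acknowledgment passed to an acknowledging listener. */
    interface Ack {
        void acknowledge();
    }

    /** Plays the role of a consumed record. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;
        final String value;

        Rec(String topic, int partition, long offset, String value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    static class RetryingListener {
        // One callback per consumer thread, as suggested for listeners shared between containers.
        private final ThreadLocal<SeekCallback> seekCallback = new ThreadLocal<>();
        // Application logic; returns false when processing fails (an assumption of this sketch).
        private final Predicate<Rec> businessLogic;

        RetryingListener(Predicate<Rec> businessLogic) {
            this.businessLogic = businessLogic;
        }

        /** Mirrors registerSeekCallback: save the reference for later use. */
        void registerSeekCallback(SeekCallback callback) {
            seekCallback.set(callback);
        }

        /** Mirrors onMessage(record, acknowledgment). */
        void onMessage(Rec rec, Ack ack) {
            if (businessLogic.test(rec)) {
                ack.acknowledge();  // success: commit and move on
            } else {
                // failure: ask for the same offset to be read again
                seekCallback.get().seek(rec.topic, rec.partition, rec.offset);
            }
        }
    }

    public static void main(String[] args) {
        List<Long> acked = new ArrayList<>();
        List<Long> seeks = new ArrayList<>();
        RetryingListener listener = new RetryingListener(r -> !r.value.equals("bad"));
        listener.registerSeekCallback((topic, partition, offset) -> seeks.add(offset));

        Rec good = new Rec("orders", 0, 0L, "good");
        Rec bad = new Rec("orders", 0, 1L, "bad");
        listener.onMessage(good, () -> acked.add(good.offset));
        listener.onMessage(bad, () -> acked.add(bad.offset));
        System.out.println("acked=" + acked + " seeks=" + seeks);
    }
}
```

Note that retrying indefinitely this way blocks the partition behind the failing record, so a retry limit or back-off may be worth adding depending on the use case.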

Gary Russell
  • I am using KafkaListenerContainerFactory, KafkaListenerEndpointRegistry and MethodKafkaListenerEndpoint. Not able to find how to use ConsumerSeekAware as I am not directly writing Listener class. – Sumit Jangra Jan 27 '22 at 11:56
  • Don't ask new questions in comments. You would have to create your own consumer (with the same group id) and do the seeks there, before starting the container. – Gary Russell Jan 27 '22 at 14:15
  • Got your point @Gary. And thanks for the answer. – Sumit Jangra Jan 31 '22 at 05:20
-1

Please refer to the Spring Kafka documentation:

"For an existing group ID, the initial offset is the current offset for that group ID".

Confluent also states: "If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition"

This means that, in your case, when the consumer resumes, it will continue from the last committed offset. You also do not need to "seek to the failed offset" if you commit manually by setting

ENABLE_AUTO_COMMIT_CONFIG = false

and

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)

Valerie