3

I use spring-kafka for springboot 2.0.4.RELEASE.

And use KafkaListener for get message

Now I want to reset the offset for my group

But i do not how to get the consumer for the group

    @KafkaListener(id="test",topics={"test"},groupId="group",containerFactory="batchContainerFactory")
    public String listenTopic33(List<ConsumerRecord<Integer, String>> record, Acknowledgment ack){
// do something

    }


    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public void test() {

        MessageListenerContainer test3 = kafkaListenerEndpointRegistry.getListenerContainer("test3");
}

xianda zhu
  • 31
  • 1
  • 2

1 Answers1

4

If you want to seek the consumer in the listener itself, simply add a Consumer<?, ?> consumer parameter to the listener method.

Bear in mind that the container may have fetched more messages so you will get them before the seek takes affect. You could set max.poll.records=1 to avoid that.

You can also add a custom RemainingRecordsErrorHandler to the container, throw an exception in the listener, and the error handler will get the remaining records instead of the listener.

Also see Seeking to a Specific Offset.

In order to seek, your listener must implement ConsumerSeekAware, which has the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

The first method is called when the container is started. You should use this callback when seeking at some arbitrary time after initialization. You should save a reference to the callback. If you use the same listener in multiple containers (or in a ConcurrentMessageListenerContainer), you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

When using group management, the second method is called when assignments change. You can use this method, for example, for setting initial offsets for the partitions, by calling the callback. You must use the callback argument, not the one passed into registerSeekCallback. This method is never called if you explicitly assign partitions yourself. Use the TopicPartitionInitialOffset in that case.

The callback has the following methods:

void seek(String topic, int partition, long offset);

void seekToBeginning(String topic, int partition);

void seekToEnd(String topic, int partition);

You can also perform seek operations from onIdleContainer() when an idle container is detected. See Detecting Idle and Non-Responsive Consumers for how to enable idle container detection.

To arbitrarily seek at runtime, use the callback reference from the registerSeekCallback for the appropriate thread.

Here's an example; we keep track of the callbacks for each topic/partition...

@SpringBootApplication
public class So56584233Application {

    public static void main(String[] args) {
        SpringApplication.run(So56584233Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(new ProducerRecord<>("so56584233", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56584233", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {


    private static final Logger logger = LoggerFactory.getLogger(Listener.class);


    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    private static final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, callbackForThread.get()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "so56584233", topics = "so56584233", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}
Community
  • 1
  • 1
Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • I added an example for how to do arbitrary seeking outside the context of the listener. – Gary Russell Jun 13 '19 at 16:42
  • Thanks you for your help .But I need to reset the offset by time. for example yesterday. So i need to get consume for the groupid . Use the code Map offsetsForTimes = consumer.offsetsForTimes(endTimestampsToSearch); . So i do not know how to get consumer when sometime i want to reset .can you help me ?@Gray Russell – xianda zhu Jun 14 '19 at 01:49
  • Does [this answer help you](https://stackoverflow.com/questions/52973119/how-to-test-a-consumerawarerebalancelistener/52976119#52976119)? The `ConsumerAwareRebalanceListener` is called when the partitions are assigned. You can't call the consumer on some arbitrary thread, it is not thread safe. If you need to change the offset later while running, you can stop() the container and re-start it; the listener will be called again. You need to explain your use case more clearly. – Gary Russell Jun 14 '19 at 13:05
  • Thanks .I have used ConsumerAwareRebalanceListener to solve it . – xianda zhu Jun 27 '19 at 09:09
  • Hi Gary, in this sample, can we do `callbackForThread.set(callback)` in `onPartitionsAssigned` method instead of `registerSeekCallback`? And use it to seek offsets later? – yifei Jul 16 '20 at 07:25
  • Don't ask new questions in comments on old answers. No; it's a different callback; ask a new question to explain why you might want to do this. – Gary Russell Jul 16 '20 at 13:32
  • @GaryRussell: I didn't find any more explanation on how the code works than what you have provided here (New bee here). Could you please provide a little more explanation ? I am not able to understand how the callBack is used to seek? I don't see any of the seekCallBack methods being called. – Maverick Aug 30 '20 at 19:27
  • @Maverick Don't ask new questions in comments on old answers; always ask a new question; the framework has evolved since this answer and we now have `AbstractConsumerSeekAware` to make things easier. There are [examples in the documentation](https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#seek). The upcoming 3.6 release (due this week) [has even more improvements](https://docs.spring.io/spring-kafka/docs/2.6.0-SNAPSHOT/reference/html/#seek). If you still need help, ask a new question with specifics. – Gary Russell Aug 30 '20 at 22:05