
When a Kafka consumer in consumer group A connects to the Kafka broker, I would like it to seek to the end of all partitions, even if an offset is already stored on the broker side. If additional consumers join the same consumer group, they should pick up the latest stored offsets. I am doing the following:

consumer.poll(timeout) 
consumer.seekToEnd(emptyList())

while(true) {
  val records = consumer.poll(timeout)
  if(records.isNotEmpty()) {
    //print records
    consumer.commitSync()
  }
}

The problem: when I connect the first consumer c1 of consumer group A, everything works as expected. When I connect an additional consumer c2 of the same group, the group rebalances and c1 then consumes the offsets it had skipped.

Any ideas?

clausmc

1 Answer


You could create a class which implements ConsumerRebalanceListener, as shown below:

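import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {

    private final Consumer<K, V> consumer;

    public AlwaysSeekToEndListener(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do before the rebalance in this variant.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Skip to the end of every partition just assigned to this consumer.
        consumer.seekToEnd(partitions);
    }
}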

Then pass this listener when subscribing to the topic:

consumer.subscribe(Collections.singletonList("test"), new AlwaysSeekToEndListener<String, String>(consumer));
amethystic
  • Thanks. That should do it. However, is there also an option to commit the current offsets when the partitions are revoked, so that I don't always need to seek to the end? Or won't that work? – clausmc Jul 13 '17 at 09:31
  • That's what `onPartitionsRevoked` is for. This method is guaranteed to be invoked before a rebalance operation starts, and you could take care of committing offsets in it. – amethystic Jul 13 '17 at 09:35
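
To make the difference between the two callbacks concrete, here is a toy model of the offset bookkeeping for a single partition. It is only a sketch: it uses no Kafka classes, `ToyPartition` and all of its methods are hypothetical names, and it assumes an eager rebalance in which the partition is revoked and then assigned again with its position restored from the committed offset. Under those assumptions it shows why c1 replays the skipped records when nothing was committed after the seek, and how seeking on assignment or committing on revocation avoids that.

```java
// Toy model only: no Kafka classes are used and every name here is hypothetical.
// It tracks one partition's committed offset (stored for the group) and the
// consumer's in-memory fetch position.
final class ToyPartition {
    private long committed;     // last offset committed for the consumer group
    private long position;      // next offset this consumer would fetch
    private final long logEnd;  // offset one past the last record in the log

    ToyPartition(long committed, long logEnd) {
        this.committed = committed;
        this.position = committed; // a fresh assignment starts at the committed offset
        this.logEnd = logEnd;
    }

    // Models a seek to the end: moves only the in-memory position, commits nothing.
    void seekToEnd() { position = logEnd; }

    // Models a synchronous commit: persists the current position.
    void commit() { committed = position; }

    // Assumption: eager rebalance, the partition is revoked and then assigned again.
    void rebalance(boolean commitOnRevoke, boolean seekToEndOnAssign) {
        if (commitOnRevoke) commit();        // work a revoke callback could do
        position = committed;                // position is restored from the commit
        if (seekToEndOnAssign) seekToEnd();  // work an assign callback could do
    }

    long position() { return position; }
    long committed() { return committed; }

    public static void main(String[] args) {
        // Question's flow: seek to end, no new records so no commit, then rebalance.
        ToyPartition plain = new ToyPartition(5, 10);
        plain.seekToEnd();
        plain.rebalance(false, false);
        System.out.println("no listener:      " + plain.position());    // prints 5

        // Answer's listener: seek to end again whenever the partition is assigned.
        ToyPartition seeking = new ToyPartition(5, 10);
        seeking.seekToEnd();
        seeking.rebalance(false, true);
        System.out.println("seek on assign:   " + seeking.position());  // prints 10

        // Comment's idea: commit the current position when the partition is revoked.
        ToyPartition committing = new ToyPartition(5, 10);
        committing.seekToEnd();
        committing.rebalance(true, false);
        System.out.println("commit on revoke: " + committing.position()); // prints 10
    }
}
```

In this model the committed offset stays at 5 in the seek-on-assign case, so any consumer that receives the partition without seeking would still start from 5. Committing on revocation moves the stored offset itself, which is closer to the "pick up the latest stored offsets" behaviour asked for in the question.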