
I am currently using `org.apache.kafka.clients.consumer.Consumer`, but I am open to any Java client that can guarantee what I need.

Is it possible to poll one message from each partition of the requested topic using a single group-id?

My case: I want to poll the latest message from each partition of a particular topic. My current implementation creates one consumer per partition and polls each one once, something like this:

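import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final String topic = "requested-topic";
final TopicPartition partition0 = new TopicPartition(topic, 0);
final TopicPartition partition1 = new TopicPartition(topic, 1);

final Consumer<String, ParcelInboxEvent> consumer1 = jsonConsumerFactory.createConsumer("monitor");
final Consumer<String, ParcelInboxEvent> consumer2 = jsonConsumerFactory.createConsumer("monitor");

// Each consumer gets one partition and is moved to the last existing record (end offset - 1).
// On an empty partition position(...) - 1 is -1, which seek() rejects with IllegalArgumentException.
consumer1.assign(List.of(partition0));
consumer1.seekToEnd(List.of(partition0));
consumer1.seek(partition0, consumer1.position(partition0) - 1);

consumer2.assign(List.of(partition1));
consumer2.seekToEnd(List.of(partition1));
consumer2.seek(partition1, consumer2.position(partition1) - 1);

// The record type must match the consumer's value type (ParcelInboxEvent, not Object).
final ConsumerRecords<String, ParcelInboxEvent> poll1 = consumer1.poll(Duration.ofSeconds(10));
final ConsumerRecords<String, ParcelInboxEvent> poll2 = consumer2.poll(Duration.ofSeconds(10));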

But I wonder whether I can do this in one step, using a single consumer.
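A minimal, broker-independent sketch of the single-consumer variant, assuming it is acceptable to call `poll()` in a loop until every partition has produced a record (a single poll can return data from only some of the assigned partitions). The generic types `P` and `R` stand in for `TopicPartition` and `ConsumerRecord`, and the names `LastRecordPerPartition`, `seekTargets` and `collectOnePerPartition` are hypothetical, not part of kafka-clients. It also replaces the per-consumer `seekToEnd`/`position` pair with one end-offset lookup and skips partitions that hold no records.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper, not part of kafka-clients.
// P stands in for TopicPartition, R for ConsumerRecord.
class LastRecordPerPartition {

    // Offset to seek to for each partition: endOffset - 1.
    // Partitions with no retained records (end <= beginning) are skipped, because
    // end - 1 would point before the log start (or be -1 for an empty partition).
    static <P> Map<P, Long> seekTargets(Map<P, Long> beginningOffsets, Map<P, Long> endOffsets) {
        Map<P, Long> targets = new HashMap<>();
        for (Map.Entry<P, Long> e : endOffsets.entrySet()) {
            long begin = beginningOffsets.getOrDefault(e.getKey(), 0L);
            long end = e.getValue();
            if (end > begin) {
                targets.put(e.getKey(), end - 1);
            }
        }
        return targets;
    }

    // Polls repeatedly until every expected partition has produced at least one record,
    // or maxPolls attempts are used up.
    static <P, R> Map<P, R> collectOnePerPartition(Set<P> expected,
                                                   Supplier<List<R>> poll,
                                                   Function<R, P> partitionOf,
                                                   int maxPolls) {
        Map<P, R> latest = new HashMap<>();
        for (int i = 0; i < maxPolls && !latest.keySet().containsAll(expected); i++) {
            for (R record : poll.get()) {
                // Records arrive in offset order within a partition, so the last one seen wins.
                latest.put(partitionOf.apply(record), record);
            }
        }
        return latest;
    }
}
```

Wired to a real `Consumer`, this would roughly mean: build the `TopicPartition` list from `consumer.partitionsFor(topic)`, call `consumer.assign(partitions)`, compute `seekTargets(consumer.beginningOffsets(partitions), consumer.endOffsets(partitions))`, call `consumer.seek(tp, offset)` for each entry, and pass a supplier that copies the records of `consumer.poll(Duration.ofMillis(500))` into a list, with `r -> new TopicPartition(r.topic(), r.partition())` as the extractor. Using `targets.keySet()` as the expected set keeps empty partitions from stalling the loop. One caveat: on topics written by transactional producers, the offset at end - 1 can be a transaction marker rather than a data record, so that partition may yield nothing and the loop stops at `maxPolls`.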

  • What errors do you get when you do use one consumer? Also, what configs are you giving to it? `max.poll.records` defaults to 500 – OneCricketeer May 23 '22 at 12:14
  • There is no error using one consumer; I just can't guarantee that after a poll I will get at least one message from each partition – ByeBye May 23 '22 at 13:22
  • Nothing can guarantee that you get any event. The poll duration could time out with no data, for example. You'd need to iterate over the `ConsumerRecords` object or call its `.count()` method and check that it returns 2, for example, for your two assigned partitions (for one consumer instance). And again, you might want to set `max.poll.records=1` – OneCricketeer May 23 '22 at 16:47
  • Yes, a timeout can of course occur, but I am talking about the situation where I am one message behind on each partition and want to poll that one message from each with a single consumer. Currently I am manually assigning each consumer to one partition and working with that – ByeBye May 24 '22 at 06:21
  • And that's the correct solution, but you can/should assign multiple partitions to one consumer. What exactly did you mean by not being able to ensure you'll get one record per partition, then? I think you'll need to iterate or check the poll count, as I said, if you want to "verify" anything – OneCricketeer May 24 '22 at 13:51
  • If I assign more partitions to a single consumer, a poll may return records from only one of them – ByeBye May 24 '22 at 16:30
  • What if you write a loop over each partition? Assign, seek, poll 1, Unassign, repeat? – OneCricketeer May 25 '22 at 14:34
  • That would work, but it is not really an optimization. For now, I am using multiple consumers, seeking to the latest offset, and then polling each one. – ByeBye May 25 '22 at 19:09
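Whichever variant is used (one consumer looping over polls, a loop over partitions, or several consumers), the consumer only peeks and is driven entirely by `assign()` and `seek()`, so it does not need committed offsets. A minimal sketch of matching properties, assuming kafka-clients 2.2 or later (where `group.id` may be left unset for assign-only consumers that never commit) and plain `StringDeserializer` values in place of the question's JSON deserializer; `MonitorConsumerProps` is a hypothetical name. Note that `max.poll.records=1` caps a poll at one record in total, not one per partition, so with N partitions it takes at least N polls.

```java
import java.util.Properties;

// Hypothetical helper: properties for a read-only "peek at the latest records" consumer
// that uses only assign()/seek(), so it never commits offsets.
class MonitorConsumerProps {

    static Properties create(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // group.id deliberately left unset: allowed for assign-only consumers that never
        // commit (kafka-clients 2.2+). Auto-commit must then stay off.
        props.setProperty("enable.auto.commit", "false");
        // Assumption: String keys/values for illustration; the question uses a JSON deserializer.
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

These properties would be passed to `new KafkaConsumer<>(props)`; committing or calling `subscribe()` on such a group-less consumer is rejected, which is the point for a monitor that must not disturb any real group's offsets.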
