42

I have a list of topics (10 for now) that may grow in the future. I know we can spawn one thread per topic to consume from each topic, but then the number of threads grows with the number of topics. I do not want that: the topics will not receive data very frequently, so the threads would mostly sit idle.

Is there any way to have a single consumer consume from all topics? If yes, how can we achieve it? Also, how will the offsets be maintained by Kafka? Please suggest answers.

khelwood
Akhil
  • Even if my Consumer is subscribed to multiple topics, is there any ordering guarantee for the data I'm receiving? If another identical consumer was subscribed to the same set of topics, would they get records in the exact same order? – Aaron Apr 23 '23 at 08:24

2 Answers

29

We can subscribe to multiple topics with the following API (the second argument is a ConsumerRebalanceListener instance):

consumer.subscribe(Arrays.asList(topic1, topic2), rebalanceListener);

Each record carries its topic and partition, so offsets can be committed per partition with consumer.commitSync() or consumer.commitAsync() by creating an OffsetAndMetadata object, as follows.

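// Wait up to 100 ms for records (illustrative timeout). poll(Duration) needs
// kafka-clients 2.0+; older clients take the timeout as a long in milliseconds.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

// records.partitions() covers the partitions of every subscribed topic that returned data.
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);

    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }

    // The committed offset is the position of the next record to read, hence lastOffset + 1.
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}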
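
Committing manually as above only makes sense if the consumer's periodic auto-commit is switched off. As a minimal sketch of the settings this assumes, the following builds them with plain java.util.Properties. The class name ConsumerConfigSketch, the broker address and the group id are placeholders for illustration, not values from the question.

```java
import java.util.Properties;

final class ConsumerConfigSketch {
    // Builds settings for one consumer that commits offsets explicitly.
    // bootstrapServers and groupId are caller-supplied placeholders.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Disable periodic auto-commit so the explicit commitSync calls decide what is committed.
        props.setProperty("enable.auto.commit", "false");
        // String keys and values, matching ConsumerRecords<String, String>.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "multi-topic-group");
        // Print the settings in a stable order.
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

The resulting Properties object would be passed to new KafkaConsumer<String, String>(props) before calling subscribe.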
sdgfsdh
Subrata Saha
  • I know we can, but how will the offset be maintained by Kafka? Also, will having a single consumer group solve my problem? – Akhil Sep 19 '16 at 09:34
  • Offsets are committed by your app and stored in a special offsets kafka topic called __consumer_offsets. Offsets are kept for each partition of each topic so it doesn't matter how many topics you subscribe to. – Hans Jespersen Jul 26 '17 at 23:40
  • This appears to require all topics use the same serializer. Any way to allow a different serializer per topic in a single consumer poll? – Ryan Feb 05 '21 at 04:43
  • What if I have 3 partitions per topic say topic1 has 3 partitions and topic2 has 4 partitions. Will it be possible for a single consumer to subscribe to 7 partitions across different topics? – Krithick S Apr 03 '23 at 14:34
6

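There is no need for multiple threads; one consumer can consume from multiple topics. With the old ZooKeeper-based consumer, offsets are maintained by ZooKeeper. The newer Java consumer (KafkaConsumer, 0.9 and later) commits them to Kafka itself, in the internal __consumer_offsets topic. In both cases the committed offset records how far the consumer has read in each partition, so even after a Kafka failure the consumer resumes from the record after the last committed offset. Records that were processed but not yet committed may be delivered again after a restart, so this alone does not guarantee that each message is processed only once.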
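
To illustrate the bookkeeping, here is a toy in-memory model, not the Kafka client: it keeps one "next offset" per topic and partition, which is where a restarted consumer would resume. The names OffsetBookkeepingSketch, ToyRecord and offsetsToCommit are hypothetical, and the "topic-partition" string key is an assumption made for the sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OffsetBookkeepingSketch {
    // Toy stand-in for a consumed record; only the fields needed for offset tracking.
    static final class ToyRecord {
        final String topic;
        final int partition;
        final long offset;

        ToyRecord(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each topic-partition in the batch, returns the offset to commit:
    // the highest processed offset + 1, i.e. the position of the next record to read.
    static Map<String, Long> offsetsToCommit(List<ToyRecord> batch) {
        Map<String, Long> next = new LinkedHashMap<>();
        for (ToyRecord r : batch) {
            String key = r.topic + "-" + r.partition;
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        // One batch mixing records from two topics, as a single consumer would receive them.
        List<ToyRecord> batch = Arrays.asList(
                new ToyRecord("topic1", 0, 41L),
                new ToyRecord("topic1", 0, 42L),
                new ToyRecord("topic2", 3, 7L));
        // Each topic-partition gets its own independent resume position.
        System.out.println(offsetsToCommit(batch));
    }
}
```

Because every topic-partition has its own entry, subscribing to more topics only adds entries; it does not change how any single offset is tracked.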

Bhawna Arora