
This is my consumer for Apache Kafka. It is not receiving any messages from the topic "test".

package com.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {

  public static void main(String[] args) {
     Properties props = new Properties();
     props.put("bootstrap.servers", "172.17.210.45:9092");
     props.put("zookeeper.connect", "172.17.210.45:2181");
     props.put("group.id", "test-consumer-group");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("auto.offset.reset", "earliest");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer",  "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
     System.out.println("properties loaded");
     kafkaConsumer.subscribe(Arrays.asList("test"));

     while (true) {
         ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
           System.out.printf("offset = %d, value = %s", record.offset(), record.value());
           System.out.println();
         }
     }

  }
}

With this code I am not getting any messages from Apache Kafka. The only output is:

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
properties loaded
Papershine
Teja

1 Answer

Your code seems correct. I suggest checking whether the IP 172.17.210.45 is reachable:

ping 172.17.210.45

and

telnet 172.17.210.45 9092
telnet 172.17.210.45 2181
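If telnet is not installed on the client machine, the same check can be done from Java. This is only a minimal sketch: the class name BrokerReachability, the method isReachable, and the 2000 ms timeout are my own choices for illustration, and the host and ports are the ones from the question. It uses only the JDK, no Kafka classes.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Kafka): checks whether a TCP port accepts connections.
public class BrokerReachability {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeout, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the address from the question; pass another host as the first argument.
        String host = args.length > 0 ? args[0] : "172.17.210.45";
        System.out.println("Kafka 9092 reachable: " + isReachable(host, 9092, 2000));
        System.out.println("ZooKeeper 2181 reachable: " + isReachable(host, 2181, 2000));
    }
}
```

A successful connection only shows that the port is open from the machine running the consumer. It does not prove that the topic has data or that the consumer can fetch it.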

Check which topics exist on the server:

bin/kafka-topics.sh --list --zookeeper 172.17.210.45:2181

Then you could try moving your consumer to the beginning of the topic. Add this line after kafkaConsumer.subscribe (it needs import java.util.Collections):

kafkaConsumer.seekToBeginning(Collections.emptyList());

Finally, I suggest adding a few System.out.println(records.count()) lines in the infinite loop, after kafkaConsumer.poll(100), just to see whether it hangs waiting for records or something else is happening.

UPDATE

If you have one or more group.id in the producer part, you should use one of them in the consumer part.

freedev
  • @freedev If I add kafkaConsumer.seekToBeginning(Collections.emptyList()); it throws an error. – Teja Feb 03 '17 at 09:39
  • I am getting the list of topics, but I am not getting the messages for the specific topic. – Teja Feb 03 '17 at 09:46
  • If you are reading from the topic "test", I assume that the topic exists. I also suggest making sure you use the same group id as the producer part. – freedev Feb 03 '17 at 09:52