9

I am new to Kafka and am running a simple consumer/producer example, as given on the KafkaConsumer and KafkaProducer pages. When I run the consumer from the terminal, it receives messages, but I am not able to receive them from Java code. I have searched for similar issues on Stack Overflow (Links: Link1, Link2) and tried those solutions, but nothing seems to work for me. Kafka version: kafka_2.10-0.10.2.1, with the corresponding Maven dependency in the pom.

Java Code for producer and consumer:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9094");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send ten records to topic3, using the loop index as both key and value.
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("topic3", Integer.toString(i), Integer.toString(i)));
        }

        // close() flushes any buffered records before returning.
        producer.close();
    }
}

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9094");
        props.put("group.id", "test");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic3", "topic2"));
        // Poll forever and print every record received.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

Starting Kafka: bin/kafka-server-start.sh config/server.properties (I have already set the port and broker.id in the properties file)

mohiitg
  • When you say "from terminal", do you mean using the Kafka console consumer/producer tools? Posting your Java source code could be useful. – ppatierno Jun 26 '17 at 14:49
  • Added code. And yes, if I run the Kafka consumer from the terminal, I am able to receive the messages sent by the Java producer code above. – mohiitg Jun 26 '17 at 15:06
  • Enable logging for your Java producer and set it to DEBUG level to see what your producer is doing. Also make sure the topic name is correct and that your consumer is set to read from the beginning. – PragmaticProgrammer Jun 26 '17 at 15:35
  • You don't need to set the **"zookeeper.connect"** property. – Sanjay Jun 27 '17 at 09:45

6 Answers

17

First, list the available consumer groups:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Then describe a group to check whether it is the one associated with your topic:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <your group name> --describe

Once you have found your topic and its associated group name (replace group.id with your own group if the topic does not belong to the default group), try the properties below and let me know if it works:

  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "test-consumer-group"); // default group name
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

  // The consumer subscribes to a list of topics here.
  consumer.subscribe(Arrays.asList(topicName));  // replace topicName with your topic name

  // Print the names of all topics visible to this consumer.
  java.util.Map<String, java.util.List<PartitionInfo>> listTopics = consumer.listTopics();
  System.out.println("list of topic size :" + listTopics.size());

  for (String topic : listTopics.keySet()) {
      System.out.println("topic name :" + topic);
  }
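
The topic listing above can also be used to catch a misspelled topic name, which one of the comments on the question suggests checking. The following is only a rough sketch: `TopicCheck` and `missingTopics` are hypothetical names, and the topic set in `main` is made-up sample data standing in for `consumer.listTopics().keySet()`, not output from the asker's broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TopicCheck {
    // Hypothetical helper: returns the subscribed topic names that do not
    // appear in the set of topic names reported by the broker.
    public static List<String> missingTopics(Collection<String> subscribed, Set<String> existing) {
        List<String> missing = new ArrayList<>();
        for (String topic : subscribed) {
            if (!existing.contains(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Sample data only; in a real consumer this would be
        // consumer.listTopics().keySet().
        Set<String> existing = new HashSet<>(Arrays.asList("topic3", "__consumer_offsets"));
        List<String> subscribed = Arrays.asList("topic3", "topic2");
        System.out.println("missing topics: " + missingTopics(subscribed, existing));
    }
}
```

An empty result means every subscribed name exists on the broker, so the cause is more likely the group or offset settings than the topic name.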
Sanjay
  • It's listing all the topics, including the one the consumer has subscribed to (topic3). – mohiitg Jun 27 '17 at 09:20
  • Are you listing the topics with the Java program or from the command line? If with the Java program, that means your topic belongs to the **default group id**, and in that case you should now be able to fetch data from the topic using the properties above. – Sanjay Jun 27 '17 at 09:24
  • Thanks a million! Your solution worked for me. I had been struggling for days to fix this. But if I want to create a consumer group and link it to the topic I have created, how can I do that so that I don't use the default setting? – SOFuser123 Apr 03 '18 at 11:44
  • I am working with Kafka for the first time and had the same problem. @Sanjay, your answer really helped. – sandeep Jan 22 '20 at 05:29
1

Run the consumer before running the producer, so that the consumer registers with the group coordinator first. When you then run the producer, the consumer consumes the messages. The first time you run the consumer, it registers with the group coordinator. To find out up to which offset the consumer has consumed messages, use:

kafka-consumer-offset-checker.bat --group group-1 --topic testing-1 --zookeeper localhost:2181

This shows the last offset of the topic that the consumer has consumed.
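
If starting the consumer first is not practical, another option is to let a new group begin from the earliest available offset. The sketch below is only an assumption about how the question's configuration could be adjusted: `ConsumerProps` and `build` are hypothetical names, while `auto.offset.reset` is a standard consumer setting whose default of `latest` makes a group with no committed offsets skip messages produced before it joined.

```java
import java.util.Properties;

public class ConsumerProps {
    // Hypothetical helper: builds consumer settings for a group that should
    // start from the beginning of the topic when it has no committed offsets.
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Only applies when the group has no committed offset for a partition.
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Address and group name are taken from the question's code.
        Properties props = build("localhost:9094", "test");
        // These properties would be passed to new KafkaConsumer<>(props).
        System.out.println("auto.offset.reset = " + props.getProperty("auto.offset.reset"));
    }
}
```

Because the setting is ignored once a group has committed offsets, a group that has already run with auto-commit enabled may need a fresh `group.id` before any difference shows up.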

codehacker
  • I am facing this error: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.subscribe(Ljava/util/Collection;)V – ShrJoshi Jun 28 '19 at 12:42
0

Clear the 'tmp' folder on the drive from which you run Kafka, then open a new 'cmd' window. Restart the server, and run this command in the window to start the consumer without any error:

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic H1 --from-beginning

Yaseen Ahmad
Aravind A
0

Try to set enable.partition.eof parameter to false:

props.put("enable.partition.eof", "false");

That worked for me.

Giorgos Myrianthous
DenisKolodin
0

Try this; this code worked for me.

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(props);
// Subscribe once; a second subscribe() call would replace this subscription.
myConsumer.subscribe(Arrays.asList(topicName)); // replace topicName with your topic

try {
    while (true) {
        ConsumerRecords<String, String> records = myConsumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(String.format("Topic: %s, Partition: %d, Offset: %d, key: %s, value: %s",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value()));
        }
    }
} catch (Exception e) {
    System.out.println(e.getMessage());
} finally {
    // Always release the consumer's network resources.
    myConsumer.close();
}
itsKrrish
0

I faced this issue with the latest release of Kafka, kafka_2.13-2.6.0.tgz, installed locally on Windows 7. Messages were not flowing from producer to consumer. I checked and found that the consumer offsets topic had not been created; I am not sure why.

I installed an earlier version of Kafka, kafka_2.12-2.5.0.tgz, and it started working. The consumer offsets topic was created with the older version.