
Can anyone help me out with the queries below? I am using kafka-clients-0.10.1.1 (single node, single broker).

The default value of auto.create.topics.enable is true.

1.) I am sending a message to a topic using:

    KafkaProducer<String, String> producer = ...;
    producer.send(new ProducerRecord<String, String>("my-topic", "message"));
    producer.close();
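
For completeness, a minimal producer setup for the snippet above would look something like this (the property values below are placeholders for my local single-broker setup):

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092"); // local single broker
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);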

For consuming:

    KafkaConsumer<String, String> consumer = ...;
    consumer.subscribe(Arrays.asList("my-topic"));
    ConsumerRecords<String, String> records = consumer.poll(200);

    while (true) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
    }

The problem is that when I run the consumer for the first time, it does not get any values. I must run the producer and then run the consumer again to get the values; sometimes I have to run the producer three times. Why does it work this way?

2.) enable.auto.commit=false

Can the same consumer read a message multiple times if the enable.auto.commit property is false?

3.) Considering my consumer code in the 1st point: how can I break the loop? That is, how can the consumer know it has read all messages, so that it can then call consumer.close()?

jena84
  • There's a console consumer in the Kafka bin directory; you can try it when your own consumer cannot consume data. Also try adding producer.flush() if possible. For your question 3, there's no way for a streaming program to know the end of a batch, but you can set up a timeout thread that monitors for a timeout with no data consumed. – Lhfcws Dec 29 '16 at 12:37
  • Yes, I tested it with the bin consumer; it gives the following error: Error while fetching metadata with correlation id 1 : {my-topic-106=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) – jena84 Dec 29 '16 at 14:36
  • Have you produced data recently, before consuming? By default Kafka only keeps your data stored for 3 days. – Lhfcws Dec 30 '16 at 05:06

1 Answer


1) Are you always using the same group.id in the consumer? Are you producing before consuming? This may be related to consumer groups and offset management. Please see this answer about consumer offset behavior.
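For illustration (these values are assumptions, not necessarily your actual config), the consumer properties involved look roughly like this:

    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092"); // assumed local broker
    consumerProps.put("group.id", "test"); // same group.id => consumer resumes from committed offsets
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // With no committed offset for the group, auto.offset.reset decides where to start:
    // "latest" (the default) only reads messages produced after the consumer joins,
    // "earliest" starts from the beginning of the partition.
    consumerProps.put("auto.offset.reset", "earliest");
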

2) Not sure if you mean reading duplicates intentionally or by accident. You can always read the same message again by seeking to that position, as long as the message has not been deleted by the topic retention policy. If you mean by accident: auto-commit set to false just means that the consumer will not commit offsets for you; you have to do it manually by calling commitSync() or commitAsync(). In any case there is still a chance of your consumer processing a message and crashing before committing; in that case, when the consumer recovers, it will read those processed-but-not-committed messages again. If you want exactly-once semantics you have to do something else, like storing offsets atomically together with the processed messages.
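A minimal sketch of a manual-commit loop (the topic name and the process() call are placeholders):

    consumerProps.put("enable.auto.commit", "false");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Arrays.asList("my-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(200);
        for (ConsumerRecord<String, String> record : records) {
            process(record); // placeholder for your own processing
        }
        // Commit only after processing succeeded; if the consumer crashes before this
        // line, the records above will be delivered again on restart.
        consumer.commitSync();
    }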

3) As Lhfcws mentioned, in a stream there is no concept like "all messages". Some things (tricks) that you can do are:

  • You can check whether the record list returned by poll() is empty and, after some configured number of consecutive empty polls, break the loop and exit (see the sketch after this list).
  • If messages are ordered (you are reading from a single partition), you can send a kind of special END_OF_DATA message; when you see it, you close the consumer.
  • You can make the consumer read a fixed number of messages and then exit; next time it will continue from the last committed offset.
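
A sketch of the first option (the threshold of 10 consecutive empty polls is arbitrary):

    int emptyPolls = 0;
    while (emptyPolls < 10) { // arbitrary threshold of consecutive empty polls
        ConsumerRecords<String, String> records = consumer.poll(200);
        if (records.isEmpty()) {
            emptyPolls++;
            continue;
        }
        emptyPolls = 0;
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
    }
    consumer.close();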
Luciano Afranllie
  • Thanks Lhfcws and Luciano. I am now clear about the 2nd and 3rd points. About the 1st point: I am running the consumer immediately after the producer, I am not changing the consumer group, and I am not creating the topic using the bin utility; I assume producer.send will create the topic. bootstrap.servers=localhost:9092, group.id=test, enable.auto.commit=true – jena84 Dec 30 '16 at 09:58
  • jena84, try setting auto.offset.reset to "earliest" in the consumer config and try again. Also, after starting the consumer, wait for the rebalance to complete. – Luciano Afranllie Dec 30 '16 at 12:50
  • Awesome!!! It worked, thanks very much. Is it because of the reason mentioned in the link you gave about offset management? I also tried putting "smallest" but it did not allow me. Is it because of the new consumer API? – jena84 Dec 30 '16 at 14:18
  • Glad it worked. Yes, the new consumer API uses "earliest" while the old consumer uses "smallest". If you are using 0.10 clients, then take care to use the "New Consumer Configs" (http://kafka.apache.org/documentation/#newconsumerconfigs) – Luciano Afranllie Dec 30 '16 at 17:33