0

My server install kafka use docker: wurstmeister/kafka.

When I use kafka-clients 2.2.0 to produce and consume kafka message, my producer works well, but consumer can't get any message.

but while I use shell, this command can't get message:

/opt/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 10.2.0.242:9092 --topic mytesttopic --from-beginning

and this command can get all message:

/opt/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 10.2.0.242:9092 --topic mytesttopic --from-beginning --partition 0
String  kafkaServer = "10.2.0.242:9092";
String defaultTopic = "mytesttopic";

// create topic
Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer);
try (AdminClient client = AdminClient.create(props)) {
    CreateTopicsResult ret = client.createTopics(Arrays.asList(new NewTopic(defaultTopic, 1, (short) 1)));
    ret.all().get();
}

// send message
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> msg = new ProducerRecord<>(defaultTopic, "a1", "test1");
Future ret = producer.send(msg);
System.out.println("send ok: " + ret.get());

// recieve message
props = new Properties();
props.put("bootstrap.servers", kafkaServer);
props.put("group.id", "testaaa4aaa");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer consumer = new KafkaConsumer<String, String>(props);
TopicPartition partition1 = new TopicPartition(defaultTopic, 0);
consumer.assign(Arrays.asList(partition1));
//        consumer.subscribe(Arrays.asList(defaultTopic));
Duration duration = Duration.ofMillis(100);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(duration);
// here always get 0.
    System.out.println(records.count());
}

I tryed set TopicPartition in my code, but still can't get message, can anybody help me?

youbl
  • 134
  • 1
  • 11
  • I found console message: [Consumer clientId=consumer-1, groupId=testaaa4aaa] Received FindCoordinator response ClientResponse(receivedTimeMs=1559204286805, latencyMs=13, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-1, correlationId=4), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) – youbl May 30 '19 at 08:34
  • It worked while I removed this line : ``` props.put("group.id", "testaaa4aaa"); ``` but why I must specify the partition?? – youbl May 30 '19 at 08:41
  • Possible duplicate of [difference between groupid and consumerid in Kafka consumer](https://stackoverflow.com/questions/34550873/difference-between-groupid-and-consumerid-in-kafka-consumer) – Progman May 30 '19 at 11:19

2 Answers2

0

As the console message says, it seems that your group coordinator is not available. Did you recently update/change your Docker image? The thing is that if you do not specify a group.id or if you manually specify a partition for the console consumer, you are not using Kafka's group management (and hence there is no coordinator). So it makes sense that this fixes the issue. What happens if you use a different group id?

KWer
  • 281
  • 2
  • 7
0

I’m trying Kafka 2.13 and find the same problem. If the Kafka-console-producer creates messages before the KafkaConsumer comes into existence, then the KafkaConsumer can’t poll those messages. Not sure why and hope this helps.

  • As it’s currently written, your answer is unclear. Please [edit] to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers [in the help center](/help/how-to-answer). – Community Apr 06 '23 at 17:48