0

my Spring boot 2.4.1 run in localhost (192.168.189.115), kafka 2.13-2.6.0 run in 192.168.48.54:9092

I can post message to producer kafka with http://localhost:8010/kafka/publish?message=HelloKafka success.

but consumer got error Connection to node -1 (/192.168.48.54:9092) could not be established. Broker may not be available.

I try to change server.properties

listeners=PLAINTEXT://192.168.48.54:9092
advertised.listeners=PLAINTEXT://192.168.48.54:9092

or (also commented it both)

listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.48.54:9092

application.properties

server.port=8010
spring.kafka.bootstrap-servers=192.168.48.54:9092
spring.kafka.consumer.group-id=fm-group

KafKaController.java

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
    
    @Autowired
    private Producer producer;

    @GetMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.produce(message);
    }
    
}

Producer.java

@Service
public class Producer {
    
    private static final Logger logger = LogManager.getFormatterLogger(Producer.class);
    private static String TOPIC = "customer.topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void produce(String data) {
        logger.info("Produce Topic: %s - Message: %s", TOPIC, data);
        this.kafkaTemplate.send(TOPIC, data);
    }
    
}

Consumer.java

@Service
public class Consumer {
    
    private static final Logger logger = LogManager.getFormatterLogger(Consumer.class);

    @KafkaListener(topics = "customer.topic", groupId = "fm-group")
    public void consume(String message) throws IOException {
        logger.info("Consume Message: %s", message);
    }
    
}

in kafka server i can ping my ip (192.168.189.115). I don't know why consumer could not be established. I try all resolution in stackoverflow already. Please help me.

EDIT#1 I changed Producer.java to

public void produce(String data) {
        logger.info("Produce Topic: %s - Message: %s", TOPIC, data);              
        try {
            ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(TOPIC, data);
            logger.info("test");
            SendResult<String, String> sendResult = future.get(10, TimeUnit.SECONDS);
            logger.info("sendResult ", sendResult.getRecordMetadata());
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

i got message reply

org.apache.kafka.common.errors.TimeoutException: Topic customer.topic not present in metadata after 60000 ms.

org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic customer.topic not present in metadata after 60000 ms.

it appear that i can't send message too. Why i can't connect ? please help me

i use ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

Topic: customer.topic   PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: customer.topic   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
Topic: streams-wordcount-output PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,segment.bytes=1073741824

EDIT#2 i also put this in pom.xml as suggestd in org.apache.kafka.common.errors.TimeoutException: Topic not present in metadata after 60000 ms

but it still doesn' work

<dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
    </dependency>
somsaktk33
  • 36
  • 1
  • 5
  • i try listeners=PLAINTEXT://:9092 but kafka console message say it can't bind this ip address. – somsaktk33 Jan 12 '21 at 09:55
  • org.apache.kafka.common.KafkaException: Socket server failed to bind to 192.168.189.115:9092: Cannot assign requested address. – somsaktk33 Jan 12 '21 at 10:00
  • check your server, producer and consumer address configuration; keep them all the same – zhang-yuan Jan 13 '21 at 06:53
  • thanks @zhang-yuan. i did it. also do the new project as example in https://memorynotfound.com/spring-kafka-json-serializer-deserializer-example/ but still got the same error "Topic customer.topic not present in metadata after 60000 ms." – somsaktk33 Jan 13 '21 at 08:57
  • my administrator tell me port 9092 service still not run , but i check with sudo ss -tulwn port 9092, 2181 is displayed (don't know what is it). i don't know how to set port 9092 service open to outside – somsaktk33 Jan 13 '21 at 09:50
  • em... 2181 is the default port of zookeeper, and kafka depends on it; look at @OneCricketeer 's comment below, if you're not using docker or some cloud server, you just need to config the `listeners` – zhang-yuan Jan 14 '21 at 07:13
  • thanks @zhang-yuan i try to set only listeners=PLAINTEXT://0.0.0.0:9092 i comment # on advertised.listeners but i got error advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address. – somsaktk33 Jan 14 '21 at 08:21
  • also try listeners=PLAINTEXT://:9092 i got error java.net.BindException: Cannot assign requested address (comment advertised.listeners too) – somsaktk33 Jan 14 '21 at 08:22

2 Answers2

0

You shouldn't bind to an actual IP as that'll restrict traffic to only that address

This opens up the server to accept all incoming connections on port 9092

listeners=PLAINTEXT://0.0.0.0:9092
OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • thanks , i try listeners=PLAINTEXT://0.0.0.0:9092 i got advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. use a routable ip address. so i need to set advertised.listeners=PLAINTEXT://192.168.48.54:9092 but it still the same error – somsaktk33 Jan 13 '21 at 02:12
  • 1
    You only needed to change the listeners, not the advertised ones. There are debugging steps mentioned here https://www.confluent.io/blog/kafka-listeners-explained/ – OneCricketeer Jan 13 '21 at 15:57
0

finally, it's network problem. firewalld on 192.168.48.54 is open, i need to disabled it.

somsaktk33
  • 36
  • 1
  • 5