0

I am trying to connect from spring-kafka to my local Kafka cluster, which has SASL/SCRAM enabled. The cluster itself works perfectly; I set it up by following the SASL SCRAM Docker Implementation repository.

The issue I am facing occurs when I try to connect to this cluster externally from the Spring Kafka module. The configuration file is as follows:

package org.kafka.spring.configuration;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that assembles the client properties for a Kafka
 * {@link AdminClient} talking to a SASL/SCRAM-over-SSL secured cluster and
 * exposes both the property map and the client as beans.
 *
 * <p>Connection, security-protocol, SASL-mechanism and key/trust-store settings
 * are injected from the {@code spring.kafka.*} properties.
 */
@Configuration
public class AdminConfigurer {

    /** JAAS entry template for the SCRAM login module; the placeholders are username and password. */
    private static final String JAAS_TEMPLATE =
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;

    // SSL related attributes
    @Value("${spring.kafka.ssl.keystore-location}")
    private String keystoreJks;

    @Value("${spring.kafka.ssl.truststore-location}")
    private String truststoreJks;

    @Value("${spring.kafka.ssl.keystore-password}")
    private String keystorePassword;

    @Value("${spring.kafka.ssl.truststore-password}")
    private String truststorePassword;

    // NOTE(review): the admin credentials are hard-coded here and are not taken from
    // spring.kafka.properties.sasl.jaas.config -- consider externalising them.
    private final String adminJaasCfg = String.format(JAAS_TEMPLATE, "kafkabroker", "password");

    /** @return the bootstrap server list injected from {@code spring.kafka.bootstrap-servers} */
    public String getBootstrapAddress() {
        return bootstrapAddress;
    }

    /** @param bootstrapAddress comma-separated {@code host:port} list used for the initial connection */
    public void setBootstrapAddress(String bootstrapAddress) {
        this.bootstrapAddress = bootstrapAddress;
    }

    /** @return the value placed under {@code security.protocol} */
    public String getSecurityProtocol() {
        return securityProtocol;
    }

    /** @param securityProtocol the value to place under {@code security.protocol} */
    public void setSecurityProtocol(String securityProtocol) {
        this.securityProtocol = securityProtocol;
    }

    /** @return the value placed under {@code sasl.mechanism} */
    public String getSaslMechanism() {
        return saslMechanism;
    }

    /** @param saslMechanism the value to place under {@code sasl.mechanism} */
    public void setSaslMechanism(String saslMechanism) {
        this.saslMechanism = saslMechanism;
    }

    /** @return the key store location */
    public String getKeystoreJks() {
        return keystoreJks;
    }

    /** @param keystoreJks the key store location */
    public void setKeystoreJks(String keystoreJks) {
        this.keystoreJks = keystoreJks;
    }

    /** @return the trust store location */
    public String getTruststoreJks() {
        return truststoreJks;
    }

    /** @param truststoreJks the trust store location */
    public void setTruststoreJks(String truststoreJks) {
        this.truststoreJks = truststoreJks;
    }

    /** @return the key store password */
    public String getKeystorePassword() {
        return keystorePassword;
    }

    /** @param keystorePassword the key store password */
    public void setKeystorePassword(String keystorePassword) {
        this.keystorePassword = keystorePassword;
    }

    /** @return the trust store password */
    public String getTruststorePassword() {
        return truststorePassword;
    }

    /** @param truststorePassword the trust store password */
    public void setTruststorePassword(String truststorePassword) {
        this.truststorePassword = truststorePassword;
    }

    /**
     * Builds the property map handed to {@link AdminClient#create(Map)}.
     *
     * @return a new mutable map holding the bootstrap servers, SASL and SSL settings
     */
    @Bean
    public Map<String, Object> kafkaAdminProperties() {
        final Map<String, Object> configs = new HashMap<>();
        // Use the injected property instead of a duplicated literal so that changing
        // spring.kafka.bootstrap-servers actually takes effect.
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configs.put("security.protocol", securityProtocol);
        configs.put("sasl.mechanism", saslMechanism);
        configs.put("sasl.jaas.config", adminJaasCfg);
        configs.put("ssl.keystore.location", keystoreJks);
        configs.put("ssl.keystore.password", keystorePassword);
        configs.put("ssl.truststore.location", truststoreJks);
        configs.put("ssl.truststore.password", truststorePassword);
        // An empty algorithm switches off server hostname verification.
        // NOTE(review): acceptable for a local cluster only -- confirm before using elsewhere.
        configs.put("ssl.endpoint.identification.algorithm", "");
        return configs;
    }

    /**
     * Creates the admin client from {@link #kafkaAdminProperties()}.
     *
     * @return a new {@link AdminClient}
     */
    @Bean
    public AdminClient getClient() {
        return AdminClient.create(kafkaAdminProperties());
    }

}

The application.yml file is as follows:

# Kafka client settings; the keys marked below are injected into AdminConfigurer via @Value.
spring:
  kafka:
    # Read by AdminConfigurer as spring.kafka.bootstrap-servers.
    bootstrap-servers: localhost:19094,localhost:29094,localhost:39094
    properties:
      sasl:
        jaas:
          # AdminConfigurer does not read this key; it formats its own JAAS entry
          # with username "kafkabroker".
          # NOTE(review): the username here is "kafkaclient" -- confirm which one is intended.
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username=kafkaclient password=password;
        # Read by AdminConfigurer as spring.kafka.properties.sasl.mechanism.
        mechanism: SCRAM-SHA-256
      security:
        # Read by AdminConfigurer as spring.kafka.properties.security.protocol.
        protocol: SASL_SSL
    ssl:
      # The four store location/password keys below are read by AdminConfigurer.
      truststore-location: /home/guru/Downloads/kafka-spring-api/src/main/resources/ssl/admin/kafka.producer.truststore.jks
      truststore-password: confluent
      keystore-location: /home/guru/Downloads/kafka-spring-api/src/main/resources/ssl/admin/kafka.producer.keystore.jks
      keystore-password: confluent
      # Not read by AdminConfigurer.
      key-password: confluent

I am getting the errors below while creating a topic through the spring-kafka API.

java.net.UnknownHostException: kafka-broker-1
    at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1368) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1302) ~[na:na]
    at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:988) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1128) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1388) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1331) ~[kafka-clients-3.1.1.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

java.net.UnknownHostException: kafka-broker-2
    at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1368) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1302) ~[na:na]
    at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:988) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1128) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1388) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1331) ~[kafka-clients-3.1.1.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

java.net.UnknownHostException: kafka-broker-3
    at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1368) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1302) ~[na:na]
    at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:988) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1128) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1388) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1331) ~[kafka-clients-3.1.1.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
OneCricketeer
  • 179,855
  • 19
  • 132
  • 245

0 Answers0