I am trying to connect to my local Kafka cluster, which has SASL/SCRAM enabled, from spring-kafka.
The cluster itself works perfectly; I set it up by following the SASL SCRAM Docker Implementation repository.
The issue I am facing occurs when I try to connect to this cluster externally from the spring-kafka module.
The configuration class is as follows:
package org.kafka.spring.configuration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that assembles the client properties for a Kafka
 * {@link AdminClient} and exposes both the property map and the client as beans.
 *
 * <p>Connection, SASL and SSL settings are injected from the {@code spring.kafka.*}
 * properties; the JAAS login entry is built from a fixed template.
 */
@Configuration
public class AdminConfigurer {

    /** JAAS entry for the SCRAM login module; the two placeholders are username and password. */
    private static final String JAAS_TEMPLATE =
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";

    // NOTE(review): the admin credentials are hard-coded here and are NOT read from
    // spring.kafka.properties.sasl.jaas.config -- confirm this is intended and consider
    // externalising them.
    private static final String ADMIN_JAAS_CFG = String.format(JAAS_TEMPLATE, "kafkabroker", "password");

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;

    // SSL related attributes
    @Value("${spring.kafka.ssl.keystore-location}")
    private String keystoreJks;

    @Value("${spring.kafka.ssl.truststore-location}")
    private String truststoreJks;

    @Value("${spring.kafka.ssl.keystore-password}")
    private String keystorePassword;

    @Value("${spring.kafka.ssl.truststore-password}")
    private String truststorePassword;

    /** @return the comma-separated bootstrap server list */
    public String getBootstrapAddress() {
        return bootstrapAddress;
    }

    /** @param bootstrapAddress the comma-separated bootstrap server list */
    public void setBootstrapAddress(String bootstrapAddress) {
        this.bootstrapAddress = bootstrapAddress;
    }

    /** @return the value used for {@code security.protocol} */
    public String getSecurityProtocol() {
        return securityProtocol;
    }

    /** @param securityProtocol the value to use for {@code security.protocol} */
    public void setSecurityProtocol(String securityProtocol) {
        this.securityProtocol = securityProtocol;
    }

    /** @return the value used for {@code sasl.mechanism} */
    public String getSaslMechanism() {
        return saslMechanism;
    }

    /** @param saslMechanism the value to use for {@code sasl.mechanism} */
    public void setSaslMechanism(String saslMechanism) {
        this.saslMechanism = saslMechanism;
    }

    /** @return the key store location */
    public String getKeystoreJks() {
        return keystoreJks;
    }

    /** @param keystoreJks the key store location */
    public void setKeystoreJks(String keystoreJks) {
        this.keystoreJks = keystoreJks;
    }

    /** @return the trust store location */
    public String getTruststoreJks() {
        return truststoreJks;
    }

    /** @param truststoreJks the trust store location */
    public void setTruststoreJks(String truststoreJks) {
        this.truststoreJks = truststoreJks;
    }

    /** @return the key store password */
    public String getKeystorePassword() {
        return keystorePassword;
    }

    /** @param keystorePassword the key store password */
    public void setKeystorePassword(String keystorePassword) {
        this.keystorePassword = keystorePassword;
    }

    /** @return the trust store password */
    public String getTruststorePassword() {
        return truststorePassword;
    }

    /** @param truststorePassword the trust store password */
    public void setTruststorePassword(String truststorePassword) {
        this.truststorePassword = truststorePassword;
    }

    /**
     * Builds the configuration map handed to the Kafka admin client.
     *
     * @return a new mutable map with the bootstrap, SASL and SSL client settings
     */
    @Bean
    public Map<String, Object> kafkaAdminProperties() {
        final Map<String, Object> configs = new HashMap<>();

        // Use the injected property instead of a duplicated literal so that changing
        // spring.kafka.bootstrap-servers actually takes effect.
        // The bootstrap list only seeds the first metadata request; after that the client
        // connects to the host names the brokers advertise, so those names must be
        // resolvable from the machine running this application.
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

        configs.put("security.protocol", securityProtocol);
        configs.put("sasl.mechanism", saslMechanism);
        configs.put("sasl.jaas.config", ADMIN_JAAS_CFG);

        configs.put("ssl.keystore.location", keystoreJks);
        configs.put("ssl.keystore.password", keystorePassword);
        configs.put("ssl.truststore.location", truststoreJks);
        configs.put("ssl.truststore.password", truststorePassword);

        // An empty algorithm turns off server host name verification.
        // NOTE(review): acceptable for a local setup only -- confirm before using elsewhere.
        configs.put("ssl.endpoint.identification.algorithm", "");

        return configs;
    }

    /**
     * Creates the Kafka admin client from {@link #kafkaAdminProperties()}.
     *
     * @return a new {@link AdminClient}
     */
    @Bean
    public AdminClient getClient() {
        return AdminClient.create(kafkaAdminProperties());
    }
}
The application.yml file is as follows:
spring:
  kafka:
    bootstrap-servers: localhost:19094,localhost:29094,localhost:39094
    properties:
      sasl:
        jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username=kafkaclient password=password;
        mechanism: SCRAM-SHA-256
      security:
        protocol: SASL_SSL
    ssl:
      truststore-location: /home/guru/Downloads/kafka-spring-api/src/main/resources/ssl/admin/kafka.producer.truststore.jks
      truststore-password: confluent
      keystore-location: /home/guru/Downloads/kafka-spring-api/src/main/resources/ssl/admin/kafka.producer.keystore.jks
      keystore-password: confluent
      key-password: confluent
I get the following exceptions when creating a topic through the spring-kafka API:
java.net.UnknownHostException: kafka-broker-1
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797) ~[na:na]
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1368) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1302) ~[na:na]
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:988) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1128) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1388) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1331) ~[kafka-clients-3.1.1.jar:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
java.net.UnknownHostException: kafka-broker-2
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797) ~[na:na]
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1368) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1302) ~[na:na]
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:988) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1128) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1388) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1331) ~[kafka-clients-3.1.1.jar:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
java.net.UnknownHostException: kafka-broker-3
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797) ~[na:na]
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1368) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1302) ~[na:na]
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:988) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1128) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1388) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1331) ~[kafka-clients-3.1.1.jar:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]