6

I'm setting up a Kafka consumer configuration and the configuration cannot find the keystore or truststore on the classpath:

@EnableKafka
@Configuration
public class KafkaConfig {

    @Value("${kafka.ssl.keystore}")
    private String keyStorePath;
    @Value("${kafka.ssl.truststore}")
    private String trustStorePath;

    @Bean
    public ConsumerFactory<String, String> getConsumerFactory() {

        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"my-bootstrap.mydomain.com:443");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "client1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "500");
        properties.put("session.timeout.ms", "30000");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStorePath);
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password");
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStorePath);
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password");

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(getConsumerFactory());
        return factory;
    }
}

The keystore and truststore are both located in the directory src/main/resources/ssl in the same maven module as the configuration class.

I set up the placeholders in the application.yml as follows:

kafka:
  ssl:
    keystore: classpath:ssl/kafka-keystore.jks
    truststore: classpath:ssl/kafka-truststore.jks

However, the application fails to start with the following exception:

"org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: classpath:ssl/kafka-keystore.jks (No such file or directory)"

My understanding is that using @Value enables the use of the classpath: prefix to resolve the classpath (see this link) https://www.baeldung.com/spring-classpath-file-access

Moreover, the @Value technique works just fine to resolve the keystores and truststores for the reactive WebClient configuration in the same application.

What do I need to do to resolve the classpath for the Kafka configuration? Am I missing something here?

Skywarp
  • 989
  • 3
  • 15
  • 32

2 Answers2

10

Your injecting into a String which is going to keep the "classpath:" within the String value and provide it as a property to DefaultKafkaConsumerFactory, try injecting into a spring Resource like:

import org.springframework.core.io.Resource;

@Value("classpath:path/to/file/in/classpath")
Resource resourceFile;

Then you can access the file and you could get the absolute path like:

resourceFile.getFile().getAbsolutePath()

The idea being you could provide the absolute path to DefaultKafkaConsumerFactory

But you could also try removing the "classpath:" and inject as String like your current code which might work depending on how DefaultKafkaConsumerFactory treats that property. But I can't see why absolute path above wouldn't work.

camtastic
  • 955
  • 6
  • 15
  • Not sure why it works when using String to resolve the classpath when configuring a reactive WebClient but no with the Kafka config. Regardless, Resource works just fine, and I overlooked that they were using Resource and not String in the tutorial. – Skywarp Sep 13 '19 at 15:17
1

For those like me that were using Spring Boot and Spring Kafka and don't override a DefaultKafkaConsumerFactory - only use properties for configuration -, there's a BeanPostProcessor class that you can implement. It provides two methods:

postProcessAfterInitialization and postProcessBeforeInitialization

Factory hook that allows for custom modification of new bean instances — for example, checking for marker interfaces or wrapping beans with proxies. Typically, post-processors that populate beans via marker interfaces or the like will implement postProcessBeforeInitialization(java.lang.Object, java.lang.String), while post-processors that wrap beans with proxies will normally implement postProcessAfterInitialization(java.lang.Object, java.lang.String).

I was using Spring Boot with Spring Kafka and I only wanted a change for local profile.

In my code example, I was using it to override Kafka Location properties, because for SSL it doesn't read from classpath.

So this was the code:

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import java.io.IOException;
import java.util.Arrays;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;

@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration implements BeanPostProcessor {

  @Value("${spring.kafka.ssl.key-store-location:}")
  private Resource keyStoreResource;
  @Value("${spring.kafka.properties.schema.registry.ssl.truststore.location:}")
  private Resource trustStoreResource;
  private final Environment environment;

  @SneakyThrows
  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    if (bean instanceof KafkaProperties) {
      KafkaProperties kafkaProperties = (KafkaProperties) bean;
      if(isLocalProfileActive()) {
        configureStoreLocation(kafkaProperties);
      }
    }
    return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
  }

  private boolean isLocalProfileActive() {
    return Arrays.stream(environment.getActiveProfiles()).anyMatch(profile -> "local".equals(profile));
  }

  private void configureStoreLocation(KafkaProperties kafkaProperties) throws IOException {
    kafkaProperties.getSsl().setKeyStoreLocation(new FileSystemResource(keyStoreResource.getFile().getAbsolutePath()));
    kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreResource.getFile().getAbsolutePath());
    kafkaProperties.getSsl().setTrustStoreLocation(new FileSystemResource(trustStoreResource.getFile().getAbsolutePath()));
    kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreResource.getFile().getAbsolutePath());
  }

}

This way I could have on my properties file:

spring.kafka.ssl.key-store-location=classpath:mykeystore.jks

And the code would get the absolute path from that and set it. It makes also possible to filter based on profiles.

It's important to mention that BeanPostProcessor runs for EVERY bean, so make sure you filter what you want.

Raphael Amoedo
  • 4,233
  • 4
  • 28
  • 37
  • 1
    @ Raphael Amoedo, Ton of thanks to you! After struggling for 2 days, I found your solution and thats the only one which worked for me. Spring Kafka client is having some serious issue. It is not at al able to resolve truststore and keystore path neither from classpath nor from file system. I tried many ways of providing the path in application.yml but none worked. At the end, I used the config class as mentioned by you and that only worked. Thanks again! – Ajay Singh Jun 03 '23 at 12:41