
I'm using Spring Cloud Stream together with Aiven's schema registry, which is based on Confluent's schema registry. Aiven's schema registry is secured with a password. Based on these instructions, these two config parameters need to be set to successfully access the schema registry server.

 props.put("basic.auth.credentials.source", "USER_INFO");
 props.put("basic.auth.user.info", "avnadmin:schema-reg-password");

Everything is fine when I only use the vanilla Java Kafka clients, but with Spring Cloud Stream I don't know how to inject these two parameters. At the moment, I'm putting "basic.auth.user.info" and "basic.auth.credentials.source" under "spring.cloud.stream.kafka.binder.configuration" in the application.yml file, as shown below.
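
For reference, this is roughly what that part of my application.yml looks like (the credentials here are placeholders):

spring:
  cloud:
    stream:
      kafka:
        binder:
          configuration:
            basic.auth.credentials.source: USER_INFO
            basic.auth.user.info: "avnadmin:schema-reg-password"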

With this configuration, I get a "401 Unauthorized" on the line where the schema is registered.

Update 1:

Based on Ali's suggestion, I updated the way the SchemaRegistryClient bean was configured so that it becomes aware of the SSL context.

@Bean
public SchemaRegistryClient schemaRegistryClient(
    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
  try {
    // Client certificate for mutual TLS, packaged as PKCS12
    final KeyStore keyStore = KeyStore.getInstance("PKCS12");
    try (FileInputStream in = new FileInputStream("path/to/client.keystore.p12")) {
      keyStore.load(in, "secret".toCharArray());
    }

    // CA certificate used to verify the server, packaged as JKS
    final KeyStore trustStore = KeyStore.getInstance("JKS");
    try (FileInputStream in = new FileInputStream("path/to/client.truststore.jks")) {
      trustStore.load(in, "secret".toCharArray());
    }

    // Accept any certificate chain the server presents
    TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;

    SSLContext sslContext = SSLContextBuilder
        .create()
        .loadKeyMaterial(keyStore, "secret".toCharArray())
        .loadTrustMaterial(trustStore, acceptingTrustStrategy)
        .build();

    HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
    ClientHttpRequestFactory requestFactory =
        new HttpComponentsClientHttpRequestFactory(httpClient);
    ConfluentSchemaRegistryClient schemaRegistryClient =
        new ConfluentSchemaRegistryClient(new RestTemplate(requestFactory));
    schemaRegistryClient.setEndpoint(endpoint);
    return schemaRegistryClient;
  } catch (Exception ex) {
    ex.printStackTrace();
    return null;
  }
}

This helped get rid of the error on the app's startup, and the schema was registered. However, whenever the app tried to push a message to Kafka, a new error was thrown. That was finally fixed by mmelsen's answer.

Milad

3 Answers


I ran into the same problem: I had to connect to a secured schema registry hosted by Aiven and protected by basic auth. To make it work, I had to configure the following properties:

spring.kafka.properties.schema.registry.url=https://***.aiven***.com:port
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.basic.auth.user.info=username:password
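
If you prefer application.yml over .properties, the same three settings would look roughly like this (host, port, and credentials are placeholders):

spring:
  kafka:
    properties:
      schema.registry.url: https://<registry-host>:<port>
      basic.auth.credentials.source: USER_INFO
      basic.auth.user.info: "username:password"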

The other properties for my binder are:

spring.cloud.stream.binders.input.type=kafka
# note: this broker port differs from the schema registry port above
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.brokers=https://***.aiven***.com:port
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=truststore.jks
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=PKCS12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=clientkeystore.p12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.key.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.streams.binder.autoCreateTopics=false

What actually happens is that Spring Cloud Stream adds the spring.kafka.properties.basic.* entries to the DefaultKafkaConsumerFactory, which passes the config on to the KafkaConsumer. At some point during the initialization of Spring Kafka, a CachedSchemaRegistryClient is created and provisioned with these properties. This client contains a method called configureRestService that checks whether the map of properties contains "basic.auth.credentials.source". Since we provide this through spring.kafka.properties, it finds the property and takes care of creating the appropriate headers when accessing the schema registry's endpoint.
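
To illustrate that mechanism outside of Spring (a minimal sketch, not the actual Spring Kafka internals; the URL, capacity, and credentials below are placeholders), the Confluent client can be handed the same config map directly:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

import java.util.HashMap;
import java.util.Map;

Map<String, Object> registryConfig = new HashMap<>();
registryConfig.put("basic.auth.credentials.source", "USER_INFO");
registryConfig.put("basic.auth.user.info", "username:password");

// The client reads the basic.auth.* entries from the config map and adds
// the Authorization header on every request to the registry.
CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient(
    "https://<registry-host>:<port>", // schema.registry.url
    100,                              // identity map capacity
    registryConfig);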

Hope this works out for you as well.

I'm using Spring Cloud Greenwich.SR1, spring-boot-starter 2.1.4.RELEASE, Avro 1.8.2, and Confluent 5.2.1.

mmelsen
  • If you are using this, can you also tell me how I can enforce that the schema is pulled from the registry instead of using the local one? – tsar2512 Dec 06 '20 at 06:07

The binder configuration only handles well-known consumer and producer properties.

You can set arbitrary properties at the binding level.

spring.cloud.stream.kafka.bindings.<binding>.consumer.configuration.basic.auth...
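
For example, for a binding named input on the consumer side and output on the producer side (the binding names here are illustrative):

spring.cloud.stream.kafka.bindings.input.consumer.configuration.basic.auth.credentials.source=USER_INFO
spring.cloud.stream.kafka.bindings.input.consumer.configuration.basic.auth.user.info=username:password
spring.cloud.stream.kafka.bindings.output.producer.configuration.basic.auth.credentials.source=USER_INFO
spring.cloud.stream.kafka.bindings.output.producer.configuration.basic.auth.user.info=username:password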
Gary Russell
  • I moved the parameters under "spring.cloud.stream.kafka.bindings.output.producer.configuration" but the error + stacktrace didn't change – Milad Apr 17 '19 at 00:53

Since Aiven uses SSL as the Kafka security protocol, certificates are required for authentication.

You can follow this page to understand how it works. In a nutshell, you need to run the following commands to generate the certificates and import them:

openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks

Then you can use the following properties to make use of the certificates:

spring.cloud.stream.kafka.streams.binder:
  configuration:
    security.protocol: SSL
    ssl.truststore.location: client.truststore.jks
    ssl.truststore.password: secret
    ssl.keystore.type: PKCS12
    ssl.keystore.location: client.keystore.p12
    ssl.keystore.password: secret
    ssl.key.password: secret
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
Ali