I'm using spring cloud stream alongside Aiven's schema registry which uses confluent's schema registry. Aiven's schema registry is secured with a password. Based on these instructions, these two config parameters need to be set to successfully access the schema registry server.
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "avnadmin:schema-reg-password");
Everything is fine when I only use vanilla java's kafka drivers, but if I use Spring cloud stream, I don't know how to inject these two parameters. At the moment, I'm putting "basic.auth.user.info"
and "basic.auth.credentials.source"
under "spring.cloud.stream.kafka.binder.configuration"
in the application.yml
file.
Doing this, I'm getting "401 Unauthorized"
on the line where the schema wants to get registered.
Update 1:
Based on 'Ali n's suggestion, I updated the way SchemaRegistryClient's bean was configured so that it becomes aware of the SSL context.
@Bean
public SchemaRegistryClient schemaRegistryClient(
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
try {
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(new FileInputStream(
new File("path/to/client.keystore.p12")),
"secret".toCharArray());
final KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream(
new File("path/to/client.truststore.jks")),
"secret".toCharArray());
TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;
SSLContext sslContext = SSLContextBuilder
.create()
.loadKeyMaterial(keyStore, "secret".toCharArray())
.loadTrustMaterial(trustStore, acceptingTrustStrategy)
.build();
HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
httpClient);
ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
new RestTemplate(requestFactory));
schemaRegistryClient.setEndpoint(endpoint);
return schemaRegistryClient;
} catch (Exception ex) {
ex.printStackTrace();
return null;
}
}
This helped getting rid of the error on app's startup and registered the schema. However, whenever the app wanted to push a message to Kafka, a new error was thrown again. Finally this was also fixed by mmelsen's answer.