I'm using a Kafka source in Spark Streaming to receive records generated with Datagen in Confluent Cloud, and I intend to use the Confluent Schema Registry.
Currently, this is the exception I am facing:
Exception in thread "main" io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
The Confluent Cloud Schema Registry requires some authentication properties, but I don't know where to supply them:
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=secret:secret
I think I have to pass this authentication data to CachedSchemaRegistryClient, but I'm not sure if that is right, or how to do it.
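For reference, this is how I'd collect those two properties into a config object (just a sketch, with `<api-key>:<api-secret>` as a placeholder for the real credentials; I've also read that newer client versions accept the shorter key `basic.auth.user.info`, but I'm not certain which versions):

```scala
import java.util.Properties

// Sketch: the Basic Auth settings Confluent Cloud's Schema Registry expects.
// "<api-key>:<api-secret>" is a placeholder for the real Schema Registry credentials.
val srProps = new Properties()
srProps.put("basic.auth.credentials.source", "USER_INFO")
srProps.put("schema.registry.basic.auth.user.info", "<api-key>:<api-secret>")
```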
// Set up the Avro deserialization UDF
val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
spark.udf.register("deserialize", (bytes: Array[Byte]) =>
  kafkaAvroDeserializer.deserialize(bytes)
)
When I try to pass the authentication data to the Schema Registry like this:
val restService = new RestService(schemaRegistryURL)
val props = Map(
  "basic.auth.credentials.source" -> "USER_INFO",
  "schema.registry.basic.auth.user.info" -> "secret:secret"
).asJava
val schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)
I get

Cannot resolve overloaded constructor CachedSchemaRegistryClient

so it seems that only two parameters can be passed to CachedSchemaRegistryClient.
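From looking at the client source, my guess is that the three-argument constructor taking a config map only exists in newer releases of kafka-schema-registry-client (around 4.1.x and later), so an older dependency would explain the overload error. A sketch of what I'd try after bumping the dependency (the version number and constructor behavior are my assumptions, not verified):

```scala
// build.sbt (assumption: a newer client exposes the 3-arg constructor)
// libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % "5.3.0"

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import scala.collection.JavaConverters._

val props = Map(
  "basic.auth.credentials.source" -> "USER_INFO",
  "schema.registry.basic.auth.user.info" -> "secret:secret"
).asJava

// In newer clients this overload takes (baseUrl, identityMapCapacity, configs),
// so the RestService wrapper shouldn't be needed.
val schemaRegistryClient =
  new CachedSchemaRegistryClient(schemaRegistryUrl, 128, props)
```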
How do I fix this?
I came across this post, but there they don't apply any authentication to the Schema Registry in Confluent Cloud.