I can't register an Avro Schema in the Schema Registry due to what looks like an authentication issue.

I've set up a Confluent Cloud cluster and defined an Avro schema for a topic via the UI. I've also set up the API keys via the UI.

I've verified that I can query for subjects using the following curl command: curl -u keyid:secretkey https://schema-reg-url/subjects. So the API keys I'm using should be good.

I've tried to set up a RestService with the right properties as well (below), but I still can't connect to the Schema Registry.

I looked through the source for SchemaRegistryClient, but there seems to be no option to specify authentication params.

Am I on the wrong track here?

Note: I've specified the properties below because these are what the Confluent API access page advises.

val rs1: RestService = new RestService("<https://schema-registry-url>")
val props = new util.HashMap[String, String]()
props.put("basic.auth.credentials.source", "USER_INFO")
props.put("schema.registry.basic.auth.user.info", "key_id:secret_key_id")
props.put("schema.registry.url", "https://schema-registry-url")

// this fails
rs1.registerSchema(props, RegisterSchemaRequest.fromJson(schemaString), "<subject name>")

// this fails as well
val listOfSubjects: util.List[String] = rs1.getAllSubjects(props)

The error I get is below.

Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@7507d96c; line: 1, column: 2]; error code: 50005
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@7507d96c; line: 1, column: 2]; error code: 50005

UPDATE: I've done further analysis.
The error above is not the actual error. It happens because the call to jsonDeserializer.readValue() fails, as an Exception object is not passed to it (refer: line in source code).

The actual error is a 401 HTTP_UNAUTHORIZED error.
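
One way to see the real status behind the misleading parse error is to call the endpoint directly and read the HTTP status code. The sketch below uses only the JDK's HttpURLConnection; the object name RegistryProbe and the probe function are hypothetical helpers, not part of the Confluent client. It assumes that the '<' in the error message comes from a non-JSON response body (for example an HTML error page) returned with the 401.

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

object RegistryProbe {
  /** GETs <baseUrl>/subjects and returns (HTTP status, first 200 chars of the body).
    * authHeader is the optional value of the Authorization header. */
  def probe(baseUrl: String, authHeader: Option[String]): (Int, String) = {
    val conn = new URL(s"$baseUrl/subjects").openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    authHeader.foreach(h => conn.setRequestProperty("Authorization", h))
    try {
      val status = conn.getResponseCode
      // For 4xx/5xx responses the body is on the error stream, not the input stream.
      val stream = if (status >= 400) conn.getErrorStream else conn.getInputStream
      val body =
        if (stream == null) ""
        else {
          val src = Source.fromInputStream(stream, "UTF-8")
          try src.mkString finally src.close()
        }
      (status, body.take(200))
    } finally conn.disconnect()
  }
}
```

Calling RegistryProbe.probe("https://schema-registry-url", None) without credentials should then report the 401 directly instead of a JSON parse failure.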

The connection to Schema Registry uses Basic Auth for authorization. The REST GET call needs to carry the Base64-encoded "API key:secret" pair in the Authorization header.
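
As a minimal sketch of that header format, the helper below builds the value of the Authorization header from a key and secret using only the JDK. The object name BasicAuth and its parameter names are hypothetical and chosen for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

object BasicAuth {
  /** Builds the Authorization header value for HTTP Basic Auth:
    * "Basic " followed by base64("<apiKey>:<apiSecret>"). */
  def header(apiKey: String, apiSecret: String): String = {
    val raw = s"$apiKey:$apiSecret".getBytes(StandardCharsets.UTF_8)
    "Basic " + Base64.getEncoder.encodeToString(raw)
  }
}
```

This is the same header that curl -u keyid:secretkey sends, which is why the curl call succeeds with the same credentials.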

Working code snippet in answer below.

Kevin Lawrence
  • I'll try this out asap as well - https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry/49182004#49182004 – Kevin Lawrence Jul 31 '19 at 10:47

1 Answer

I got some help from Confluent support: Schema Registry uses Basic Auth for authorization, and the API key and secret need to be in the header as "Authorization": "Basic base64encoded(api-key-username:pwd)".

I guess I need to work on my knowledge of authorization/authentication conventions.

The following code snippets work for me.

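import java.util
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema
import spray.json._ // assumption: parseJson below comes from spray-json

val rs1: RestService = new RestService(s"${testKafkaSchemaRegistryURL}")
val headers = new util.HashMap[String, String]()
// Basic Auth header: "Basic " + base64("<api-key>:<secret>")
headers.put("Authorization", "Basic " + util.Base64.getEncoder.encodeToString(s"${testKafkaSchemaRegistryAccessKey}:${testKafkaSchemaRegistrySecretAccessKey}".getBytes("UTF-8")))

// works now
val allSubjects = rs1.getAllSubjects(headers)

// works now
val schConfig: Schema = rs1.getLatestVersion(headers, "kev-test-1-value")
// getSchema returns the schema definition as a JSON string
schConfig.getSchema.parseJson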
Kevin Lawrence