
I'm using a Kafka source in Spark Streaming to receive records generated with Datagen in Confluent Cloud, and I intend to use the Confluent Schema Registry.
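The Kafka source itself already connects to Confluent Cloud; for reference, it looks roughly like this (broker, topic, and API credentials are placeholders):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<broker>:9092")
  .option("subscribe", "<topic>")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";")
  .load()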

This is the exception I am currently facing:

Exception in thread "main" io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401

The Confluent Cloud Schema Registry requires some authentication properties, but I don't know how to pass them:

basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=secret:secret

I think I have to pass this authentication data to CachedSchemaRegistryClient, but I'm not sure whether that's the right place or how to do it.

// Set up the Avro deserialization UDF
schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
spark.udf.register("deserialize", (bytes: Array[Byte]) =>
  kafkaAvroDeserializer.deserialize(bytes)
)
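For context, `AvroDeserializer` is not a Confluent library class; it is a small wrapper I adapted from the post mentioned below. Roughly, it looks like this (a sketch, so details may differ from my actual code):

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord

// Reuses Confluent's wire-format decoding but returns the record as a JSON string
class AvroDeserializer extends AbstractKafkaAvroDeserializer {
  def this(client: SchemaRegistryClient) = {
    this()
    this.schemaRegistry = client // protected field on the Confluent base class
  }

  override def deserialize(bytes: Array[Byte]): String =
    super.deserialize(bytes) match {
      case str: String => str
      case record      => record.asInstanceOf[GenericRecord].toString // GenericRecord.toString renders JSON
    }
}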

When I try to pass the authentication to the Schema Registry like this:

val restService = new RestService(schemaRegistryURL)

val props = Map(
  "basic.auth.credentials.source" -> "USER_INFO",
  "schema.registry.basic.auth.user.info" -> "secret:secret"
).asJava

var schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)

I get `Cannot resolve overloaded constructor CachedSchemaRegistryClient`; it seems that only 2 parameters can be passed to `CachedSchemaRegistryClient`.

How do I fix this?

I came across this post, but there they haven't applied any authentication to the Schema Registry in Confluent Cloud.

  • https://stackoverflow.com/questions/58835635/how-pass-basic-authentication-to-confluent-schema-registry – Ran Lupovich Jun 18 '21 at 14:02
  • Not clear what version of the Registry client you're using... You definitely can pass three parameters - https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L106 – OneCricketeer Jun 19 '21 at 16:45
  • @OneCricketeer This is what I am currently using: kafka-schema-registry-client-3.3.1.jar. Any suggestions on which version should be used in this case? – Aditi Sinha Jun 19 '21 at 18:19
  • Why not use the latest version? – OneCricketeer Jun 19 '21 at 22:26
  • @OneCricketeer Yes, I have also tried other versions, including the latest "kafka-schema-registry" % "6.2.0", but I still get the same error: Cannot resolve overloaded constructor CachedSchemaRegistryClient – Aditi Sinha Jun 20 '21 at 04:52
  • You shouldn't need the `RestService`, and I think the scala Int class is conflicting with Java `int` primitive, so try `(schemaRegistryURL, new java.lang.Integer(100), props)` – OneCricketeer Jun 20 '21 at 16:54
  • @OneCricketeer Same error: Cannot resolve overloaded constructor CachedSchemaRegistryClient! It looks fine when I just pass 2 parameters, like `schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100)`, but it's not working when I try to pass props – Aditi Sinha Jun 21 '21 at 03:20
  • I don't really use Scala, but I suspect that's the problem. If you make a `new java.util.Map` rather than use `.asJava`, maybe you'll get something else. Otherwise, I'd suggest writing your Spark code with Java or maybe even Kotlin – OneCricketeer Jun 21 '21 at 20:19
  • 1
    @OneCricketeer Thanks for your inputs. I am now able to connect to my schema registry in confluent. Using spark readStream I can read from the kafka topic and see the schema. But while doing from_avro(value) and trying writeStream on console I get `Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1` ..?? Any inputs or references would be helpful. – Aditi Sinha Jun 23 '21 at 10:50
  • I checked https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry, but when I try to run it, I get: `Caused by: org.apache.spark.SparkException: Failed to execute user defined function(App$$$Lambda$2497/0x0000000101164840: (binary) => string)` – Aditi Sinha Jun 23 '21 at 10:51
  • Feel free to answer below with the code that fixed the issue. For the next problem, as I wrote there, I did not test that code with a distributed Spark cluster, so there indeed might be issues with serializing the UDF, and you should refer instead to the linked blog post and Databricks notebooks (although, I personally haven't tested them) – OneCricketeer Jun 23 '21 at 15:34
  • 1
    @OneCricketeer This piece of code worked for me: `private val schemaRegistryUrl = "" val props = Map( "basic.auth.credentials.source" -> "USER_INFO", "schema.registry.basic.auth.user.info" -> ":" ).asJava private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100,props)` We need to make sure we are doing a correct import while converting to JAVA: `import scala.collection.JavaConverters.mapAsJavaMapConverter` – Aditi Sinha Jun 24 '21 at 13:24
  • Please move your comment to an answer below – OneCricketeer Jun 24 '21 at 17:03

1 Answer


This piece of code worked for me:

private val schemaRegistryUrl = "<schemaregistryURL>"

val props = Map(
  "basic.auth.credentials.source" -> "USER_INFO",
  "schema.registry.basic.auth.user.info" -> "<api-key>:<api-secret>"
).asJava

private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100, props)

We need to make sure we use the correct import when converting the Scala `Map` to a Java `Map`:

import scala.collection.JavaConverters.mapAsJavaMapConverter
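As a quick check that the authenticated client works, you can fetch the latest schema registered for the topic's value subject. (The subject name below is a placeholder; by default the Schema Registry names value subjects `<topic>-value`.)

// Sanity check: this call succeeds only if the credentials above are accepted
val metadata = schemaRegistryClient.getLatestSchemaMetadata("<topic>-value")
println(metadata.getSchema) // the Avro schema registered for the topic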