
I am using the Confluent Schema Registry. I have not uploaded any `bytes` schema under any subject.

Still, there is a schema under ID 2, at the endpoint http://schema-registry-host:port/schemas/ids/2.

Could anyone explain this?

I created an MQTT source connector (MQTT to Kafka) that uses an Avro schema, so that Avro data lands on the Kafka topic.

In my Spark ETL function, I supplied a Schema Registry schema ID so that it can consume the Avro data.

Instead, the function picks up what looks like a default `bytes` schema at ID 2.

I gave it the ID where my `test` schema is registered, and I get this error:

    Caused by: org.apache.avro.AvroTypeException: Found bytes, expecting test
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:240) ~[avro-1.11.0.jar:1.11.0]
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0]

I want to parse the Avro data that the MQTT source connector writes to the Kafka topic, using Spark's Avro functions.

Let me know if I am missing anything.

Thanks.

OneCricketeer

1 Answer


ID 2 isn't a default of any kind.

If you used a source connector with the Confluent AvroConverter, the converter will auto-register a schema.

The MQTT connector only produces bytes or string schemas.
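To illustrate what a bytes schema means on the wire: in Avro's binary encoding, a value whose schema is the primitive `"bytes"` is just a zigzag-varint length followed by the raw bytes, with no record or field names. That is consistent with the "Found bytes, expecting test" error, since the writer schema is `bytes` while the reader expects a record named `test`. The sketch below is plain Python with no Avro library; the helper names are hypothetical, and it assumes the Confluent header (magic byte plus 4-byte schema ID) has already been stripped from the Kafka value.

```python
from typing import Tuple


def read_avro_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read one zigzag-encoded variable-length long (Avro binary encoding).

    Returns the decoded value and the position just after it.
    """
    raw = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: this was the last varint byte
            break
        shift += 7
    # Undo zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    return (raw >> 1) ^ -(raw & 1), pos


def decode_avro_bytes(body: bytes) -> bytes:
    """Decode an Avro body written with the primitive schema "bytes".

    Hypothetical helper: the encoding is a length (an Avro long) followed
    by that many raw bytes, so the result is the original payload.
    """
    length, pos = read_avro_long(body, 0)
    if length < 0 or pos + length > len(body):
        raise ValueError("truncated or invalid Avro bytes value")
    return body[pos:pos + length]


# The Avro encoding of b"hello": length 5 (zigzag-encoded as 0x0A), then the bytes.
print(decode_avro_bytes(b"\x0ahello"))  # b'hello'
```

Nothing in the decoded value describes the fields of your MQTT message; whatever structure it has must be parsed separately.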

Spark's from_avro has no integration with any Schema Registry (related post: Integrating Spark Structured Streaming with the Confluent Schema Registry). In any case, since the data is only bytes, with no internal schema, using top-level Avro functions wouldn't bring any benefit.
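One reason plain from_avro cannot read these records directly is the Confluent wire format: the AvroConverter prefixes each value with a magic byte `0` and a 4-byte big-endian schema ID before the Avro body. A minimal sketch in plain Python, independent of Spark, that splits such a value; `split_confluent_payload` is a hypothetical helper name, and the sample value is made up for illustration.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0   # first byte of every Confluent-serialized value
HEADER_SIZE = 5  # 1 magic byte + 4-byte big-endian schema ID


def split_confluent_payload(value: bytes) -> Tuple[int, bytes]:
    """Split a Confluent wire-format value into (schema_id, avro_body)."""
    if len(value) < HEADER_SIZE:
        raise ValueError("value too short for the Confluent wire format")
    magic, schema_id = struct.unpack(">BI", value[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, value[HEADER_SIZE:]


# Made-up value written with schema ID 2 whose Avro body encodes b"hello" as bytes.
record_value = b"\x00" + struct.pack(">I", 2) + b"\x0ahello"
schema_id, body = split_confluent_payload(record_value)
print(schema_id, body)  # 2 b'\nhello'
```

The schema ID embedded in each record is whatever the converter registered, not an ID you pass to your consumer, which is why pointing the reader at a different `test` schema does not change what was written.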

OneCricketeer
  • I have passed auto.register.schemas=false, so why is it still auto-registering the schema? I have given the Schema Registry URL and schema ID, so why is it not using them? Please give me instructions on how to bind my Avro schema to the MQTT connector, i.e., to the Kafka topic passed in the connector config. – Prabhat Sharma Jan 19 '23 at 05:42
  • 1) You need `value.converter.auto.register.schemas=false`. 2) As mentioned, MQTT connector does not send "proper" Avro payloads. If you configure it to do so, then it will only register `{"schema": "bytes/string"}`, which is almost never what you want. – OneCricketeer Jan 19 '23 at 19:09
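A hedged sketch of the converter-related part of a source connector config, built as a Python dict (for example, to submit through the Kafka Connect REST API). The `value.converter*` keys follow the usual Confluent AvroConverter conventions; the registry URL is a placeholder and `avro_converter_settings` is a hypothetical helper. MQTT-specific settings are omitted because they depend on which MQTT connector you run.

```python
import json
from typing import Dict

# Placeholder URL; replace with your own Schema Registry endpoint.
SCHEMA_REGISTRY_URL = "http://schema-registry-host:port"


def avro_converter_settings(registry_url: str) -> Dict[str, str]:
    """Hypothetical helper: converter-related keys for a connector config.

    Settings are passed to the value converter only when they carry the
    "value.converter." prefix; an unprefixed auto.register.schemas goes to
    the connector itself, not to the AvroConverter.
    """
    return {
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": registry_url,
        # Ask the converter not to register a new schema on first write.
        "value.converter.auto.register.schemas": "false",
    }


config = avro_converter_settings(SCHEMA_REGISTRY_URL)
print(json.dumps(config, indent=2))
```

This only controls registration; the MQTT connector still emits a bytes or string value, so it does not bind a custom record schema to the topic.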