We are trying to consume AVRO messages coming from other systems. I am able to read the AVRO message when I specify the schema as a file (.avsc) using the below code,
import avro.schema
from avro.io import DatumReader, BinaryDecoder
...
schema = avro.schema.Parse(open("schema.avsc", "rb").read())
...
bytes_reader = io.BytesIO(element) # element is the serialized message
decoder = BinaryDecoder(bytes_reader)
reader = DatumReader(schema)
rec = reader.read(decoder)
However, I now need to read the schema from a schema registry URL instead,
http://<IP>:<PORT>/subjects/<SUBJECT>/versions/<VERSION>/schema
I am extracting the url from the incoming message custom attribute 'schema'. Now to fetch the schema from the url I am using the below code,
def fetch_schema(IP, subject, version):
sr = SchemaRegistryClient(IP)
schema = sr.get_schema(subject, version=version).schema
return schema
With the same code used above to deserialize the message I now get the below error
AttributeError: 'AvroSchema' object has no attribute 'type'
on the line,
rec = reader.read(decoder)
I have compared the type of the 'schema' variable when I read from file vs. when I fetch from the URL,
from file, the schema type is : <class 'avro.schema.RecordSchema'>
from URL, the schema type is : <class 'schema_registry.client.schema.AvroSchema'>
They are different and hence probably the issue. Looking for some direction here. Thanks!