
We are trying to consume Avro messages coming from other systems. I can read an Avro message when I load the schema from a file (.avsc) using the code below:

import io

import avro.schema
from avro.io import DatumReader, BinaryDecoder
...
with open("schema.avsc", "rb") as f:
    schema = avro.schema.Parse(f.read())
...
bytes_reader = io.BytesIO(element)  # element is the serialized message
decoder = BinaryDecoder(bytes_reader)
reader = DatumReader(schema)
rec = reader.read(decoder)

However, I now need to read the schema from a Schema Registry URL instead:

http://<IP>:<PORT>/subjects/<SUBJECT>/versions/<VERSION>/schema

I extract the URL from the incoming message's custom attribute 'schema'. To fetch the schema from that URL, I use the code below:
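Because the attribute carries the full URL while fetch_schema takes the registry address, subject, and version separately, a small helper can split it. This is a minimal standard-library sketch; the name split_schema_url is hypothetical, and it assumes the attribute always follows the /subjects/<SUBJECT>/versions/<VERSION>/schema layout shown above.

```python
import re
from urllib.parse import unquote, urlparse

# Hypothetical pattern for the path /subjects/<SUBJECT>/versions/<VERSION>/schema
_SCHEMA_PATH = re.compile(
    r"^/subjects/(?P<subject>[^/]+)/versions/(?P<version>[^/]+)/schema/?$"
)


def split_schema_url(url: str) -> tuple[str, str, str]:
    """Split a registry schema URL into (base_url, subject, version)."""
    parsed = urlparse(url)
    match = _SCHEMA_PATH.match(parsed.path)
    if match is None:
        raise ValueError(f"unexpected schema URL layout: {url!r}")
    base_url = f"{parsed.scheme}://{parsed.netloc}"
    # Subjects can be percent-encoded in the URL, so decode them.
    return base_url, unquote(match["subject"]), match["version"]
```

The version segment is returned as a string, so both a number and "latest" pass through unchanged.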

from schema_registry.client import SchemaRegistryClient

def fetch_schema(IP, subject, version):
    sr = SchemaRegistryClient(IP)
    schema = sr.get_schema(subject, version=version).schema
    return schema

With the same deserialization code as above, I now get the following error:

AttributeError: 'AvroSchema' object has no attribute 'type' 

on the line:

rec = reader.read(decoder) 

I compared the type of the 'schema' variable when it is read from the file vs. when it is fetched from the URL:

From the file, the schema type is: <class 'avro.schema.RecordSchema'>
From the URL, the schema type is: <class 'schema_registry.client.schema.AvroSchema'>

The types differ, which is probably the cause of the error. Any direction would be appreciated. Thanks!

OneCricketeer
Prasad Sawant

2 Answers


It appears that you need to get the JSON representation of the schema from the Schema Registry API call; then you can use avro.schema.Parse as before.

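That being said, you could just use urllib or requests to GET that URL directly, since the .../versions/<VERSION>/schema endpoint returns only the schema JSON; you don't need the SchemaRegistryClient at all.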
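A minimal standard-library sketch of that approach, assuming the URL is the full .../versions/<VERSION>/schema endpoint taken from the message attribute. The name fetch_schema_json is hypothetical.

```python
import json
import urllib.request


def fetch_schema_json(schema_url: str, timeout: float = 10.0) -> str:
    """Download the raw schema JSON from a .../versions/<VERSION>/schema URL."""
    with urllib.request.urlopen(schema_url, timeout=timeout) as resp:
        body = resp.read().decode("utf-8")
    # Fail early if the response is not JSON (e.g. an HTML error page).
    json.loads(body)
    return body
```

The returned string can go straight into avro.schema.Parse(fetch_schema_json(url)), which yields the same RecordSchema type as loading the .avsc file, so the DatumReader code from the question works unchanged.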

OneCricketeer

Today I had the same issue when converting from schema_registry.client.schema.AvroSchema to avro.schema.RecordSchema. One possible solution is to dump the schema to JSON and then parse it with the Avro library:

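import json

import avro.schema
from avro.io import DatumReader
from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://localhost:8081")
# get_schema() returns a SchemaVersion whose .schema is the AvroSchema wrapper
test_table_schema = client.get_schema("table_0_schema").schema

# raw_schema is the plain dict; serialize it back to JSON so avro can parse it
avro_schema = avro.schema.parse(json.dumps(test_table_schema.raw_schema))

reader = DatumReader(avro_schema)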

Caveats:

When working with Kafka and Avro, a couple of other issues can also come up:

  • Flush the topics of non-Avro messages. I lost a lot of time trying to Avro-decode messages that had been pushed as JSON, because the Kafka consumer used the auto_offset_reset='earliest' setting and therefore replayed those old messages.
  • When working with the Confluent version of Debezium, each message may start with a 5-byte header (one magic byte followed by a 4-byte schema ID) that must be skipped before Avro decoding. Your decoding function should look like this:
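import io

from avro.io import BinaryDecoder

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    message_bytes.seek(5)  # skip the magic byte + 4-byte schema ID
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)  # reader is the DatumReader built above
    return event_dict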

Check this answer for more info.
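
If you want the schema ID from that header rather than just skipping it (for example, to look the schema up via the registry's /schemas/ids/<ID> endpoint), the framing can be unpacked explicitly. This is a minimal sketch using the standard struct module; split_confluent_header is a hypothetical name, and it assumes the Confluent wire format of a zero magic byte followed by a big-endian 4-byte schema ID.

```python
import struct


def split_confluent_header(msg_value: bytes) -> tuple[int, bytes]:
    """Return (schema_id, avro_payload) for a Confluent-framed message."""
    if len(msg_value) < 5:
        raise ValueError("message is shorter than the 5-byte header")
    # '>' = big-endian, 'b' = 1-byte magic, 'I' = 4-byte unsigned schema ID
    magic, schema_id = struct.unpack(">bI", msg_value[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte {magic}; not Confluent-framed")
    return schema_id, msg_value[5:]
```

The returned payload can then be wrapped in io.BytesIO and passed to BinaryDecoder without the seek(5).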

Luca Soato