I am using kafka-python 2.0.1 to consume Avro data. This is the code I have tried:

import io

import avro.schema
from avro.io import DatumReader, BinaryDecoder
from kafka import KafkaConsumer

# Load the reader schema from disk
schema_path = "schema.avsc"
with open(schema_path) as f:
    schema = avro.schema.parse(f.read())
reader = DatumReader(schema)

consumer = KafkaConsumer(
    bootstrap_servers='xxx.xxx.xxx.xxx:9093',
    security_protocol='SASL_SSL',
    sasl_mechanism='GSSAPI',
    auto_offset_reset='latest',
    ssl_check_hostname=False,
    api_version=(1, 0, 0))
consumer.subscribe(['test'])

for message in consumer:
    message_val = message.value
    print(message_val)
    bytes_reader = io.BytesIO(message_val)
    bytes_reader.seek(5)  # skip the initial 5 bytes
    decoder = BinaryDecoder(bytes_reader)
    record = reader.read(decoder)
    print(record)

I am getting the following error:
avro.io.SchemaResolutionException: Can't access branch index 55 for union with 2 branches Writer's Schema: [ "null", "int" ] Reader's Schema: [ "null", "int" ]
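
For context, the 5-byte skip in the code only makes sense if the messages were written with the Confluent Schema Registry serializer, which prefixes each Avro payload with a zero magic byte followed by a 4-byte big-endian schema ID. Below is a minimal sketch of a check for that assumption; the helper name `split_confluent_header` is hypothetical and is not part of kafka-python or avro.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed message (assumed framing)


def split_confluent_header(raw: bytes):
    """Return (schema_id, avro_payload) if raw looks Confluent-framed.

    Raises ValueError when the message is too short or the first byte
    is not the expected magic byte.
    """
    if len(raw) < 5:
        raise ValueError("message is shorter than the 5-byte header")
    # ">" = big-endian, no padding; B = 1 unsigned byte, I = 4-byte unsigned int
    magic, schema_id = struct.unpack(">BI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(
            f"unexpected magic byte {magic}; payload may not be Confluent-framed"
        )
    return schema_id, raw[5:]
```

If this check raises on `message.value`, the first 5 bytes are probably part of the record itself and should not be skipped. If it passes, the returned schema ID could be compared against the schema in `schema.avsc`, since decoding with a schema that differs from the writer's can also produce out-of-range union branch indexes.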
Can anyone suggest what the possible cause of this error might be? I already followed this thread to skip the initial 5 bytes: