
I want to read a topic from Kafka (Confluent) whose data is in Avro format.

For reasons I cannot avoid, I need to disable certificate validation.

I am using security.protocol=SASL_SSL and sasl.mechanisms=OAUTHBEARER.

I can connect to Kafka by disabling SSL certificate validation:

'enable.ssl.certificate.verification': 'false'

Now I am running into a problem when deserializing the value using Schema Registry. The Avro deserializer requires the schema registry client and, optionally, the schema. I am passing in both. I obtained the schema (the second parameter) with a separate requests call using verify=False, which works fine. The problem arises when I create a DeserializingConsumer from it.

Here is the skeleton of the code, with comments marking where the problem is:

from urllib.parse import urljoin

import requests
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

topic = "mytopic"
registry_configuration = "schema registry url"
url = urljoin(registry_configuration, f'/subjects/{topic}-value/versions/latest')

# verify=False is intentional here, to fetch the schema without certificate validation
schema_registry_response = requests.get(url, verify=False)
schema_registry_response.raise_for_status()

consumption_schema = schema_registry_response.json()['schema']
print(consumption_schema)  # This works fine

schema_registry_client = SchemaRegistryClient({'url': registry_configuration})
# Problem area: the deserializer is invoked when poll() is called
avro_deserializer = AvroDeserializer(schema_registry_client, consumption_schema)

string_deserializer = StringDeserializer('utf_8')

basic_conf = _get_basic_configuration()
consumer_conf = {'key.deserializer': string_deserializer,
                 'value.deserializer': avro_deserializer,
                 'group.id': 'myconsumergroupid',
                 'auto.offset.reset': 'earliest'}
consumer_conf.update(basic_conf)

# Update the default config with any additional parameters
additional_consumer_conf = {}
consumer_conf.update(additional_consumer_conf)
cn = DeserializingConsumer(consumer_conf)
cn.subscribe([topic])

while True:
    # This fails: the DeserializingConsumer calls the schema registry
    # and the certificate validation fails
    msg = cn.poll(10)

The _get_basic_configuration method shown above includes

'enable.ssl.certificate.verification': 'false'

The error is

Max retries exceeded with url: /schemas/ids/140 (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain

I have gone through the source code of SchemaRegistryClient, but I do not see any option to disable certificate validation.

I have also searched Stack Overflow posts and the Confluent documentation, but found nothing helpful.

Does anyone know more about this? I am happy to elaborate or clarify. If possible, I would like to avoid a lot of custom deserialization logic.

OneCricketeer
Saugat Mukherjee
1 Answer

I have found an answer to this.

It is based on another Stack Overflow post (especially the answer after the accepted one, which applies if you are using Confluent Kafka) and on the Avro documentation. Because my schema came from an HTTP response rather than a file, I had to parse it using avro.schema.parse.
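To illustrate why a parsing step is needed: the registry response carries the schema as a JSON-encoded string in its 'schema' field, not as a nested object. Below is a minimal sketch with a hypothetical response body (the subject, record name and field are made up for illustration). json.loads is used only to show the string-to-structure step; with the avro package you would pass the same string to avro.schema.parse to get a real schema object.

```python
import json

# Hypothetical body of GET /subjects/mytopic-value/versions/latest.
# The record name "Event" and its field are assumptions for illustration.
response_body = json.dumps({
    "subject": "mytopic-value",
    "version": 1,
    "id": 140,
    "schema": json.dumps({
        "type": "record",
        "name": "Event",
        "fields": [{"name": "id", "type": "string"}],
    }),
})

payload = json.loads(response_body)

# The 'schema' field is itself a string containing JSON, so it needs
# a second parsing step before it can be used as a schema.
schema_str = payload["schema"]
print(type(schema_str).__name__)

schema_dict = json.loads(schema_str)
print(schema_dict["name"], [f["name"] for f in schema_dict["fields"]])
```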

Final skeleton code

import io
from urllib.parse import urljoin

import avro.schema
import requests
from avro.io import BinaryDecoder, DatumReader
from confluent_kafka import Consumer

topic = "mytopic"
registry_configuration = "schema registry url"
url = urljoin(registry_configuration, f'/subjects/{topic}-value/versions/latest')

# Fetch the schema directly, without certificate validation
schema_registry_response = requests.get(url, verify=False)
schema_registry_response.raise_for_status()

# The registry returns the schema as a JSON string; parse it into an Avro schema object
consumption_schema = avro.schema.parse(schema_registry_response.json()['schema'])

basic_conf = _get_basic_configuration()
consumer_conf = {'group.id': 'myconsumergroupid',
                 'auto.offset.reset': 'earliest'}
consumer_conf.update(basic_conf)

# A plain Consumer is used, so nothing calls the schema registry during poll()
cn = Consumer(consumer_conf)
cn.subscribe([topic])
reader = DatumReader(consumption_schema)

while True:
    msg = cn.poll(10)
    if msg is None:
        break  # no message arrived within 10 seconds: stop
    if msg.error():
        print(msg.error())
        continue
    message_bytes = io.BytesIO(msg.value())
    # Skip the 5-byte Confluent header (magic byte + 4-byte schema ID)
    message_bytes.seek(5)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    print(event_dict)
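The seek(5) in the loop skips the Confluent wire-format header: one magic byte (0) followed by a 4-byte big-endian schema ID, after which the Avro-encoded payload begins. As a rough sketch of what those five bytes contain, the helper below splits a raw message into its schema ID and payload. The function name split_confluent_frame and the sample bytes are hypothetical; the ID 140 is borrowed from the URL in the error message.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of a Confluent-framed message


def split_confluent_frame(raw: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_payload) for a Confluent-framed message."""
    if len(raw) < 5:
        raise ValueError("message too short for the Confluent wire format")
    # '>' = big-endian without padding, 'b' = 1-byte magic, 'I' = 4-byte schema ID
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, raw[5:]


# Hypothetical message: magic byte 0, schema ID 140, then two payload bytes
sample = b"\x00" + (140).to_bytes(4, "big") + b"\x02\x41"
schema_id, payload = split_confluent_frame(sample)
print(schema_id, payload)
```

Checking the magic byte before decoding catches messages that were not produced with the Confluent serializer. The schema ID also tells you which writer schema the message was encoded with, which may differ from the latest version of the subject fetched above.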
  
Saugat Mukherjee