
My team uses confluent_kafka to produce a real-time stream, and the messages are pushed to a queue with custom encryption. The underlying technology is Java.

I am trying to consume the messages from the topic using Python. Consuming works, but I am unable to decrypt the messages in Python.

import os

from confluent_kafka import DeserializingConsumer, KafkaError
from confluent_kafka.serialization import StringDeserializer

os.environ['HTTPS_PROXY'] = "http://username:password@proxy.com:8080"


c = DeserializingConsumer({
    'bootstrap.servers': 'kafka.abcd.com:1010',
    'group.id':'SharedKafka2',
    'auto.offset.reset': 'latest',
    'security.protocol':'SSL',
    'ssl.ca.location':'CARoot.pem',
    'ssl.certificate.location':'certificate.pem',
    'ssl.key.location':'key.pem',
    'value.deserializer': StringDeserializer()
})

c.subscribe(['SharedKafka2_Topic'])


print("ConsumerStarted")
while True:
    msg = c.poll()
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    print(msg.value())

Output = b'\x85s\n\x0e{!G\xc7\xdfaV\x8e\x03a\xc4\x8e\xe1\xbc\xf4\xf4F\xb5<{\xecD\xc2\xf8jNN

I cannot find a way to pass the custom encryption key to the value deserializer. This is how it is done in Java:

props.put("key.deserializer","com.aexp.sec.kafka.serializers.CryptoDeSerializer");
props.put("value.deserializer","com.aexp.sec.kafka.serializers.CryptoDeSerializer");
props.put("CRYPTO_KEY_DESERIALIZER", StringDeserializer.class.getName());
props.put("CRYPTO_VALUE_DESERIALIZER", StringDeserializer.class.getName());
props.put(String.format("%s_APIGEE_ENDPOINT", topic), apigeeUrl);
props.put("ENVIRONMENT", environment);
props.put(String.format("%s_KEY_IDENTIFIER", topic), keyIdentifier);
props.put(String.format("%s_APIGEE_CLIENT_ID", topic), clientID);
props.put(String.format("%s_APIGEE_CLIENT_SECRET", topic), clientSecret);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        buff.write(record.value() + "\n");
        buff.flush();
    }
}

How can I decrypt the message value in Python using the same custom decryption?

James Z

1 Answer


This is not a limitation of the Confluent Python client. The Java program relies on a custom deserializer, com.aexp.sec.kafka.serializers.CryptoDeSerializer, so there is not much anyone here can do to help. You need to write an equivalent implementation in Python that applies the same logic as the custom deserializer, and then register it when creating your consumer:

c = DeserializingConsumer({
    'bootstrap.servers': 'kafka.abcd.com:1010',
    'group.id':'SharedKafka2',
    'auto.offset.reset': 'latest',
    'security.protocol':'SSL',
    'ssl.ca.location':'CARoot.pem',
    'ssl.certificate.location':'certificate.pem',
    'ssl.key.location':'key.pem',
    'value.deserializer': YourCustomDeserializerInPython()
})

If possible, reach out to whoever implemented the CryptoDeSerializer class in Java to understand what your Python code needs to look like.
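As a rough illustration of the shape such a deserializer could take, here is a minimal sketch. It assumes the consumer accepts any callable taking the raw bytes and a serialization context, which is how the client documents the 'value.deserializer' setting. The names CryptoDeserializer and xor_placeholder are hypothetical, and the XOR routine is only a stand-in so the example runs: the real decryption logic lives in the Java CryptoDeSerializer and is not known here.

```python
from typing import Callable, Optional


class CryptoDeserializer:
    """Decrypt raw message bytes, then decode them to str.

    Hypothetical sketch: `decrypt` must reproduce whatever the Java
    CryptoDeSerializer does, which is not known here.
    """

    def __init__(self, decrypt: Callable[[bytes], bytes], codec: str = "utf-8"):
        self._decrypt = decrypt
        self._codec = codec

    def __call__(self, value: Optional[bytes], ctx=None) -> Optional[str]:
        # Messages without a value (for example tombstones) arrive as None.
        if value is None:
            return None
        return self._decrypt(value).decode(self._codec)


def xor_placeholder(data: bytes, key: bytes = b"k") -> bytes:
    # Stand-in ONLY so the sketch runs; this is NOT the real cipher.
    # Replace it with a port of the organisation's decryption logic.
    return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))


if __name__ == "__main__":
    deserializer = CryptoDeserializer(xor_placeholder)
    encrypted = xor_placeholder(b"hello")  # XOR is its own inverse
    print(deserializer(encrypted))
```

With a real decryption function in place of the placeholder, the instance would be passed as 'value.deserializer': CryptoDeserializer(real_decrypt), where real_decrypt is again a hypothetical name. Keeping the cipher as an injected callable means the Kafka wiring can be tested separately from the key-retrieval and decryption code.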

Ricardo Ferreira
  • Sure Ricardo. I will try to implement that and will let you know. Thanks for your response. :) – Mayank Ahuja Jul 27 '20 at 05:51
  • It seems impossible to write the decryption code in Python. com.aexp.sec.kafka.serializers.CryptoDeSerializer is a JAR used across the organisation. Is it possible to use this JAR from Python? – Mayank Ahuja Jul 27 '20 at 06:45
  • It is. You can read more about it here: https://stackoverflow.com/questions/3652554/calling-java-from-python – Ricardo Ferreira Jul 27 '20 at 15:52