My team is using confluent_kafka to produce real time streaming and is being pushed to a queue using custom encryption. The technology behind is Java.
I am trying to consume the messages using python from the topic and able to do so but unable to decrypt it using python.
from confluent_kafka import *
import os
from confluent_kafka.serialization import StringDeserializer
os.environ['HTTPS_PROXY'] = "http://username:password@proxy.com:8080"
c = DeserializingConsumer({
'bootstrap.servers': 'kafka.abcd.com:1010',
'group.id':'SharedKafka2',
'auto.offset.reset': 'latest',
'security.protocol':'SSL',
'ssl.ca.location':'CARoot.pem',
'ssl.certificate.location':'certificate.pem',
'ssl.key.location':'key.pem',
'value.deserializer': StringDeserializer()
})
c.subscribe(['SharedKafka2_Topic'])
print("ConsumerStarted")
while True:
msg = c.poll()
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
print(msg.value())
Output = b'\x85s\n\x0e{!G\xc7\xdfaV\x8e\x03a\xc4\x8e\xe1\xbc\xf4\xf4F\xb5<{\xecD\xc2\xf8jNN
I am unable to provide the custom encryption key in value deserialiser. Below is how it is done in java:
props.put("key.deserializer","com.aexp.sec.kafka.serializers.CryptoDeSerializer");
props.put("value.deserializer","com.aexp.sec.kafka.serializers.CryptoDeSerializer");
props.put("CRYPTO_KEY_DESERIALIZER", StringDeserializer.class.getName());
props.put("CRYPTO_VALUE_DESERIALIZER", StringDeserializer.class.getName());
**props.put(String.format("%s_APIGEE_ENDPOINT", topic), apigeeUrl);
props.put("ENVIRONMENT", environment);
props.put(String.format("%s_KEY_IDENTIFIER", topic), keyIdentifier);
props.put(String.format("%s_APIGEE_CLIENT_ID", topic), clientID);
props.put(String.format("%s_APIGEE_CLIENT_SECRET", topic), clientSecret);**
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
buff.write(record.value()+"\n");
buff.flush();
}
}
How to decrypt string format using custom decryption using python?