I'm trying to receive messages from a Kafka topic using a Python script.
from confluent_kafka import Consumer, KafkaError
import uuid
# Minimal Kafka consumer: subscribe to one topic and print every message value
# until interrupted with Ctrl+C.

# Kafka broker details
broker = "something"
topic = "something"
# A new random group id is generated on every run, so presumably no committed
# offsets exist for it and "auto.offset.reset" decides where reading starts.
group = str(uuid.uuid4())

# Kafka consumer configuration
conf = {
    "bootstrap.servers": broker,
    "group.id": group,
    "auto.offset.reset": "earliest",
}
consumer = Consumer(conf)

# Subscribe to the topic
consumer.subscribe([topic])

try:
    while True:
        msg = consumer.poll(1.0)  # Wait up to one second for a new message
        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue

        error = msg.error()
        if error:
            if error.code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print("Reached end of partition")
            else:
                print(f"Error: {error.str()}")
            continue

        payload = msg.value()
        if payload is None:
            # A message may carry no value; calling .decode() on None would
            # raise AttributeError and terminate the loop.
            print("Received message with no value")
            continue

        # NOTE(review): the payload looks like binary Avro rather than plain
        # UTF-8 text (length-prefixed strings would explain stray characters
        # such as "H" in front of 36-character ids) -- confirm with the
        # producer. If so, decode it with an Avro deserializer and the writer
        # schema instead of treating the bytes as text; errors="ignore"
        # silently drops every byte that is not valid UTF-8.
        received_message = payload.decode("utf-8", errors="ignore")
        # Print the received message
        print("Received message:", received_message)
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop the loop.
    pass
finally:
    # Always close the consumer, whether the loop ended by interrupt or error.
    consumer.close()
This is my output:
Received message: H2ff4c4d5-74ee-4708-b553-aa68ea5e675bHdfb821d1-db91-44b8-a080-8f689a80bb32H2ff4c4d5-74ee-4708-b553-aa68ea5e675bCREATEIntegrationCATALOGNodeܥ[{"id":"2ff4c4d5-74ee-4708-b553-aa68ea5e675b","name":"Integration","classType":"EntityClass","graphType":"Node","objectType":"Integration","enabled":true,"tenantDefault":true.....
This is the Kafka message when I download it (see how there are headers instead of H):
"{"id":"100dea7f-bc9d-4775-b660-17b99fd1439a","tenant_id":"d257587b-d53f-498a-9256-4d1851b847d8","callback_id":{"string":"100dea7f-bc9d-4775-b660-17b99fd1439a"},"operation":{"string":"CREATE"},"type":{"string":"Genericentity"},"base_type":{"string":"CATALOG"},"graph_type":{"string":"Node"},"json_payload":{"string":"[{\"id\":\"100dea7f-bc9d-4775-b660-17b99fd1439a\",\"name\":\"LSH - Safety Stock\",\"classType\":\"EntityClass\",\"graphType\":\"Node\",\"objectType\":\"LSHSafetyStock\",\"enabled\":true,\"tenantDefault\":false,\"catalogType\":\"genericentity\",\"type\":\"Genericentity\",\"cdmFields\":[{\"id\":\"2e1f4381-cf61-49a5-934b-6b47a93d1771\",\"name\":\"baseTypeKey\",\"displayLabel\":\"Base Type Key\",\"type\":\"STRING\",\"required\":false,\"size\":20,\"unit\":\"Default\",\"value\":\"\",\"regex\":null,\"unique\":false,\"defaultValue\":null,\"group\":\"System\",\"autoGenerated\":false,\"format\":\"\",\"tooltip\":\"\",\"searchable\":false,\"filterable\":false,\"editable\":false,\"entityName\":null,\"entityKey\":null,\"entityValue\":null,\"displayable\":false,\"instanceUserEditable\":false,\"instanceUserCreatable\":false,\"apiFlag\":false,\"apiUrl\":\"\",\"apiResponseSelectKey\":\"\",\"apiResponseSelectValue\":\"\",\"order\":-1,\"tenantDefault\":false,\"isDisplayableOnSummary\":false,\"isDisplayableOnDetails\":false,\"isDisplayableOnCatalog\":false,\"isDisplayableOnList\":false,\"dataClassification\":\"DEFAULT\"},{\"id\":\"cf7a08cb-5f7f-472b-a9eb-516ca864a188\",\"name\":\"entityType\",\"displayLabel\":\"Entity Type\",\"type\":\"STRING\",\"required\":false,\"size\":20,\"unit\":\"Default\",\"value\":\"\",\"regex\":null,\"unique\":false,\"defaultValue\":null,\"group\":\"System\",\"autoGenerated\":false,\"format\":\"\",\"tooltip\":\"\",\"searchable\":false,\"filterable\":false,\"editable\":false,\"entityName\":null,\"entityKey\":null,\"entityValue\":null,\"displayable\":false,\"instanceUserE