1

I'm trying to receive the message from the topic using Python script.

from confluent_kafka import Consumer, KafkaError
import uuid

# Kafka broker details
broker = "something"
topic = "something"
group = str(uuid.uuid4())

# Kafka consumer configuration
conf = {
    "bootstrap.servers": broker,
    "group.id": group,
    "auto.offset.reset": "earliest"  
}

consumer = Consumer(conf)

# Subscribe to the topic
consumer.subscribe([topic])

try:
    while True:
        msg = consumer.poll(1.0)  # Poll for new messages
        
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print("Reached end of partition")
            else:
                print("Error: {}".format(msg.error().str()))
        else:
            # Decode the received message value as UTF-8
            received_message = msg.value().decode("utf-8", errors="ignore")

            # Print the received message
            print("Received message:", received_message)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

This is my output:

Received message: H2ff4c4d5-74ee-4708-b553-aa68ea5e675bHdfb821d1-db91-44b8-a080-8f689a80bb32H2ff4c4d5-74ee-4708-b553-aa68ea5e675bCREATEIntegrationCATALOGNodeܥ[{"id":"2ff4c4d5-74ee-4708-b553-aa68ea5e675b","name":"Integration","classType":"EntityClass","graphType":"Node","objectType":"Integration","enabled":true,"tenantDefault":true.....

this is the kafka message when I download it(see how there are headers instead of H):

"{"id":"100dea7f-bc9d-4775-b660-17b99fd1439a","tenant_id":"d257587b-d53f-498a-9256-4d1851b847d8","callback_id":{"string":"100dea7f-bc9d-4775-b660-17b99fd1439a"},"operation":{"string":"CREATE"},"type":{"string":"Genericentity"},"base_type":{"string":"CATALOG"},"graph_type":{"string":"Node"},"json_payload":{"string":"[{\"id\":\"100dea7f-bc9d-4775-b660-17b99fd1439a\",\"name\":\"LSH - Safety Stock\",\"classType\":\"EntityClass\",\"graphType\":\"Node\",\"objectType\":\"LSHSafetyStock\",\"enabled\":true,\"tenantDefault\":false,\"catalogType\":\"genericentity\",\"type\":\"Genericentity\",\"cdmFields\":[{\"id\":\"2e1f4381-cf61-49a5-934b-6b47a93d1771\",\"name\":\"baseTypeKey\",\"displayLabel\":\"Base Type Key\",\"type\":\"STRING\",\"required\":false,\"size\":20,\"unit\":\"Default\",\"value\":\"\",\"regex\":null,\"unique\":false,\"defaultValue\":null,\"group\":\"System\",\"autoGenerated\":false,\"format\":\"\",\"tooltip\":\"\",\"searchable\":false,\"filterable\":false,\"editable\":false,\"entityName\":null,\"entityKey\":null,\"entityValue\":null,\"displayable\":false,\"instanceUserEditable\":false,\"instanceUserCreatable\":false,\"apiFlag\":false,\"apiUrl\":\"\",\"apiResponseSelectKey\":\"\",\"apiResponseSelectValue\":\"\",\"order\":-1,\"tenantDefault\":false,\"isDisplayableOnSummary\":false,\"isDisplayableOnDetails\":false,\"isDisplayableOnCatalog\":false,\"isDisplayableOnList\":false,\"dataClassification\":\"DEFAULT\"},{\"id\":\"cf7a08cb-5f7f-472b-a9eb-516ca864a188\",\"name\":\"entityType\",\"displayLabel\":\"Entity Type\",\"type\":\"STRING\",\"required\":false,\"size\":20,\"unit\":\"Default\",\"value\":\"\",\"regex\":null,\"unique\":false,\"defaultValue\":null,\"group\":\"System\",\"autoGenerated\":false,\"format\":\"\",\"tooltip\":\"\",\"searchable\":false,\"filterable\":false,\"editable\":false,\"entityName\":null,\"entityKey\":null,\"entityValue\":null,\"displayable\":false,\"instanceUserE

Aleksandar
  • 84
  • 1
  • 4

1 Answers1

1

It looks like the data on the topic is written in a different format than you expect. Are you using a schema registry? Perhaps it's written in Confluent Wire Format. If Kafka UI is set up correctly it automatically decodes the message using the schema from the schema registry.

Which value serde is selected in the Kafka UI? If it's something like 'Schema Registry' then here are some resources on how to integrate the schema registry with a python client https://developer.confluent.io/courses/kafka-python/integrate-with-schema-registry/