Hello all i have debezium which listen to changes on postgres and put events on kafka topic everything works great except i have issues decoding payloads i have tried both methods but no luck
SQL Insert Statement
INSERT INTO public.student
(id, name)
VALUES (45,'soumil 2')
Docker Compose files
version: "3.7"
services:
postgres:
image: debezium/postgres:13
ports:
- 5432:5432
environment:
- POSTGRES_USER=docker
- POSTGRES_PASSWORD=docker
- POSTGRES_DB=exampledb
zookeeper:
image: confluentinc/cp-zookeeper:5.5.3
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-enterprise-kafka:5.5.3
depends_on: [zookeeper]
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- "9092:9092"
- "29092:29092"
debezium:
image: debezium/connect:1.4
environment:
BOOTSTRAP_SERVERS: kafka:29092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
depends_on: [kafka]
ports:
- 8083:8083
schema-registry:
image: confluentinc/cp-schema-registry:5.5.3
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
ports:
- 8081:8081
depends_on: [zookeeper, kafka]
EXEC commands
docker run --tty --network debezium_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -C -s key=s -s value=avro -r http://schema-registry:8081 -t postgres.public.student
Works fine when i exec into container
Kafka Python code
try:
import kafka
import json
import requests
import os
import sys
from json import dumps
from kafka import KafkaProducer
from kafka import KafkaConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
import io
from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import (SerializerError, # noqa
KeySerializerError,
ValueSerializerError)
print("ALL ok")
except Exception as e:
print("Error : {} ".format(e))
SCHEME_REGISTERY = "http://schema-registry:8081"
TOPIC = "postgres.public.student"
BROKER = "localhost:9092"
schema = """
{
"type":"record",
"name":"Key",
"namespace":"postgres.public.student",
"fields":[
{
"name":"id",
"type":"int"
},
{
"name":"name",
"type":"string"
}
],
"connect.name":"postgres.public.student.Key"
}
"""
schema = avro.schema.Parse(schema)
reader = DatumReader(schema)
def decode_method_1(msg_value):
message_bytes = io.BytesIO(msg_value)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
def decode_method_2(msg_value):
message_bytes = io.BytesIO(msg_value)
message_bytes.seek(5)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
def fetch_schema():
from confluent_kafka.schema_registry import SchemaRegistryClient
sr = SchemaRegistryClient({"url": 'http://localhost:8081'})
subjects = sr.get_subjects()
for subject in subjects:
schema = sr.get_latest_version(subject)
print(schema.version)
print(schema.schema_id)
print(schema.schema.schema_str)
def main():
print("Listening *****************")
consumer = KafkaConsumer(
TOPIC,
bootstrap_servers=[BROKER],
auto_offset_reset='latest',
enable_auto_commit=False,
group_id="some group"
)
for msg in consumer:
msg_value = msg.value
print("\n")
print("msg_value", msg_value)
print("decode_method_1", decode_method_1(msg_value))
print("decode_method_2", decode_method_2(msg_value))
print("\n")
main()
Outputs
msg_value b'\x00\x00\x00\x00\x02\x00\x02Z\x02\x10soumil 2\x161.4.2.Final\x14postgresql\x10postgres\xac\xa6\xe4\xbc\xa9a\x00\nfalse\x12exampledb\x0cpublic\x0estudent\x02\xf4\x07\x02\xa0\x9f\xe5\x16\x00\x02c\x02\xd4\xa8\xe4\xbc\xa9a\x00'
decode_method_1 {'id': 0, 'name': ''}
decode_method_2 {'id': 0, 'name': 'Z'}
Your help would be great as i am not able to resolve the issue here are some references