I am able to implement entire pipeline where I insert some sample documents into postgres tables and i am able to see those CDC items into my kafka topic when i exec into container but when running python locally cant get any messages
Docker Compose File
version: "3.7"
services:
postgres:
image: debezium/postgres:13
ports:
- 5432:5432
environment:
- POSTGRES_USER=docker
- POSTGRES_PASSWORD=docker
- POSTGRES_DB=exampledb
zookeeper:
image: confluentinc/cp-zookeeper:5.5.3
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-enterprise-kafka:5.5.3
depends_on: [zookeeper]
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 9092:9092
- 29092:29092
debezium:
image: debezium/connect:1.4
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
depends_on: [kafka]
ports:
- 8083:8083
schema-registry:
image: confluentinc/cp-schema-registry:5.5.3
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
ports:
- 8081:8081
depends_on: [zookeeper, kafka]
Debezium Settings
{
"name": "exampledb-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.user": "docker",
"database.dbname": "exampledb",
"database.hostname": "postgres",
"database.password": "docker",
"name": "exampledb-connector",
"database.server.name": "postgres",
"table.include.list": "public.student",
"plugin.name": "pgoutput",
"database.port": "5432"
},
"tasks": [
{
"connector": "exampledb-connector",
"task": 0
}
],
"type": "source"
}
CURL
curl --location --request GET 'http://localhost:8083/connectors/exampledb-connector'
SQL Commands
CREATE TABLE student
(
id integer primary key,
name varchar
)
ALTER TABLE public.student REPLICA IDENTITY FULL
INSERT INTO public.student
(id, name)
VALUES (1,'test2')
INSERT INTO public.student
(id, name)
VALUES (2,'test2')
Test Docker EXEC
docker network ls
docker run --tty --network debezium_default confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -s key=s -s value=avro -r http://schema-registry:8081 -t postgres.public.student
Python code
try:
import kafka
import json
import requests
import os
import sys
from json import dumps
from kafka import KafkaProducer
from kafka import KafkaConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from kafka import KafkaConsumer
import json
import requests
import os
import sys
except Exception as e:
pass
SCHEME_REGISTERY = "http://schema-registry:8081"
TOPIC = "postgres.public.student"
BROKER = "localhost:9092"
import kafka
consumer = kafka.KafkaConsumer(group_id='1', bootstrap_servers=[BROKER])
print(consumer.topics())
print("***************")
def main():
print("Listening *****************")
consumer = KafkaConsumer(
TOPIC,
bootstrap_servers=[BROKER],
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id='1'
)
for msg in consumer:
payload = json.loads(msg.value)
payload["meta_data"]={
"topic":msg.topic,
"partition":msg.partition,
"offset":msg.offset,
"timestamp":msg.timestamp,
"timestamp_type":msg.timestamp_type,
"key":msg.key,
}
print(payload, end="\n")
main()
i have tried reading some blogs and issue but did not really help i am hoping someone whos expert in domain can guide me and point out my mistakes