
I have implemented the entire pipeline: when I insert sample rows into the Postgres tables, I can see the CDC records on my Kafka topic when I exec into the container, but when I run the Python consumer locally I don't get any messages.

Docker Compose File

version: "3.7"
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=docker
      - POSTGRES_PASSWORD=docker
      - POSTGRES_DB=exampledb

  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - 9092:9092
      - 29092:29092

  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets

      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on: [kafka]
    ports:
      - 8083:8083

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
    ports:
      - 8081:8081
    depends_on: [zookeeper, kafka]

Debezium Settings

{
    "name": "exampledb-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.user": "docker",
        "database.dbname": "exampledb",
        "database.hostname": "postgres",
        "database.password": "docker",
        "name": "exampledb-connector",
        "database.server.name": "postgres",
        "table.include.list": "public.student",
        "plugin.name": "pgoutput",
        "database.port": "5432"
    },
    "tasks": [
        {
            "connector": "exampledb-connector",
            "task": 0
        }
    ],
    "type": "source"
}
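For reference, a connector like this is registered by POSTing its name and config to the Kafka Connect REST API; the JSON above is what the GET below returns. A minimal registration sketch in Python (assuming Connect is published on localhost:8083, as in the compose file):

import requests

# Hypothetical registration call; assumes Kafka Connect's REST API is
# published on localhost:8083 as in the compose file above.
connector = {
    "name": "exampledb-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "docker",
        "database.password": "docker",
        "database.dbname": "exampledb",
        "database.server.name": "postgres",
        "table.include.list": "public.student",
        "plugin.name": "pgoutput",
    },
}

resp = requests.post("http://localhost:8083/connectors", json=connector)
print(resp.status_code, resp.json())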

CURL

curl --location --request GET 'http://localhost:8083/connectors/exampledb-connector'

SQL Commands

CREATE TABLE student
(
    id integer primary key,
    name varchar
);

ALTER TABLE public.student REPLICA IDENTITY FULL;

INSERT INTO public.student
    (id, name)
VALUES (1, 'test2');

INSERT INTO public.student
    (id, name)
VALUES (2, 'test2');
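The same inserts can also be issued from the host to generate change events; a minimal sketch with psycopg2 (not part of the original setup), assuming Postgres is published on localhost:5432 with the credentials from the compose file:

import psycopg2

# Assumes the postgres service is published on localhost:5432 with
# user/password "docker" and database "exampledb" (see the compose file).
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="docker",
    password="docker",
    dbname="exampledb",
)
with conn, conn.cursor() as cur:
    cur.execute(
        "INSERT INTO public.student (id, name) VALUES (%s, %s)",
        (3, "test3"),
    )
conn.close()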

Test Docker EXEC

docker network ls

docker run --tty --network debezium_default confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -s key=s -s value=avro -r http://schema-registry:8081 -t postgres.public.student

(screenshot: kafkacat decodes and prints the Avro CDC records for the inserted rows)

Python code



import json

from kafka import KafkaConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient


SCHEMA_REGISTRY = "http://schema-registry:8081"
TOPIC = "postgres.public.student"
BROKER = "localhost:9092"

# Sanity check: list the topics the broker reports to this client
consumer = KafkaConsumer(group_id='1', bootstrap_servers=[BROKER])
print(consumer.topics())
print("***************")


def main():
    print("Listening *****************")

    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers=[BROKER],
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        group_id='1'
    )

    for msg in consumer:

        payload = json.loads(msg.value)
        payload["meta_data"]={
            "topic":msg.topic,
            "partition":msg.partition,
            "offset":msg.offset,
            "timestamp":msg.timestamp,
            "timestamp_type":msg.timestamp_type,
            "key":msg.key,
        }
        print(payload, end="\n")


main()

I have tried reading some blog posts and related issues, but they did not really help. I am hoping someone with expertise in this domain can point out my mistake.


1 Answer


Switching the listener ports worked. In the compose file below, the internal listener (PLAINTEXT) now advertises kafka:29092 for other containers, the host listener (PLAINTEXT_HOST) advertises localhost:9092, and Debezium's BOOTSTRAP_SERVERS points at kafka:29092, so a client running on the host can reach the broker at localhost:9092:

version: "3.7"
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=docker
      - POSTGRES_PASSWORD=docker
      - POSTGRES_DB=exampledb

  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - "9092:9092"
      - "29092:29092"

  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on: [kafka]
    ports:
      - 8083:8083

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
    ports:
      - 8081:8081
    depends_on: [zookeeper, kafka]
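With the swapped listeners, a host-side client connects to localhost:9092. One more thing to watch: the connector writes Avro-encoded values (the compose file configures the Avro converters), so json.loads on the raw message bytes will not decode them. A minimal host-side consumer sketch using confluent-kafka's Avro deserializer, assuming Schema Registry on localhost:8081, the topic name from the question, and a recent confluent-kafka version:

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# Assumes the compose file above: broker on localhost:9092,
# Schema Registry on localhost:8081, topic postgres.public.student.
schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_deserializer = AvroDeserializer(schema_registry)

consumer = DeserializingConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "student-cdc",
    "auto.offset.reset": "earliest",
    "value.deserializer": avro_deserializer,
})
consumer.subscribe(["postgres.public.student"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(msg.error())
        continue
    # value() is already a dict decoded against the registered schema
    print(msg.topic(), msg.partition(), msg.offset(), msg.value())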