
I have a running Debezium cluster in AWS with no issues. I wanted to give AWS MSK a try, so I launched an MSK cluster and then an EC2 instance to run my connectors.

Then I installed Confluent Platform:

sudo apt-get update && sudo apt-get install confluent-platform-2.12

AWS MSK doesn't come with a Schema Registry by default, so I configured one on the connector EC2 instance. Schema Registry config file:

kafkastore.connection.url=z-1.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-3.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-2.bhuvi-XXXXXXXXX.amazonaws.com:2181


kafkastore.bootstrap.servers=PLAINTEXT://b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-1.bhuvi-XXXXXXXXX.amazonaws.com:9092

Then the /etc/kafka/connect-distributed.properties file:

bootstrap.servers=b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-3.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092

plugin.path=/usr/share/java,/usr/share/confluent-hub-components
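For reference, a distributed worker also needs a handful of other settings. A minimal sketch of what the rest of the file typically looks like, assuming the stock defaults shipped with Confluent Platform (property names are the standard Kafka Connect ones):

# Worker group and the internal topics Connect creates on first start.
# Note the default replication factor of 1, which matters for the error below.
group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=1
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1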

Install the connector:

confluent-hub install debezium/debezium-connector-mysql:latest

Start the services:

systemctl start confluent-schema-registry
systemctl start confluent-connect-distributed
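A quick way to verify both services came up, assuming the default ports (8081 for Schema Registry, 8083 for Connect):

# Schema Registry: list registered subjects (an empty [] on a fresh install)
curl -s http://localhost:8081/subjects

# Kafka Connect: list connectors (empty [] before any are created)
curl -s http://localhost:8083/connectors

# Kafka Connect: confirm the Debezium MySQL plugin was picked up from plugin.path
curl -s http://localhost:8083/connector-plugins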

Everything started fine. Then I created a mysql.json file:

{
    "name": "mysql-connector-db01",
    "config": {
        "name": "mysql-connector-db01",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.server.id": "1",
        "tasks.max": "3",
        "database.history.kafka.bootstrap.servers": "172.31.47.152:9092,172.31.38.158:9092,172.31.46.207:9092",
        "database.history.kafka.topic": "schema-changes.mysql",
        "database.server.name": "mysql-db01",
        "database.hostname": "172.31.84.129",
        "database.port": "3306",
        "database.user": "bhuvi",
        "database.password": "my_stong_password",
        "database.whitelist": "proddb,test",
        "internal.key.converter.schemas.enable": "false",
        "key.converter.schemas.enable": "false",
        "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "internal.value.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
        "transforms.unwrap.add.source.fields": "ts_ms",
    }
}

Create the Debezium connector:

curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d @mysql.json
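Once the POST succeeds, the connector's state can be checked through the Connect REST API (the name below is the one defined in mysql.json); a healthy connector and its tasks report RUNNING:

curl -s http://localhost:8083/connectors/mysql-connector-db01/status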

Then it started giving this error on the connector EC2 instance.


Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,290] WARN [Producer clientId=producer-3] Got error produce response with correlation id 844 on topic-partition connect-configs-0, retrying (2147482809 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,391] WARN [Producer clientId=producer-3] Got error produce response with correlation id 845 on topic-partition connect-configs-0, retrying (2147482808 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,492] WARN [Producer clientId=producer-3] Got error produce response with correlation id 846 on topic-partition connect-configs-0, retrying (2147482807 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,593] WARN [Producer clientId=producer-3] Got error produce response with correlation id 847 on topic-partition connect-configs-0, retrying (2147482806 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)

This error message never stops.

Describe of the connect-configs topic:


Topic:connect-configs   PartitionCount:1        ReplicationFactor:1     Configs:cleanup.policy=compact
        Topic: connect-configs  Partition: 0    Leader: 2       Replicas: 2     Isr: 2
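For completeness, output like the above can be produced with the kafka-topics tool (newer Kafka versions take --bootstrap-server; older ones use --zookeeper with the ZooKeeper endpoints instead):

kafka-topics --describe --topic connect-configs \
  --bootstrap-server b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092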
TheDataGuy

1 Answer


MSK sets min.insync.replicas to 2 for all topics by default (see https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html).

It's possible that Kafka Connect is producing with acks=all and, since you only have one copy of your topic, it never gets enough in-sync replicas to acknowledge the write.
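As a sketch of a way out (not tested against your cluster): either lower min.insync.replicas on the existing single-replica topic, or recreate Connect's internal topics with enough replicas. Note that deleting connect-configs discards any connector configs already stored in it.

# Option 1: relax min.insync.replicas on the existing topic
# (older Kafka versions need --zookeeper instead of --bootstrap-server)
kafka-configs --bootstrap-server b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092 \
  --alter --entity-type topics --entity-name connect-configs \
  --add-config min.insync.replicas=1

# Option 2: delete the topic, raise the replication factors in
# connect-distributed.properties (see the comments below), and restart the worker
kafka-topics --bootstrap-server b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092 \
  --delete --topic connect-configs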

Javier Holguera
  • Yes, by default it's using acks=all (https://docs.confluent.io/5.5.0/installation/configuration/producer-configs.html#acks) and offsets.topic.replication.factor=3 (https://docs.confluent.io/5.5.0/installation/configuration/broker-configs.html#offsets.topic.replication.factor), so by default Kafka Connect requires a minimum of 3 brokers (1 leader + 2 replicas). – Wédney Yuri Jun 15 '20 at 15:23
  • You can also change it at the Kafka Connect level with offset.storage.replication.factor, config.storage.replication.factor, and status.storage.replication.factor (or, if running with Docker, CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR, CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR, and CONNECT_STATUS_STORAGE_REPLICATION_FACTOR). Set the value to be the same as your MSK min.insync.replicas. – shlomiLan Feb 25 '21 at 15:44
  • I changed all the values mentioned by @shlomiLan to 2 (the default value of `min.insync.replicas` in MSK), and it worked! – ritmatter Nov 24 '21 at 15:38
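Putting the comments together, a minimal sketch of the worker-side change (standard Kafka Connect property names; 2 matches MSK's default min.insync.replicas, and the internal topics have to be recreated for it to take effect):

# connect-distributed.properties: create Connect's internal topics with enough
# replicas to satisfy MSK's default min.insync.replicas=2
config.storage.replication.factor=2
offset.storage.replication.factor=2
status.storage.replication.factor=2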