
Whenever I restart the Debezium Kafka Connect container, or deploy another instance, I get the following error:

    io.debezium.jdbc.JdbcConnectionException: ERROR: replication slot "debezium" already exists
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:136)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:79)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:38)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$ReplicationConnectionBuilder.build(PostgresReplicationConnection.java:349)
    at io.debezium.connector.postgresql.PostgresTaskContext.createReplicationConnection(PostgresTaskContext.java:80)
    at io.debezium.connector.postgresql.RecordsStreamProducer.<init>(RecordsStreamProducer.java:75)
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:112)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:157)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "debezium" already exists
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2412)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2125)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:297)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:428)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:354)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:301)
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:287)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:264)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:260)
    at org.postgresql.replication.fluent.logical.LogicalCreateSlotBuilder.make(LogicalCreateSlotBuilder.java:48)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:102)
    ... 14 more

I'm using this image: https://github.com/debezium/docker-images/tree/master/connect/0.8

And I have a config for it like this:

    {
        "name": "record-loader-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.dbname": "record_loader?ssl",
            "database.user": "postgres",
            "database.hostname": "redacted",
            "database.history.kafka.bootstrap.servers": "redacted",
            "database.history.kafka.topic": "dbhistory.recordloader",
            "database.password": "redacted",
            "name": "record-loader-connector",
            "database.server.name": "recordLoaderDb",
            "database.port": "20023",
            "table.whitelist": ".*sync"
        },
        "tasks": [
            {
                "connector": "record-loader-connector",
                "task": 0
            }
        ],
        "type": "source"
    }

I've noticed these two config options (slot.name and slot.drop_on_stop), but it is not clear to me if/how I should change them:

http://debezium.io/docs/connectors/postgresql/#connector-properties

Zach Mays


If you deploy multiple instances of the Debezium Postgres connector, you must make sure to use distinct replication slot names. You can specify a name when setting up the connector (note that PostgreSQL only accepts lower-case letters, digits and underscores in slot names):

    {
        "name": "inventory-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "postgres",
            "database.dbname" : "postgres",
            "database.server.name": "dbserver1",
            "database.whitelist": "inventory",
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            "database.history.kafka.topic": "schema-changes.inventory",
            "slot.name" : "my_slot_name"
        }
    }
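
If you register several connectors from a script, one way to keep slot names distinct is to derive each slot name from the connector name and check for collisions before POSTing. The Python sketch below does that; the naming scheme and the helpers `make_connector_config` and `assert_distinct_slots` are hypothetical, not part of Debezium. The validation follows PostgreSQL's slot-name rules (lower-case letters, digits, underscores, at most 63 bytes with the default NAMEDATALEN).

```python
import re

# PostgreSQL only accepts lower-case letters, digits and underscores in
# replication slot names, up to 63 bytes with the default NAMEDATALEN of 64.
SLOT_NAME_RE = re.compile(r"[a-z0-9_]{1,63}")


def make_connector_config(connector_name: str, base_config: dict) -> dict:
    """Build a registration payload whose slot.name is derived from the
    connector name (a hypothetical naming scheme, not a Debezium default)."""
    slot_name = re.sub(r"[^a-z0-9_]", "_", connector_name.lower())
    if not SLOT_NAME_RE.fullmatch(slot_name):
        raise ValueError(f"cannot derive a valid slot name from {connector_name!r}")
    config = dict(base_config)  # copy, so the shared base config is not mutated
    config["slot.name"] = slot_name
    return {"name": connector_name, "config": config}


def assert_distinct_slots(payloads: list) -> None:
    """Raise if two connector payloads would share one replication slot."""
    owners = {}
    for payload in payloads:
        slot = payload["config"]["slot.name"]
        if slot in owners:
            raise ValueError(
                f"{payload['name']!r} and {owners[slot]!r} both use slot {slot!r}"
            )
        owners[slot] = payload["name"]


if __name__ == "__main__":
    base = {"connector.class": "io.debezium.connector.postgresql.PostgresConnector"}
    payloads = [
        make_connector_config(name, base)
        for name in ("inventory-connector", "record-loader-connector")
    ]
    assert_distinct_slots(payloads)  # slots: inventory_connector, record_loader_connector
```

Because hyphens are mapped to underscores, "inventory-connector" and "inventory_connector" would end up with the same slot, which is exactly the kind of collision `assert_distinct_slots` is there to catch.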

I can't reproduce the issue you describe when restarting a given connector instance. The connector should detect that the slot already exists and reuse it. (One possible cause: did you also change the logical decoding plug-in, e.g. "decoderbufs" vs. "wal2json"?) If you have a reproducer for this, could you please open an entry in our bug tracker?
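
The reuse-or-create decision can be modelled in a few lines. This is a minimal Python sketch, not Debezium's code: it assumes the slot lookup filters on slot name, database and plugin, as the `pg_replication_slots` query logged later in this thread does. `SlotRow` and `plan_startup` are hypothetical names. Since slot names are unique across the whole PostgreSQL cluster, a slot that matches by name but not by database or plugin is not reused, and creating it again fails with "already exists".

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class SlotRow:
    """The subset of pg_replication_slots columns the lookup filters on."""
    slot_name: str
    plugin: str
    database: str


def find_slot(rows: Sequence[SlotRow], slot_name: str, database: str,
              plugin: str) -> Optional[SlotRow]:
    # Mirrors: select * from pg_replication_slots
    #          where slot_name = $1 and database = $2 and plugin = $3
    for row in rows:
        if (row.slot_name, row.database, row.plugin) == (slot_name, database, plugin):
            return row
    return None


def plan_startup(rows: Sequence[SlotRow], slot_name: str, database: str,
                 plugin: str) -> str:
    """Hypothetical model of the outcome when the connector task starts."""
    if find_slot(rows, slot_name, database, plugin) is not None:
        return "reuse"
    if any(row.slot_name == slot_name for row in rows):
        # The name is taken cluster-wide, but database or plugin did not match,
        # so creating the slot fails with 'replication slot ... already exists'.
        return "create fails: slot name taken"
    return "create"


if __name__ == "__main__":
    existing = [SlotRow("debezium", "decoderbufs", "record_loader")]
    print(plan_startup(existing, "debezium", "record_loader", "decoderbufs"))
    print(plan_startup(existing, "debezium", "wal2json_db", "wal2json"))
```

Any mismatch in the lookup parameters, such as a changed plug-in or an unexpected database name, leads to the same "already exists" failure on restart.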

To proceed, you can manually delete the slot in Postgres:

    select pg_drop_replication_slot('debezium');

Gunnar
  • Thanks for the reply. So when I deploy a second instance of kafka-connect, it picks up the existing config that I posted, so I have no chance to specify a different value for slot name. The images run in distributed mode: https://github.com/debezium/docker-images/blob/master/connect-base/0.8/docker-entrypoint.sh#L169 and from what I can tell I assumed that meant that additional instances would coordinate. – Zach Mays Jun 14 '18 at 18:18
  • WDYM by "pick up"? You should make sure to specify a different name, it will take the config of that JSON you're POSTing to Kafka Connect. – Gunnar Jun 14 '18 at 18:20
  • Also, when you say "restarting a given connector instance", do you mean from the REST API, or actually restarting the Docker container itself? For me it is the latter that is causing the replication slot issues. – Zach Mays Jun 14 '18 at 18:22
  • The former, but it also works for me when restarting the container. If you could provide exact steps for reproducing the issue, that'd be great. – Gunnar Jun 14 '18 at 18:24
  • Re "pick up": I docker run one kafka-connect instance, then POST the connector config. I then docker run a second kafka-connect instance, and it immediately runs into the slot issue without me posting any config. – Zach Mays Jun 14 '18 at 18:26
  • I'll create some reproducible steps, thanks for the help – Zach Mays Jun 14 '18 at 18:27
  • Ah, that's interesting. Are you running Connect in Cluster mode then? Wondering whether the issue happens during a rebalance. So yes, a reproducer would be great. Thanks! – Gunnar Jun 14 '18 at 18:31
  • Alas, still trying to reproduce locally with a docker-compose. I went ahead and turned on debug mode on the deployed instance that is getting the issue, and found this. Will keep digging, but figured I'd go ahead and post this: – Zach Mays Jun 19 '18 at 18:46
  • DEBUG No replication slot 'debezium' is present for plugin 'decoderbufs' and database 'record_loader?ssl' [io.debezium.connector.postgresql.connection.PostgresConnection] DEBUG Creating new replication slot 'debezium' for plugin 'DECODERBUFS' [io.debezium.connector.postgresql.connection.PostgresReplicationConnection] ERROR || WorkerSourceTask{id=record-loader-connector-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask] io.debezium.jdbc.JdbcConnectionException: ERROR: replication slot "debezium" already exists – Zach Mays Jun 19 '18 at 18:49
  • And here is what is in the db: -[ RECORD 1 ]-------+-------------- slot_name | debezium plugin | decoderbufs slot_type | logical datoid | 16384 database | record_loader temporary | f active | f active_pid | xmin | catalog_xmin | 2223 restart_lsn | D/9F393E98 confirmed_flush_lsn | D/9F3947F4 – Zach Mays Jun 19 '18 at 18:49
  • Ah, I logged what the db was receiving: 2018-06-19 18:57:34 UTC [150-654] postgres@record_loader LOG: execute : select * from pg_replication_slots where slot_name = $1 and database = $2 and plugin = $3 2018-06-19 18:57:34 UTC [150-655] postgres@record_loader DETAIL: parameters: $1 = 'debezium', $2 = 'record_loader?ssl', $3 = 'decoderbufs' – Zach Mays Jun 19 '18 at 18:59
  • It is my hack to enable SSL (appending ?ssl to the database name) that is causing the issue. – Zach Mays Jun 19 '18 at 18:59
  • Not sure how to properly do SSL: I dropped ?ssl from the db name and set database.sslmode to required, but now I get this error: {"error_code":500,"message":"Could not intialize type registry"} – Zach Mays Jun 19 '18 at 19:25
  • Caused by: org.postgresql.util.PSQLException: Invalid sslmode value: required – Zach Mays Jun 19 '18 at 21:00
  • Saw this: https://issues.jboss.org/browse/DBZ-238 So is "require" the right value to pass, and the docs just need to be updated to say that? – Zach Mays Jun 19 '18 at 21:05
  • Yes, it's "require". We already updated the dev branch docs, will be pushed to the live website with the next release. – Gunnar Jun 20 '18 at 07:51
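
Putting the resolution from this thread together: keep the plain database name (so the slot lookup matches the existing slot) and request SSL through database.sslmode with the value "require". The Python sketch below applies both changes to a connector config dict. `ssl_connector_config` is a hypothetical helper, and the set of accepted values is an assumption based on the sslmode values of the PostgreSQL JDBC driver, whose "Invalid sslmode value" error appears above.

```python
# Assumed set of sslmode values accepted by the PostgreSQL JDBC driver.
VALID_SSLMODES = {"disable", "allow", "prefer", "require", "verify-ca", "verify-full"}


def ssl_connector_config(base_config: dict, sslmode: str = "require") -> dict:
    """Return a copy of the connector config with SSL configured via
    database.sslmode instead of a suffix on the database name."""
    if sslmode not in VALID_SSLMODES:
        # Fail early, mirroring the driver's "Invalid sslmode value: ..." error.
        raise ValueError(f"Invalid sslmode value: {sslmode}")
    config = dict(base_config)  # copy, so the caller's dict is left unchanged
    # Strip a '?...' suffix: it would otherwise become part of the database
    # name used when looking up the existing replication slot.
    dbname = config.get("database.dbname", "")
    config["database.dbname"] = dbname.split("?", 1)[0]
    config["database.sslmode"] = sslmode
    return config


if __name__ == "__main__":
    old = {"database.dbname": "record_loader?ssl", "database.user": "postgres"}
    print(ssl_connector_config(old))
```

Validating the value up front turns a typo like "required" into a clear error before the config is ever sent to Kafka Connect.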