postgresql, apache-kafka, debezium, debezium-connect

Debezium fails to reconnect after losing the connection to the DB


I'm trying to make Debezium CDC production-ready. One problem I came across is that after a connection loss (e.g. a restart of the source DB), it fails to reconnect:

WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
io.debezium.DebeziumException: Couldn't obtain encoding for database

  1. Could anyone recommend an out-of-the-box solution for this?
  2. My idea was to build a Docker image on top of the official Debezium image that adds a script which periodically calls GET localhost:8083/connectors/XX/status, reads body.tasks[0].state and, if the task has failed, triggers POST localhost:8083/connectors/XX/restart. However, calling /restart does not seem to work.
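
A possible explanation for question 2: by default, Kafka Connect's POST /connectors/<name>/restart restarts only the Connector instance, not its tasks, so a task in the FAILED state stays FAILED. Since Kafka 3.0 (KIP-745) the same endpoint accepts the includeTasks and onlyFailed query parameters, which also restart the failed tasks. A minimal sketch, assuming a Connect worker reachable on port 8083 as in the question; the function names are hypothetical:

```shell
#!/bin/sh
# Sketch: restart a connector together with its FAILED tasks.
# Assumes Kafka Connect >= 3.0 (KIP-745 added includeTasks/onlyFailed)
# and the worker REST port 8083 used in the question.

# Build the restart URL for connector $2 on Connect host $1.
restart_url() {
    printf 'http://%s:8083/connectors/%s/restart?includeTasks=true&onlyFailed=true\n' "$1" "$2"
}

# POST the restart request and print the HTTP status code returned by the worker.
restart_with_failed_tasks() {
    curl -s -o /dev/null -w '%{http_code}\n' -X POST "$(restart_url "$1" "$2")"
}
```

Usage: `restart_with_failed_tasks localhost inventory-connector`. The restart will only stick once PostgreSQL accepts connections again; while the DB is still down, the task is expected to fail again on start.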

So far I have only found a very old thread, which was not very helpful: "Debezium source task fails to reconnect to postgresql DB when DB container is re-created". Has anything changed since then?

Thanks for any ideas.

Environment (connector configuration):

{
    "name": "xx-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "192.168.37.51",
        "plugin.name": "pgoutput",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "debezium",
        "database.dbname": "xx",
        "schema.include.list": "xx",
        "table.include.list": "xx.x",
        "database.server.name": "xx-db3",
        "topic.prefix": "debezium.testdb",
        "snapshot.mode": "never",
        "topic.creation.enable": "true",
        "topic.creation.default.replication.factor": "1",
        "topic.creation.default.partitions": "1",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}

Connector status (GET /connectors/inventory-connector/status):

{
    "name": "inventory-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "172.17.0.5:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "172.17.0.5:8083",
            "trace": "io.debezium.DebeziumException: Couldn't obtain encoding for database xx\n\tat io.debezium.connector.postgresql.connection.PostgresConnection.getDatabaseCharset(PostgresConnection.java:577)\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:86)\n\tat io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:251)\n\tat io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:178)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.postgresql.util.PSQLException: Connection to 192.168.37.51:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:342)\n\tat org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:54)\n\tat org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:263)\n\tat org.postgresql.Driver.makeConnection(Driver.java:443)\n\tat org.postgresql.Driver.connect(Driver.java:297)\n\tat io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:243)\n\tat io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:129)\n\tat io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:875)\n\tat io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:870)\n\tat io.debezium.connector.postgresql.connection.PostgresConnection.getDatabaseCharset(PostgresConnection.java:574)\n\t... 14 more\nCaused by: java.net.ConnectException: Connection refused (Connection refused)\n\tat java.base/java.net.PlainSocketImpl.socketConnect(Native Method)\n\tat java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)\n\tat java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)\n\tat java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)\n\tat java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)\n\tat java.base/java.net.Socket.connect(Socket.java:609)\n\tat org.postgresql.core.PGStream.createSocket(PGStream.java:243)\n\tat org.postgresql.core.PGStream.<init>(PGStream.java:98)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:132)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:258)\n\t... 23 more\n"
        }
    ],
    "type": "source"
}
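
The status response shows the connector itself as RUNNING while its task is FAILED, so a health check has to look at the task states rather than at connector.state. A small sketch of that check, assuming jq is installed; the function names are hypothetical and both read the status JSON from stdin:

```shell
#!/bin/sh
# Print the IDs of all FAILED tasks (one per line) found in a
# GET /connectors/<name>/status response read from stdin.
failed_task_ids() {
    jq -r '.tasks[]? | select(.state == "FAILED") | .id'
}

# Exit with status 0 if the status JSON on stdin contains at least one FAILED task,
# and with a non-zero status otherwise (jq -e maps a false result to exit code 1).
has_failed_task() {
    jq -e 'any(.tasks[]?; .state == "FAILED")' > /dev/null
}
```

Checking every task rather than only tasks[0] makes the same check usable for connectors with more than one task; the Postgres connector itself runs a single task.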

Solution

  • Calling /restart does not work, but a combination of /stop and /resume does.

    Because this workaround feels a bit tricky, question 1 is still open.

    Here is the script (scripts/healthcheck.sh):

    #!/bin/ash
    
    # Function to log messages with a timestamp
    log_message() {
        echo "$(date '+%Y-%m-%d %H:%M:%S') $1"
    }
    
    # Base URL of the Kafka Connect REST API
    base_url="http://${DEBEZIUM_CONTAINER_HOST}:8083"
    
    # Log the start of the script
    log_message "Checking liveness of connectors..."
    
    # Call GET host:8083/connectors/ to get the list of connectors
    connectors=$(curl -s "${base_url}/connectors/")
    
    # Loop through each connector
    for connector in $(echo "${connectors}" | jq -r '.[]'); do
        # Call GET host:8083/connectors/eachConnector/status
        status=$(curl -s "${base_url}/connectors/${connector}/status")
    
        # Check if the first task is FAILED (POSIX test, no bash-only [[ ]])
        if [ "$(echo "${status}" | jq -r '.tasks[0].state')" = "FAILED" ]; then
            # Log that the connector is being restarted
            log_message "Restarting connector ${connector}..."
    
            # Call PUT host:8083/connectors/eachConnector/stop
            curl -X PUT -s "${base_url}/connectors/${connector}/stop" > /dev/null
    
            # Call PUT host:8083/connectors/eachConnector/resume
            curl -X PUT -s "${base_url}/connectors/${connector}/resume" > /dev/null
        else
            # Log that the connector is OK
            log_message "Connector ${connector} is OK."
        fi
    done
    
    # Log the end of the script
    log_message "Connector liveness check completed."
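    
    And the Dockerfile for the container that runs it every minute via cron:
    
    FROM alpine:3.19.1
    
    COPY scripts /scripts
    RUN chmod +x /scripts/healthcheck.sh
    
    # curl and jq are used by healthcheck.sh
    RUN apk add --no-cache curl jq
    
    # Schedule the health check every minute
    RUN crontab -l | { cat; echo "* * * * *    /scripts/healthcheck.sh"; } | crontab -
    
    # Run crond in the foreground, logging to stderr with log level 8
    CMD ["crond", "-f", "-d", "8"]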
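
The PUT /connectors/<name>/stop endpoint used by this workaround was introduced with KIP-875 in Kafka 3.5, so it needs a fairly recent Connect runtime. On older workers, the per-task endpoint POST /connectors/<name>/tasks/<id>/restart can restart a failed task directly. A minimal sketch, assuming port 8083 as in the script above; the function names are hypothetical:

```shell
#!/bin/sh
# Sketch: restart individual tasks via POST /connectors/<name>/tasks/<id>/restart.

# Build the per-task restart URL: $1 = Connect host, $2 = connector name, $3 = task ID.
task_restart_url() {
    printf 'http://%s:8083/connectors/%s/tasks/%s/restart\n' "$1" "$2" "$3"
}

# Read task IDs from stdin (one per line) and restart each one.
# $1 = Connect host, $2 = connector name.
restart_tasks() {
    while IFS= read -r task_id; do
        # Skip empty lines
        [ -n "$task_id" ] || continue
        echo "Restarting task ${task_id} of connector $2"
        curl -s -X POST "$(task_restart_url "$1" "$2" "$task_id")" > /dev/null
    done
}
```

Usage, restarting task 0 of the connector from the question: `printf '0\n' | restart_tasks "${DEBEZIUM_CONTAINER_HOST}" inventory-connector`.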