I'm trying to make Debezium CDC production-ready. One problem I came across is that after a connection loss (e.g. a restart of the source DB), it fails to reconnect:
WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
io.debezium.DebeziumException: Couldn't obtain encoding for database
I tried to check GET localhost:8083/connectors/XX/status -> body.tasks[0].state and trigger POST localhost:8083/connectors/XX/restart, but calling /restart does not seem to work.
So far, I have only found a very old thread, which was not very helpful: "Debezium source task fails to reconnect to postgresql DB when DB container is re-created". Has anything changed since then?
Thanks for any ideas.
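A likely reason why a plain /restart appears to do nothing: in Kafka Connect, POST /connectors/{name}/restart restarts only the Connector instance by default, so when the connector is RUNNING and only a task is FAILED, the task is left as it is. Since Kafka 3.0 (KIP-745) the endpoint accepts the includeTasks and onlyFailed query parameters. A minimal sketch, assuming a Connect worker on 3.0 or later; restart_url and restart_failed are hypothetical helper names:

```shell
#!/bin/sh
# Sketch: restart only the FAILED parts of a connector (connector and/or tasks).
# Assumes Kafka Connect 3.0+ (KIP-745). Without includeTasks=true, the
# restart endpoint restarts only the Connector instance, not its tasks.

# Build the restart URL from a base URL (e.g. http://localhost:8083) and a name.
restart_url() {
  printf '%s/connectors/%s/restart?includeTasks=true&onlyFailed=true\n' "$1" "$2"
}

# Send the restart request and print the HTTP status code returned by Connect.
restart_failed() {
  curl -s -o /dev/null -w '%{http_code}' -X POST "$(restart_url "$1" "$2")"
}
```

Usage: restart_failed http://localhost:8083 inventory-connector. Any 2xx code means the request was accepted; the actual restart happens asynchronously on the worker.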
Environment:
{
  "name": "xx-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "192.168.37.51",
    "plugin.name": "pgoutput",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "debezium",
    "database.dbname": "xx",
    "schema.include.list": "xx",
    "table.include.list": "xx.x",
    "database.server.name": "xx-db3",
    "topic.prefix": "debezium.testdb",
    "snapshot.mode": "never",
    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
{
  "name": "inventory-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.17.0.5:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.17.0.5:8083",
      "trace": "io.debezium.DebeziumException: Couldn't obtain encoding for database xx\n\tat io.debezium.connector.postgresql.connection.PostgresConnection.getDatabaseCharset(PostgresConnection.java:577)\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:86)\n\tat io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:251)\n\tat io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:178)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.postgresql.util.PSQLException: Connection to 192.168.37.51:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:342)\n\tat org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:54)\n\tat org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:263)\n\tat org.postgresql.Driver.makeConnection(Driver.java:443)\n\tat org.postgresql.Driver.connect(Driver.java:297)\n\tat io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:243)\n\tat io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:129)\n\tat io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:875)\n\tat io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:870)\n\tat io.debezium.connector.postgresql.connection.PostgresConnection.getDatabaseCharset(PostgresConnection.java:574)\n\t... 14 more\nCaused by: java.net.ConnectException: Connection refused (Connection refused)\n\tat java.base/java.net.PlainSocketImpl.socketConnect(Native Method)\n\tat java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)\n\tat java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)\n\tat java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)\n\tat java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)\n\tat java.base/java.net.Socket.connect(Socket.java:609)\n\tat org.postgresql.core.PGStream.createSocket(PGStream.java:243)\n\tat org.postgresql.core.PGStream.<init>(PGStream.java:98)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:132)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:258)\n\t... 23 more\n"
    }
  ],
  "type": "source"
}
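The status shows the connector as RUNNING while task 0 is FAILED, so a health check should look at the tasks and not only at the connector. A minimal sketch of that check with jq (assuming jq 1.5+ for any/2); needs_restart and failed_task_ids are hypothetical helper names that read a status response on stdin:

```shell
#!/bin/sh
# Sketch: decide from a /connectors/{name}/status response (read on stdin)
# whether anything is FAILED, looking at the connector and at every task,
# not only tasks[0]. Requires jq 1.5+ for any(generator; condition).

# Exit status 0 if the connector or any task is FAILED, 1 otherwise
# (jq -e derives its exit status from the last output value).
needs_restart() {
  jq -e '.connector.state == "FAILED" or any(.tasks[]?; .state == "FAILED")' > /dev/null
}

# Print the ids of FAILED tasks, one per line.
failed_task_ids() {
  jq -r '.tasks[]? | select(.state == "FAILED") | .id'
}
```

Each printed id could then be passed to Kafka Connect's POST /connectors/{name}/tasks/{id}/restart, which restarts a single task.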
Calling /restart does not work, but a combination of /stop and /resume does.
Because I find this workaround a bit tricky, question 1 is still relevant.
Here is the script:
#!/bin/ash
# Function to log messages
log_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') $1"
}

# Log the start of the script
log_message "Checking liveness of connectors..."

# Call GET host:8083/connectors/ to get the list of connectors
connectors=$(curl -s "http://${DEBEZIUM_CONTAINER_HOST}:8083/connectors/")

# Loop through each connector
for connector in $(echo "${connectors}" | jq -r '.[]'); do
    # Call GET host:8083/connectors/eachConnector/status
    status=$(curl -s "http://${DEBEZIUM_CONTAINER_HOST}:8083/connectors/${connector}/status")
    # Check if the first task is FAILED (POSIX [ ] rather than the bash-style [[ ]])
    if [ "$(echo "${status}" | jq -r '.tasks[0].state')" = "FAILED" ]; then
        # Log that the connector is restarting
        log_message "Restarting connector ${connector}..."
        # Call PUT host:8083/connectors/eachConnector/stop
        curl -X PUT -s "http://${DEBEZIUM_CONTAINER_HOST}:8083/connectors/${connector}/stop" > /dev/null
        # Call PUT host:8083/connectors/eachConnector/resume
        curl -X PUT -s "http://${DEBEZIUM_CONTAINER_HOST}:8083/connectors/${connector}/resume" > /dev/null
    else
        # Log that the connector is OK
        log_message "Connector ${connector} is OK."
    fi
done

# Log the end of the script
log_message "Connector liveness check completed."
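The script runs every minute via cron in this container:
FROM alpine:3.19.1
# curl and jq are used by healthcheck.sh
RUN apk add --no-cache curl jq
COPY scripts /scripts
RUN chmod +x /scripts/healthcheck.sh
# Append a job that runs the health check every minute to root's crontab
RUN crontab -l | { cat; echo "* * * * * /scripts/healthcheck.sh"; } | crontab -
# Run BusyBox crond in the foreground, logging to stderr at level 8
CMD ["crond", "-f", "-d", "8"]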
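The stop/resume step in the script calls /resume immediately after /stop. A slightly more defensive variant waits until the connector actually reports STOPPED before resuming. This is a sketch assuming Kafka Connect 3.5 or later (KIP-875, which added PUT /connectors/{name}/stop and the STOPPED state); connector_state, stop_and_resume and the 30-second limit are hypothetical choices:

```shell
#!/bin/sh
# Sketch: stop a connector, wait until its status reports STOPPED, then resume it.
# Assumes Kafka Connect 3.5+ (KIP-875) and jq on the PATH.

# Print the connector-level state (RUNNING, FAILED, PAUSED, STOPPED, ...).
connector_state() {
  curl -s "$1/connectors/$2/status" | jq -r '.connector.state'
}

# $1 = base URL (e.g. http://localhost:8083), $2 = connector name.
stop_and_resume() {
  base=$1; name=$2; tries=0
  curl -s -o /dev/null -X PUT "$base/connectors/$name/stop"
  # Poll once per second for up to 30 attempts.
  while [ "$(connector_state "$base" "$name")" != "STOPPED" ]; do
    tries=$((tries + 1))
    if [ "$tries" -ge 30 ]; then
      echo "connector $name did not reach STOPPED" >&2
      return 1
    fi
    sleep 1
  done
  curl -s -o /dev/null -X PUT "$base/connectors/$name/resume"
}
```

In the cron script, the two curl calls inside the if branch could then be replaced by stop_and_resume "http://${DEBEZIUM_CONTAINER_HOST}:8083" "${connector}".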