I am creating a CDC pipeline in Flink to replicate a Postgres database into a StarRocks database. The JobManager and the TaskManager are up and running. But no matter how I try to start the CDC job (see details below), I keep getting this error message:
Exception in thread "main" java.lang.RuntimeException: Cannot find factory with identifier "postgres-cdc" in the classpath.
Available factory classes are:
org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory
...
So at runtime, it can find the StarRocks connector jar, but not the Postgres ones, which are in the same folder (/opt/flink/usrlib/flink-cdc-3.4.0/lib/).
This is the first time I am using Flink and I have now tried for over a day to find the right configuration, but without success... So any tips would be helpful!
# JobManager
podman run \
  --name=flink_jobmanager \
  --detach \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: 192.168.1.10" \
  --volume "/var/opt/flink/data:/opt/flink/usrlib" \
  --network flink_net \
  --publish 8081:8081 \
  --publish 6123:6123 \
  docker.io/flink:latest \
  jobmanager

# TaskManager
podman run \
  --name=flink_taskmanager \
  --detach \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: 192.168.1.10" \
  --volume "/var/opt/flink/data:/opt/flink/usrlib" \
  --network flink_net \
  docker.io/flink:latest \
  taskmanager

# Start the job
podman run \
  --rm \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: 192.168.1.10" \
  --volume "/var/opt/flink/data:/opt/flink/usrlib:rw" \
  --network flink_net \
  docker.io/flink:latest \
  /opt/flink/usrlib/flink-cdc-3.4.0/bin/flink-cdc.sh \
  /opt/flink/usrlib/postgres-to-starrocks.yaml
(Postgres jars as suggested here: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/flink-sources/postgres-cdc/)
/var/opt/flink/data$ tree
.
├── flink-cdc-3.4.0
│ ├── bin
│ │ └── flink-cdc.sh
│ ├── conf
│ │ ├── flink-cdc.yaml
│ │ └── log4j-cli.properties
│ ├── lib
│ │ ├── flink-cdc-dist-3.4.0.jar
│ │ ├── flink-cdc-pipeline-connector-starrocks-3.4.0.jar
│ │ ├── flink-connector-postgres-cdc-3.4.0.jar
│ │ └── flink-sql-connector-postgres-cdc-3.4.0.jar
│ ├── LICENSE
│ ├── log
│ └── NOTICE
└── postgres-to-starrocks.yaml
# Job properties
job:
  name: postgres-to-starrocks
  parallelism: 4
  checkpointing:
    enabled: true
    interval: 60000 # Checkpoint every 60 seconds (in milliseconds)
    mode: EXACTLY_ONCE
    timeout: 600000 # Checkpoint timeout 10 minutes
    min-pause: 5000 # Minimum pause between checkpoints

source:
  type: "postgres-cdc"
  hostname: 192.168.1.10
  port: 5432
  username: username
  password: "password"
  database-name: database
  schema-name: public
  tables: database.\.*

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: "jdbc:mysql://192.168.1.10:9030"
  load-url: 192.168.1.10:8030
  username: username
  password: "password"
  database-name: test
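There isn't a pipeline connector for Postgres, which is why the "postgres-cdc" factory cannot be found.
The terminology in the project is confusing. A YAML job started with flink-cdc.sh looks for pipeline connectors (jars named flink-cdc-pipeline-connector-*, like the StarRocks one in your lib folder). For Postgres there is only a CDC source connector (the flink-connector-postgres-cdc and flink-sql-connector-postgres-cdc jars), which is meant to be used from the Table API or Flink SQL, not from a pipeline YAML.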
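As a rough sketch of the Flink SQL route, the snippet below writes a SQL script that reads one Postgres table through the postgres-cdc connector and writes it to StarRocks. Several things here are assumptions and not taken from your setup: the table `orders` and its columns, the slot name `flink_orders_slot`, the output file name, the use of the `pgoutput` decoding plugin, and the presence of the separate StarRocks Flink connector jar (the pipeline connector jar is not enough for the `starrocks` SQL connector). Unlike a pipeline job, each table needs its own pair of definitions.

```shell
# Hypothetical output path; adjust to wherever the SQL client can read it.
SQL_FILE="${SQL_FILE:-./postgres-to-starrocks.sql}"

cat > "$SQL_FILE" <<'EOF'
-- Source: one Postgres table read through the postgres-cdc SQL connector.
-- Table name, columns and slot name are placeholders.
CREATE TABLE pg_orders (
  id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '192.168.1.10',
  'port' = '5432',
  'username' = 'username',
  'password' = 'password',
  'database-name' = 'database',
  'schema-name' = 'public',
  'table-name' = 'orders',
  'slot.name' = 'flink_orders_slot',
  'decoding.plugin.name' = 'pgoutput'
);

-- Sink: assumes the StarRocks Flink connector jar is on Flink's classpath
-- and that test.orders already exists in StarRocks.
CREATE TABLE sr_orders (
  id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://192.168.1.10:9030',
  'load-url' = '192.168.1.10:8030',
  'database-name' = 'test',
  'table-name' = 'orders',
  'username' = 'username',
  'password' = 'password'
);

-- Continuously replicate changes from Postgres to StarRocks.
INSERT INTO sr_orders SELECT * FROM pg_orders;
EOF
```

The generated file could then be submitted with Flink's SQL client, for example `/opt/flink/bin/sql-client.sh -f /opt/flink/usrlib/postgres-to-starrocks.sql`, assuming the flink-sql-connector-postgres-cdc jar has been copied into /opt/flink/lib so the client and the cluster can load it.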
See https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/pipeline-connectors/overview/ for a list of pipeline connectors.
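A quick way to see which pipeline connectors a Flink CDC installation actually has is to look at the jar names. The helper below is only a heuristic sketch: it relies on the flink-cdc-pipeline-connector-* naming convention and does not inspect the classpath or the jar contents. The function name and the `FLINK_CDC_LIB` variable are made up for this example.

```shell
# List jars in a Flink CDC lib directory that look like pipeline connectors,
# judging only by the file name (a heuristic, not a classpath scan).
list_pipeline_connectors() {
  lib_dir="$1"
  for jar in "$lib_dir"/flink-cdc-pipeline-connector-*.jar; do
    [ -e "$jar" ] || continue   # the glob matched nothing
    basename "$jar"
  done
}

# Default path taken from the question; override with FLINK_CDC_LIB if needed.
list_pipeline_connectors "${FLINK_CDC_LIB:-/opt/flink/usrlib/flink-cdc-3.4.0/lib}"
```

With the lib folder shown in the question, this would print only flink-cdc-pipeline-connector-starrocks-3.4.0.jar, which matches the list of available factories in the error message.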