Apache Flink: Postgres JAR not found


The issue

I am creating a CDC pipeline in Flink to replicate a Postgres database into a StarRocks database. The JobManager and the TaskManager are up and running. But no matter how I try to start the CDC job (see details below), I keep getting this error message:

Exception in thread "main" java.lang.RuntimeException: Cannot find factory with identifier "postgres-cdc" in the classpath.

Available factory classes are:

org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory
...

So at runtime, Flink finds the StarRocks connector JAR but not the Postgres ones, even though they are in the same folder (/opt/flink/usrlib/flink-cdc-3.4.0/lib/).

This is the first time I am using Flink, and I have spent over a day trying to find the right configuration without success, so any tips would be helpful!

Container setup

# JobManager
podman run \
  --name=flink_jobmanager \
  --detach \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: 192.168.1.10" \
  --volume "/var/opt/flink/data:/opt/flink/usrlib" \
  --network flink_net \
  --publish 8081:8081 \
  --publish 6123:6123 \
  docker.io/flink:latest \
  jobmanager

# TaskManager
podman run \
  --name=flink_taskmanager \
  --detach \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: 192.168.1.10" \
  --volume "/var/opt/flink/data:/opt/flink/usrlib" \
  --network flink_net \
  docker.io/flink:latest \
  taskmanager

# Start the job
podman run \
  --rm \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: 192.168.1.10" \
  --volume "/var/opt/flink/data:/opt/flink/usrlib:rw" \
  --network flink_net \
  docker.io/flink:latest \
  /opt/flink/usrlib/flink-cdc-3.4.0/bin/flink-cdc.sh \
  /opt/flink/usrlib/postgres-to-starrocks.yaml

Folder structure (.jar files in flink-cdc-3.4.0/lib/)

(Postgres jars as suggested here: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/flink-sources/postgres-cdc/)

/var/opt/flink/data$ tree
.
├── flink-cdc-3.4.0
│   ├── bin
│   │   └── flink-cdc.sh
│   ├── conf
│   │   ├── flink-cdc.yaml
│   │   └── log4j-cli.properties
│   ├── lib
│   │   ├── flink-cdc-dist-3.4.0.jar
│   │   ├── flink-cdc-pipeline-connector-starrocks-3.4.0.jar
│   │   ├── flink-connector-postgres-cdc-3.4.0.jar
│   │   └── flink-sql-connector-postgres-cdc-3.4.0.jar
│   ├── LICENSE
│   ├── log
│   └── NOTICE
└── postgres-to-starrocks.yaml

postgres-to-starrocks.yaml

# Job properties
job:
  name:        postgres-to-starrocks
  parallelism: 4
  checkpointing:
    enabled:   true
    interval:  60000 # Checkpoint every 60 seconds (in milliseconds)
    mode:      EXACTLY_ONCE
    timeout:   600000 # Checkpoint timeout 10 minutes
    min-pause: 5000 # Minimum pause between checkpoints

source:
  type:          "postgres-cdc"
  hostname:      192.168.1.10
  port:          5432
  username:      username
  password:      "password"
  database-name: database
  schema-name:   public
  tables:        database.\.*

sink:
  type:          starrocks
  name:          StarRocks Sink
  jdbc-url:      "jdbc:mysql://192.168.1.10:9030"
  load-url:      192.168.1.10:8030
  username:      username
  password:      "password"
  database-name: test

Solution

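  • Flink CDC 3.4.0 does not provide a pipeline connector for Postgres, so flink-cdc.sh cannot use "postgres-cdc" as a pipeline source.

    The terminology in the project is confusing. For Postgres there is only a CDC source connector (the flink-connector-postgres-cdc / flink-sql-connector-postgres-cdc JARs), meant to be used from the Table API or Flink SQL. The flink-cdc.sh pipeline CLI only discovers pipeline connector factories, which is why the error message lists the StarRocks sink factory but nothing from the Postgres JARs, even though they sit in the same lib/ folder.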

    See https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/pipeline-connectors/overview/ for a list of pipeline connectors.