
Two (Kafka) S3 Connectors not working simultaneously


I have Kafka Connect running in a cluster (3 nodes) with 1 connector (topic -> S3), and everything is fine:

[root@dev-kafka1 ~]# curl localhost:8083/connectors/s3-postgres/status | jq -r
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   219  100   219    0     0  36384      0 --:--:-- --:--:-- --:--:-- 43800
{
  "name": "s3-postgres",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "127.0.0.1:8083"
    },
    {
      "state": "RUNNING",
      "id": 1,
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "sink"
}

But when I create a second connector, its status always looks like this, with an empty task list:

[root@dev-kafka1 ~]# curl localhost:8083/connectors/s3-postgres6/status | jq -r
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   109  100   109    0     0  14347      0 --:--:-- --:--:-- --:--:-- 15571
{
  "name": "s3-postgres6",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [],
  "type": "sink"
}

I don't know what I got wrong in the configuration such that two connectors using the same plugin don't work together. If I stop connector #1, which is running fine, connector #2 works fine after a restart. Does anyone know what I should change in the configs?


Solution

  • It is hard to say exactly what the problem is without going through the Connect worker logs, possibly after temporarily raising the log level to DEBUG. That said, depending on the connector properties, Kafka Connect can be very memory hungry.

    Therefore, I'd suggest running Connect on machines separate from the Kafka brokers, and giving the Connect worker a larger heap (the default maximum is 2g in recent versions) by exporting the KAFKA_HEAP_OPTS variable before starting it.
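
As a rough sketch of the heap suggestion above: the sizes here are assumed values for illustration, not a recommendation, and the script and properties paths in the comment assume a stock Apache Kafka layout, so adjust them to your installation.

```shell
# Assumed heap sizes for illustration; size them to the machine and workload.
export KAFKA_HEAP_OPTS="-Xms1g -Xmx4g"

# Print the value so it can be checked before the worker is started.
echo "KAFKA_HEAP_OPTS=${KAFKA_HEAP_OPTS}"

# Then start the worker from the same shell so it inherits the variable.
# Paths are assumptions based on a stock Apache Kafka layout:
# bin/connect-distributed.sh config/connect-distributed.properties
```

The variable has to be exported in the environment that launches each of the three workers. If Connect runs under a service manager, it needs to be set in that service's environment instead of in an interactive shell.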