apache-kafka, ftp, apache-kafka-connect, exactly-once

Kafka Connect in distributed mode produces duplicate messages


Operating Environment


Problem


Kafka Connect in distributed mode produces the same message three times (one copy for each connector task).

[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] poll (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:77)
[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] connect 10.0.0.138:None (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:294)
[2022-06-26 15:23:12,862] INFO [ftp-test-conn|task-0] successfully connected to the ftp server and logged in (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:311)
[2022-06-26 15:23:12,863] INFO [ftp-test-conn|task-0] passive we are (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:318)
[2022-06-26 15:23:12,870] INFO [ftp-test-conn|task-0] Found 4 items in /home/smheo/ftp-dir/* (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:245)
[2022-06-26 15:23:12,877] INFO [ftp-test-conn|task-0] meta store storage HASN'T /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore:48)
[2022-06-26 15:23:12,878] INFO [ftp-test-conn|task-0] fetching /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:102)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] fetched /home/smheo/ftp-dir/msg-4, wasn't known before (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:218)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] dump entire /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:219)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] got some fileChanges: /home/smheo/ftp-dir/msg-4, offset = -1 (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:96)
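The bracketed prefix in each log line, such as [ftp-test-conn|task-0], names the task that wrote it. One way to check whether several tasks fetch the same file is to group the "fetching" lines by path. The following is a minimal Python sketch under stated assumptions: it assumes the worker logs from all hosts have been collected into one list of lines, the function name `tasks_per_file` is made up for illustration, and the task-1 and task-2 sample lines are hypothetical (only the task-0 line comes from the excerpt above).

```python
import re
from collections import defaultdict

# Matches the "[connector|task-N] fetching <path>" part of a worker log line.
FETCH_RE = re.compile(
    r"\[(?P<connector>[^|\]]+)\|task-(?P<task>\d+)\] fetching (?P<path>\S+)"
)


def tasks_per_file(log_lines):
    """Map each fetched path to the sorted list of task IDs that fetched it."""
    seen = defaultdict(set)
    for line in log_lines:
        match = FETCH_RE.search(line)
        if match:
            seen[match.group("path")].add(int(match.group("task")))
    return {path: sorted(tasks) for path, tasks in seen.items()}


SAMPLE = [
    # Real line from the log excerpt above.
    "[2022-06-26 15:23:12,878] INFO [ftp-test-conn|task-0] fetching "
    "/home/smheo/ftp-dir/msg-4 "
    "(com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:102)",
    # Hypothetical lines, modeled on the one above, standing in for the
    # logs of the other two workers.
    "[ftp-test-conn|task-1] fetching /home/smheo/ftp-dir/msg-4",
    "[ftp-test-conn|task-2] fetching /home/smheo/ftp-dir/msg-4",
]

if __name__ == "__main__":
    for path, tasks in tasks_per_file(SAMPLE).items():
        print(path, "fetched by tasks", tasks)
```

A path that maps to more than one task ID would be consistent with each task reading the file independently and producing its own copy of the message.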

The consumer receives the same message three times:

(base) ubuntu@ubuntu:~/distributed-pipeline/confluent-7.1.0$ ./bin/kafka-console-consumer --bootstrap-server <BROKER_IP>:9092 --topic default-topic-1

hello

hello

hello

FTP connector configuration and status

{
    "ftp-test-conn": {
        "info": {
            "name": "ftp-test-conn",
            "config": {
                "connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
                "connect.ftp.address": "<FTP HOST IP>",
                "connect.ftp.keystyle": "string",
                "compression.type": "gzip",
                "connect.ftp.user": "ftpusername",
                "connect.ftp.refresh": "PT1M",
                "tasks.max": "3",
                "connect.ftp.file.maxage": "P7D",
                "name": "ftp-test-conn",
                "connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
                "connect.ftp.timeout": "3000000",
                "connect.ftp.password": "<PASSWORD>"
            },
            "tasks": [
                {
                    "connector": "ftp-test-conn",
                    "task": 0
                },
                {
                    "connector": "ftp-test-conn",
                    "task": 1
                },
                {
                    "connector": "ftp-test-conn",
                    "task": 2
                }
            ],
            "type": "source"
        },
        "status": {
            "name": "ftp-test-conn",
            "connector": {
                "state": "RUNNING",
                "worker_id": "<BROKER 1 IP>:8083"
            },
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "<BROKER 1 IP>:8083"
                },
                {
                    "id": 1,
                    "state": "RUNNING",
                    "worker_id": "<BROKER 2 IP>:8083"
                },
                {
                    "id": 2,
                    "state": "RUNNING",
                    "worker_id": "<BROKER 3 IP>:8083"
                }
            ],
            "type": "source"
        }
    }
}

Solution

  • Each task is most likely reading the same file, so try setting tasks.max=1. There is no filesystem locking between FTP clients, and each task opens its own connection, so the connector is effectively limited to a single reader task.

    Look more closely at the logs: the task ID appears in the bracketed prefix, e.g. [ftp-test-conn|task-0].

    Also, running Connect on the same hosts as the brokers is not recommended.
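The tasks.max change can be submitted through the Kafka Connect REST API, which accepts a full connector configuration via PUT /connectors/{name}/config. The following is a minimal Python sketch, not a definitive implementation: the helper name `build_single_task_request` and the worker URL `http://localhost:8083` are assumptions for illustration, the host and password values remain the placeholders from the question, and the request is only built, not sent.

```python
import json
import urllib.request

# Connector config copied from the question; placeholders are left as-is.
CONFIG = {
    "connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
    "connect.ftp.address": "<FTP HOST IP>",
    "connect.ftp.keystyle": "string",
    "compression.type": "gzip",
    "connect.ftp.user": "ftpusername",
    "connect.ftp.refresh": "PT1M",
    "tasks.max": "3",
    "connect.ftp.file.maxage": "P7D",
    "name": "ftp-test-conn",
    "connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
    "connect.ftp.timeout": "3000000",
    "connect.ftp.password": "<PASSWORD>",
}


def build_single_task_request(config, connect_url="http://localhost:8083"):
    """Return a PUT request that resubmits `config` with tasks.max set to 1.

    `connect_url` is an assumed worker address; adjust it to your cluster.
    """
    updated = dict(config)      # copy, so the caller's dict is not modified
    updated["tasks.max"] = "1"  # a single reader task, as the answer suggests
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{updated['name']}/config",
        data=json.dumps(updated).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    req = build_single_task_request(CONFIG)
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
    # To apply it against a running worker: urllib.request.urlopen(req)
```

Building the request separately from sending it makes it easy to inspect the payload before touching a live cluster.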