[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] poll (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:77)
[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] connect 10.0.0.138:None (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:294)
[2022-06-26 15:23:12,862] INFO [ftp-test-conn|task-0] successfully connected to the ftp server and logged in (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:311)
[2022-06-26 15:23:12,863] INFO [ftp-test-conn|task-0] passive we are (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:318)
[2022-06-26 15:23:12,870] INFO [ftp-test-conn|task-0] Found 4 items in /home/smheo/ftp-dir/* (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:245)
[2022-06-26 15:23:12,877] INFO [ftp-test-conn|task-0] meta store storage HASN'T /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore:48)
[2022-06-26 15:23:12,878] INFO [ftp-test-conn|task-0] fetching /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:102)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] fetched /home/smheo/ftp-dir/msg-4, wasn't known before (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:218)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] dump entire /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:219)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] got some fileChanges: /home/smheo/ftp-dir/msg-4, offset = -1 (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:96)
(base) ubuntu@ubuntu:~/distributed-pipeline/confluent-7.1.0$ ./bin/kafka-console-consumer --bootstrap-server <BROKER_IP>:9092 --topic default-topic-1
hello
hello
hello
{
  "ftp-test-conn": {
    "info": {
      "name": "ftp-test-conn",
      "config": {
        "connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
        "connect.ftp.address": "<FTP HOST IP>",
        "connect.ftp.keystyle": "string",
        "compression.type": "gzip",
        "connect.ftp.user": "ftpusername",
        "connect.ftp.refresh": "PT1M",
        "tasks.max": "3",
        "connect.ftp.file.maxage": "P7D",
        "name": "ftp-test-conn",
        "connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
        "connect.ftp.timeout": "3000000",
        "connect.ftp.password": "<PASSWORD>"
      },
      "tasks": [
        {
          "connector": "ftp-test-conn",
          "task": 0
        },
        {
          "connector": "ftp-test-conn",
          "task": 1
        },
        {
          "connector": "ftp-test-conn",
          "task": 2
        }
      ],
      "type": "source"
    },
    "status": {
      "name": "ftp-test-conn",
      "connector": {
        "state": "RUNNING",
        "worker_id": "<BROKER 1 IP>:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "<BROKER 1 IP>:8083"
        },
        {
          "id": 1,
          "state": "RUNNING",
          "worker_id": "<BROKER 2 IP>:8083"
        },
        {
          "id": 2,
          "state": "RUNNING",
          "worker_id": "<BROKER 3 IP>:8083"
        }
      ],
      "type": "source"
    }
  }
}
Each task is most likely reading the same file. Try setting tasks.max=1. More specifically, there is no filesystem locking between FTP clients (each task opens its own connection), so nothing stops all three tasks from fetching the same file and each producing its own copy of the record. In practice this limits you to a single reader task.
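As a rough sketch of applying that change, the snippet below builds a request for the Kafka Connect REST endpoint `PUT /connectors/{name}/config` with `tasks.max` forced to `"1"`. The helper names `with_single_task` and `build_update_request` are made up for illustration, and the worker URL is an assumption; substitute the address of one of your Connect workers.

```python
import json
import urllib.request


def with_single_task(config: dict) -> dict:
    """Return a copy of a connector config with tasks.max set to "1"."""
    updated = dict(config)
    updated["tasks.max"] = "1"
    return updated


def build_update_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a PUT /connectors/{name}/config request.

    connect_url is assumed to look like "http://<worker host>:8083".
    """
    body = json.dumps(with_single_task(config)).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

To send it, pass the returned request to `urllib.request.urlopen(...)`. The `config` argument should be the full `"config"` object from the connector info above, since this endpoint replaces the whole configuration rather than patching a single key.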
Look closer at the logs: each line carries the task ID in its prefix, e.g. [ftp-test-conn|task-0], so you can check whether more than one task fetched the same file.
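One way to check this, assuming the worker logs keep the `[connector|task-N] fetching <path>` shape shown in the question, is to group the fetched paths by task ID. This is only a sketch: the function name `tasks_per_file` is hypothetical, and the `task-1` line in the usage example is invented for illustration (the posted log excerpt only shows `task-0`).

```python
import re
from collections import defaultdict

# Matches e.g. "[ftp-test-conn|task-0] fetching /home/smheo/ftp-dir/msg-4"
FETCH_LINE = re.compile(
    r"\[(?P<connector>[^|\]]+)\|(?P<task>task-\d+)\] fetching (?P<path>\S+)"
)


def tasks_per_file(lines):
    """Map each fetched path to the sorted list of task IDs that fetched it."""
    seen = defaultdict(set)
    for line in lines:
        match = FETCH_LINE.search(line)
        if match:
            seen[match.group("path")].add(match.group("task"))
    return {path: sorted(tasks) for path, tasks in seen.items()}


sample = [
    "[2022-06-26 15:23:12,878] INFO [ftp-test-conn|task-0] fetching /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:102)",
    # Hypothetical line, not from the original logs:
    "[2022-06-26 15:23:12,901] INFO [ftp-test-conn|task-1] fetching /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:102)",
]
duplicates = {p: t for p, t in tasks_per_file(sample).items() if len(t) > 1}
```

Any path that ends up in `duplicates` was fetched by more than one task, which would explain the same message appearing several times in the topic.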
Also, it's not recommended to run Connect on the same hosts as the brokers.