Operating Environment
- Three servers
- Each server runs a Kafka broker, a Kafka Connect worker (distributed mode), and Schema Registry (confluent-7.1.0)
- One FTP source connector for testing (3 tasks)
Problem
- Connect produces duplicate messages. I expect the FTP connector to emit one message per file, but in distributed mode Connect produces the same message three times (once per connector task).
- Below is the log written when a connector task produces a message.
- Equivalent entries appear in the log of every Connect process.
[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] poll (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:77)
[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] connect 10.0.0.138:None (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:294)
[2022-06-26 15:23:12,862] INFO [ftp-test-conn|task-0] successfully connected to the ftp server and logged in (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:311)
[2022-06-26 15:23:12,863] INFO [ftp-test-conn|task-0] passive we are (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:318)
[2022-06-26 15:23:12,870] INFO [ftp-test-conn|task-0] Found 4 items in /home/smheo/ftp-dir/* (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:245)
[2022-06-26 15:23:12,877] INFO [ftp-test-conn|task-0] meta store storage HASN'T /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore:48)
[2022-06-26 15:23:12,878] INFO [ftp-test-conn|task-0] fetching /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:102)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] fetched /home/smheo/ftp-dir/msg-4, wasn't known before (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:218)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] dump entire /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:219)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] got some fileChanges: /home/smheo/ftp-dir/msg-4, offset = -1 (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:96)
The consumer receives the same message three times
(base) ubuntu@ubuntu:~/distributed-pipeline/confluent-7.1.0$ ./bin/kafka-console-consumer --bootstrap-server <BROKER_IP>:9092 --topic default-topic-1
hello
hello
hello
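One possible explanation, stated here only as an assumption and not confirmed from the connector's source: if the FTP source connector hands every task an identical copy of its configuration, each of the three tasks watches the same directory with its own record of which files it has already seen, so each task emits every new file. The hypothetical Python model below (`task_configs` and `simulate_poll` are illustrative names, not the connector's API) shows how that would yield one record per task.

```python
# Hypothetical model of the suspected behaviour. It ASSUMES the connector's
# taskConfigs(maxTasks) returns the same config for every task, so each task
# polls the same FTP directory independently. This is not the connector's code.
from typing import Dict, List


def task_configs(connector_config: Dict[str, str], max_tasks: int) -> List[Dict[str, str]]:
    """Assumed behaviour: every task receives an identical copy of the config."""
    return [dict(connector_config) for _ in range(max_tasks)]


def simulate_poll(configs: List[Dict[str, str]], files: Dict[str, str]) -> List[str]:
    """Each task keeps its own 'seen' set and emits every file it has not seen yet."""
    produced: List[str] = []
    for cfg in configs:
        seen = set()  # per-task state: tasks do not coordinate with each other
        directory = cfg["connect.ftp.monitor.update"].split(":")[0]
        for path, content in files.items():
            if path.startswith(directory) and path not in seen:
                seen.add(path)
                produced.append(content)
    return produced


config = {
    "connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
    "tasks.max": "3",
}
files = {"/home/username/ftp-dir/msg-4": "hello"}

records = simulate_poll(task_configs(config, int(config["tasks.max"])), files)
print(records)  # ['hello', 'hello', 'hello']
```

Under the same assumption, calling `simulate_poll(task_configs(config, 1), files)` produces a single `'hello'`.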
FTP connector (config and status)
{
  "ftp-test-conn": {
    "info": {
      "name": "ftp-test-conn",
      "config": {
        "connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
        "connect.ftp.address": "<FTP HOST IP>",
        "connect.ftp.keystyle": "string",
        "compression.type": "gzip",
        "connect.ftp.user": "ftpusername",
        "connect.ftp.refresh": "PT1M",
        "tasks.max": "3",
        "connect.ftp.file.maxage": "P7D",
        "name": "ftp-test-conn",
        "connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
        "connect.ftp.timeout": "3000000",
        "connect.ftp.password": "<PASSWORD>"
      },
      "tasks": [
        {
          "connector": "ftp-test-conn",
          "task": 0
        },
        {
          "connector": "ftp-test-conn",
          "task": 1
        },
        {
          "connector": "ftp-test-conn",
          "task": 2
        }
      ],
      "type": "source"
    },
    "status": {
      "name": "ftp-test-conn",
      "connector": {
        "state": "RUNNING",
        "worker_id": "<BROKER 1 IP>:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "<BROKER 1 IP>:8083"
        },
        {
          "id": 1,
          "state": "RUNNING",
          "worker_id": "<BROKER 2 IP>:8083"
        },
        {
          "id": 2,
          "state": "RUNNING",
          "worker_id": "<BROKER 3 IP>:8083"
        }
      ],
      "type": "source"
    }
  }
}
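If the duplication really does track `tasks.max`, lowering it to 1 should make it disappear. That can be tested through the Kafka Connect REST API, where `PUT /connectors/{name}/config` replaces a connector's configuration with the JSON map in the request body. Below is a minimal sketch using only the Python standard library. It assumes a worker's REST listener is reachable at `http://localhost:8083` (replace with a real worker address), and the helper names `with_single_task` and `build_update_request` are hypothetical.

```python
import json
import urllib.request
from typing import Dict

# Assumption: a Connect worker's REST API listens here; replace with a real worker address.
CONNECT_URL = "http://localhost:8083"


def with_single_task(config: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of the connector config with tasks.max forced to 1."""
    updated = dict(config)
    updated["tasks.max"] = "1"
    return updated


def build_update_request(name: str, config: Dict[str, str]) -> urllib.request.Request:
    """Build a PUT /connectors/{name}/config request whose body is the flat config map."""
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Current config from the connector info above (placeholders kept as-is).
current = {
    "connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
    "connect.ftp.address": "<FTP HOST IP>",
    "connect.ftp.keystyle": "string",
    "compression.type": "gzip",
    "connect.ftp.user": "ftpusername",
    "connect.ftp.refresh": "PT1M",
    "tasks.max": "3",
    "connect.ftp.file.maxage": "P7D",
    "name": "ftp-test-conn",
    "connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
    "connect.ftp.timeout": "3000000",
    "connect.ftp.password": "<PASSWORD>",
}

req = build_update_request("ftp-test-conn", with_single_task(current))
print(req.get_method(), req.full_url)
# To actually send it against a running cluster: urllib.request.urlopen(req)
```

The request is only built here, not sent, so it can be inspected before applying it to the cluster.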