I am working on defining a Camel S3 Source connector on our Confluent (5.5.1) installation. After creating the connector and seeing its status as RUNNING, I upload a file to my S3 bucket. When I then ls the bucket, it is empty, which suggests the file was processed and deleted. But I do not see any messages in the topic. I am basically following this example with a simple 4-line file, except that instead of standalone Kafka I am running it on a Confluent cluster.
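For reference, this is roughly how I create and upload the test file (the file contents and bucket name here are just placeholders):

```shell
# Create the 4-line test file (contents are only an example).
printf 'line1\nline2\nline3\nline4\n' > test.txt

# Upload it, then list the bucket (bucket name is a placeholder).
aws s3 cp test.txt s3://my-bucket/
aws s3 ls s3://my-bucket/
# After the connector polls, the ls comes back empty, so the object
# appears to have been consumed and deleted.
```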
This is my configuration:
{
  "name": "CamelAWSS3SourceConnector",
  "connector.class": "org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector",
  "bootstrap.servers": "broker1-dev:9092,broker2-dev:9092,broker3-dev:9092",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";",
  "security.protocol": "SASL_SSL",
  "ssl.truststore.location": "/config/client.truststore.jks",
  "ssl.truststore.password": "password",
  "ssl.keystore.location": "/config/client.keystore.jks",
  "ssl.keystore.password": "password",
  "ssl.key.password": "password",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.tolerance": "all",
  "offset.flush.timeout.ms": "60000",
  "offset.flush.interval.ms": "10000",
  "max.request.size": "10485760",
  "flush.size": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter",
  "camel.source.maxPollDuration": "10000",
  "topics": "TEST-CAMEL-S3-SOURCE-POC",
  "camel.source.path.bucketNameOrArn": "arn:aws:s3:::my-bucket",
  "camel.component.aws-s3.region": "US_EAST_1",
  "tasks.max": "1",
  "camel.source.endpoint.useIAMCredentials": "true",
  "camel.source.endpoint.autocloseBody": "true"
}
And I see these errors in the logs:
[2020-12-23 09:05:01,876] ERROR WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:448)
[2020-12-23 09:05:01,876] ERROR WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:116)
[2020-12-23 09:20:58,685] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Received successful Heartbeat response (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1045)
[2020-12-23 09:20:58,688] DEBUG WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:111)
[2020-12-23 09:20:58,688] INFO WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:426)
[2020-12-23 09:20:58,688] INFO WorkerSourceTask{id=CamelAWSS3SourceConnector-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:443)
And if I make a curl request for the connector's status, I get this error in the trace:
trace: org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:438)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
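For completeness, this is roughly the status call I run (the worker host/port here are assumptions for this sketch); the trace above appears under tasks[0].trace in the response:

```shell
# Connect REST endpoint of one of the workers (host/port assumed).
CONNECT_URL="http://localhost:8083"

# Fetch connector status and pull out the task state and trace.
curl -s "$CONNECT_URL/connectors/CamelAWSS3SourceConnector/status" \
  | python3 -c 'import sys, json; s = json.load(sys.stdin); t = s["tasks"][0]; print(t["state"]); print(t.get("trace", ""))'
```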
A couple of links suggested a fix, which was to add the keys below to the config, but that didn't help either:
"offset.flush.timeout.ms": "60000",
"offset.flush.interval.ms": "10000",
"max.request.size": "10485760",
Thank you
UPDATE
I cut the config down to a minimal version, but I still get the same error:
{
  "connector.class": "org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter",
  "camel.source.maxPollDuration": "10000",
  "topics": "TEST-S3-SOURCE-MINIMAL-POC",
  "camel.source.path.bucketNameOrArn": "pruvpcaws003-np-use1-push-json-poc",
  "camel.component.aws-s3.region": "US_EAST_1",
  "tasks.max": "1",
  "camel.source.endpoint.useIAMCredentials": "true",
  "camel.source.endpoint.autocloseBody": "true"
}
The trace is identical to the one above (ConnectException: OffsetStorageWriter is already flushing).
I am not sure where else to look to find the root cause.