
I am working on defining a Camel S3 Source connector with our Confluent (5.5.1) installation. After creating the connector and seeing its status as "RUNNING", I upload a file to my S3 bucket. If I then do an ls on the bucket it is empty, which indicates the file was processed and deleted. But I do not see any messages in the topic. I am basically following this example with a simple 4-line file, but running it on a Confluent cluster instead of standalone Kafka.
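For context, I create the connector and check its status through the Connect REST API, roughly like this (host, port and the JSON file name are just placeholders; the file holds the config shown below):

    # create/update the connector from the config file
    curl -X PUT -H "Content-Type: application/json" \
         --data @camel-s3-source.json \
         http://connect-worker:8083/connectors/CamelAWSS3SourceConnector/config

    # check the connector and task status
    curl http://connect-worker:8083/connectors/CamelAWSS3SourceConnector/status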

This is my configuration

{
    "name": "CamelAWSS3SourceConnector",
    "connector.class": "org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector",
    "bootstrap.servers": "broker1-dev:9092,broker2-dev:9092,broker3-dev:9092",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\"  password=\"password\";",
    "security.protocol": "SASL_SSL",
    "ssl.truststore.location": "/config/client.truststore.jks",
    "ssl.truststore.password": "password",
    "ssl.keystore.location": "/config/client.keystore.jks",
    "ssl.keystore.password": "password",
    "ssl.key.password": "password",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.tolerance": "all",
    "offset.flush.timeout.ms": "60000",
    "offset.flush.interval.ms": "10000",
    "max.request.size": "10485760",
    "flush.size": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter",
    "camel.source.maxPollDuration": "10000",
    "topics": "TEST-CAMEL-S3-SOURCE-POC",
    "camel.source.path.bucketNameOrArn": "arn:aws:s3:::my-bucket",
    "camel.component.aws-s3.region": "US_EAST_1",
    "tasks.max": "1",
    "camel.source.endpoint.useIAMCredentials": "true",
    "camel.source.endpoint.autocloseBody": "true"
}

And I see these errors in the logs

[2020-12-23 09:05:01,876] ERROR WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:448)
[2020-12-23 09:05:01,876] ERROR WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:116)


[2020-12-23 09:20:58,685] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Received successful Heartbeat response (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1045)
[2020-12-23 09:20:58,688] DEBUG WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:111)
[2020-12-23 09:20:58,688] INFO WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:426)
[2020-12-23 09:20:58,688] INFO WorkerSourceTask{id=CamelAWSS3SourceConnector-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:443)

And if I make a curl request for the connector status, I get this error in the status response

trace: org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
        at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:438)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I saw the solution below in a couple of links, but that also didn't help. It suggested adding these keys to the config:

"offset.flush.timeout.ms": "60000",
"offset.flush.interval.ms": "10000",
"max.request.size": "10485760",

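As far as I understand, offset.flush.timeout.ms, offset.flush.interval.ms and the producer max.request.size are worker-level settings rather than connector properties, so presumably they would need to go into the Connect worker properties (e.g. connect-distributed.properties) to take effect, along these lines:

    # connect-distributed.properties (worker config, not connector config)
    offset.flush.interval.ms=10000
    offset.flush.timeout.ms=60000
    # override for the producer used by source tasks
    producer.max.request.size=10485760
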
Thank you

UPDATE

I cut the config down to a minimal version, but I still get the same error

{
    "connector.class": "org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter",
    "camel.source.maxPollDuration": "10000",
    "topics": "TEST-S3-SOURCE-MINIMAL-POC",
    "camel.source.path.bucketNameOrArn": "pruvpcaws003-np-use1-push-json-poc",
    "camel.component.aws-s3.region": "US_EAST_1",
    "tasks.max": "1",
    "camel.source.endpoint.useIAMCredentials": "true",
    "camel.source.endpoint.autocloseBody": "true"
}

Still get the same error

trace: org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:438)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Not sure where else I should look to find the root cause.

  • @RobinMoffatt - Any idea what could be happening here? I tried with the minimal configuration as well and still get the same error. The test file is small - just 4 short text lines. – adbdkb Dec 24 '20 at 03:12
  • 1
  • After spending a lot of effort figuring out what the issue with the configuration was, I compared the topic definition for my connector with that of another source connector that worked. The difference between the two: mine was "compact" whereas the other one was "delete". I changed mine to `delete` and it was successful (a sketch of checking/changing that setting follows below). Can someone explain why `compact` did not work? – adbdkb Jan 02 '21 at 01:28
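
In case it helps anyone else, checking and switching a topic's cleanup.policy can be done with kafka-configs. The broker address and topic name below are the ones from my config above; with SASL_SSL a --command-config file containing the client security settings would also be needed:

    # show the topic's current configuration (look for cleanup.policy)
    kafka-configs --bootstrap-server broker1-dev:9092 \
      --entity-type topics --entity-name TEST-CAMEL-S3-SOURCE-POC --describe

    # switch the policy from compact to delete
    kafka-configs --bootstrap-server broker1-dev:9092 \
      --entity-type topics --entity-name TEST-CAMEL-S3-SOURCE-POC \
      --alter --add-config cleanup.policy=delete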
