I'm seeing a behavior I don't exactly know how to change. I'm testing the Kafka Connect S3 sink connector, and I have very little data in my topics.
At the moment I can see there is data in the topic using Kafka Manager, but my connector reads the data, never moves the offset, and never pushes anything to S3. This works for other topics, but not for this specific one. I think it has something to do with timeouts, but I cannot find the right configuration property to make the flush happen a bit faster.
These are my configs:
curl -X PUT -s -o /dev/null -H "Content-Type: application/json" \
  http://localhost:$CONNECT_REST_PORT/connectors/s3_connector_doc_cmg/config \
  -d '{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.region": "us-east-1",
    "s3.bucket.name": "confluent-pipeline",
    "topics.dir": "topics",
    "topics": "com.acp.bde.doc_cmg",
    "flush.size": "25",
    "rotate.interval.ms": "5000",
    "auto.register.schemas": "false",
    "tasks.max": "1",
    "s3.part.size": "5242880",
    "timezone": "UTC",
    "parquet.codec": "snappy",
    "offset.flush.interval.ms": "5000",
    "offset.flush.timeout.ms": "1000",
    "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "value.converter": "com.insight.connect.protobuf.ProtobufConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
    "locale": "de-CH",
    "timezone": "Europe/Zurich",
    "store.url": "http://minio-server-svc:9000/"
}'
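To double-check that the task itself is running and has not silently failed, I query the connector's status endpoint:

curl -s http://localhost:$CONNECT_REST_PORT/connectors/s3_connector_doc_cmg/status

The response lists the state of the connector and of each task, including a stack trace if a task has failed.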
And this is what I see in the log:
[2020-10-23 10:35:47,594] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+1+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,017] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+3+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,075] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+2+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 11:35:37,989] INFO [Worker clientId=connect-1, groupId=kafka-connect-01] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
So the writers have been open for almost an hour already and nothing really happens. I'm wondering whether my configuration is totally wrong, or whether there is some property I need to set so that the data gets pushed a bit faster.
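One thing I came across while reading the connector docs (I haven't confirmed this is the fix): rotate.interval.ms is apparently evaluated against the timestamp extractor, so a file is only closed and committed when a new record arrives whose timestamp falls outside the current interval. On a topic with very little traffic that next record may never come, which would match what I'm seeing. rotate.schedule.interval.ms is supposed to rotate on wall-clock time instead, and requires timezone to be set. The change I'm considering is adding this to the config above:

    "rotate.schedule.interval.ms": "5000",
    "timezone": "Europe/Zurich"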
UPDATE: I still don't have this properly fixed, but it seems to actually be a lack-of-memory issue.
The program just gets stuck on this line: this.buffer = ByteBuffer.allocate(this.partSize);
What bothers me is that it doesn't complain at all, it just stays there stuck. Shouldn't it crash with an out-of-memory error? Or shouldn't the memory get freed a bit faster? It can sit in that call for over 3 or 4 hours without any feedback.
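From browsing the connector source, that line seems to allocate one buffer of s3.part.size bytes (5 MB with my settings) on the Java heap for every partition file it has open, so my working theory is that the worker heap is too small and the JVM is thrashing in GC instead of throwing OutOfMemoryError. What I'm going to try is raising the heap of the Connect worker, assuming the stock startup scripts, which read KAFKA_HEAP_OPTS:

# Give the Connect worker more heap before starting it (sizes are a guess).
export KAFKA_HEAP_OPTS="-Xms512M -Xmx2G"
bin/connect-distributed.sh config/connect-distributed.properties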
I still think something is probably off in my configuration, but I have no idea what, or where I should be looking.