
I'm running into behavior I don't know how to change. I'm testing the Kafka S3 sink connector, and I have very little data in my topics.

At the moment I can see that there is data in the topic by using Kafka Manager, but my connector reads the data, never moves the offset, and never pushes anything to S3. On other topics this works, but on this specific one it does not. I think it has something to do with the timeouts, but I cannot find the correct configuration property to make the flush happen a bit faster.
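
For reference, this is how I confirm that the offsets are not moving (a minimal sketch; sink connectors commit through a consumer group named connect-<connector-name>, and the broker address is an assumption on my part):

    # Describe the sink connector's consumer group; a LAG column that
    # never shrinks means nothing is being committed.
    # Assumes a broker reachable at localhost:9092.
    kafka-consumer-groups --bootstrap-server localhost:9092 \
      --describe --group connect-s3_connector_doc_cmg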

These are my configs:

    curl -X PUT -s -o /dev/null -H "Content-Type: application/json" \
      http://localhost:$CONNECT_REST_PORT/connectors/s3_connector_doc_cmg/config \
      -d '{
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "s3.region": "us-east-1",
        "s3.bucket.name": "confluent-pipeline",
        "topics.dir": "topics",
        "topics": "com.acp.bde.doc_cmg",
        "flush.size": "25",
        "rotate.interval.ms": "5000",
        "auto.register.schemas": "false",
        "tasks.max": "1",
        "s3.part.size": "5242880",
        "timezone": "UTC",
        "parquet.codec": "snappy",
        "offset.flush.interval.ms": "5000",
        "offset.flush.timeout.ms": "1000",
        "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "value.converter": "com.insight.connect.protobuf.ProtobufConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
        "locale": "de-CH",
        "timezone": "Europe/Zurich",
        "store.url": "http://minio-server-svc:9000/"
      }'

And this is what I see in the log:

[2020-10-23 10:35:47,594] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+1+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,017] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+3+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,075] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+2+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 11:35:37,989] INFO [Worker clientId=connect-1, groupId=kafka-connect-01] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

So the writers have been open for almost an hour and nothing happens. I'm wondering whether my configuration is totally wrong, or whether there is some property I need to set so that the data gets pushed a bit faster.
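
To rule out a failed task, I also check the connector through the Connect REST status endpoint (a sketch, using the same port variable as the curl call above):

    # Both the connector and its task should report RUNNING; a FAILED
    # task would include a stack trace in the "trace" field.
    curl -s http://localhost:$CONNECT_REST_PORT/connectors/s3_connector_doc_cmg/status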

UPDATE: I still don't have this properly fixed, but it actually seems to be a lack-of-memory issue.

The program just gets stuck on this line:

    this.buffer = ByteBuffer.allocate(this.partSize);

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java#L85

The part that bothers me is that it does not complain at all; it just stays there, stuck. Shouldn't it crash with an out-of-memory error? Or shouldn't the memory be freed a bit faster? It can stay in that call for over 3 or 4 hours without any feedback.
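
As far as I can tell, each open writer allocates a buffer of s3.part.size bytes (5 MiB here), one per open file, so a small worker heap can run out quietly. As a sketch of a workaround, assuming Connect is started with the standard scripts (the heap sizes and dump path are placeholders I chose):

    # KAFKA_HEAP_OPTS is picked up by the standard Kafka/Connect start scripts.
    export KAFKA_HEAP_OPTS="-Xms512M -Xmx2G"
    # Dump the heap on OOM instead of hanging silently (placeholder path).
    export KAFKA_OPTS="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/connect-oom.hprof"
    connect-distributed /etc/kafka/connect-distributed.properties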

I still think something is probably off in my configuration, but I have no idea what to change or where to look.


1 Answer


Your partitioner is time-based, so this can happen when the rotate.schedule.interval.ms parameter is absent. Take a look at this answer: https://stackoverflow.com/a/51160834/1551246
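
rotate.interval.ms advances only with the timestamps of incoming records, so on a topic with very little data a rotation boundary may never be crossed and the files stay open. rotate.schedule.interval.ms rotates on wall-clock time instead (it requires timezone to be set, which your config already does). A sketch of the change, reusing your PUT call; the 60000 ms value is just an example:

    curl -X PUT -s -o /dev/null -H "Content-Type: application/json" \
      http://localhost:$CONNECT_REST_PORT/connectors/s3_connector_doc_cmg/config \
      -d '{
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "s3.region": "us-east-1",
        "s3.bucket.name": "confluent-pipeline",
        "topics.dir": "topics",
        "topics": "com.acp.bde.doc_cmg",
        "flush.size": "25",
        "rotate.interval.ms": "5000",
        "rotate.schedule.interval.ms": "60000",
        "auto.register.schemas": "false",
        "tasks.max": "1",
        "s3.part.size": "5242880",
        "parquet.codec": "snappy",
        "offset.flush.interval.ms": "5000",
        "offset.flush.timeout.ms": "1000",
        "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "value.converter": "com.insight.connect.protobuf.ProtobufConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
        "locale": "de-CH",
        "timezone": "Europe/Zurich",
        "store.url": "http://minio-server-svc:9000/"
      }'

Note that your original config sets "timezone" twice (UTC, then Europe/Zurich); only the last value takes effect, so it appears once here.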