
My S3 connector (config below) is failing with a java.lang.OutOfMemoryError.

As far as I understand from various posts (e.g. here), the memory allocation is roughly (s3 part size) * (# kafka partitions) * (# partitions created by the partitioner).

I'm reading from a topic with 60 partitions, my partitioner can create dozens of output partitions, and the minimum s3.part.size is 5 MB (enforced by the connector; I would be satisfied with 5% of that).

Does this mean I cannot go with less than 60 * 5 * 50 = 15,000 MB of heap? And this is just for this one connector?

Is there any other configuration that can help me reduce the allocated memory?

Connector config:

{
    "name": "test-processed-to-fake-dlk-6",
    "config": {
        "name": "test-processed-to-fake-dlk-6",
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "s3.region": "us-east-1",
        "topics.dir": "",
        "flush.size": "400",
        "schema.compatibility": "NONE",
        "tasks.max": "1",
        "topics": "raw-events-dev",
        "s3.part.size": "5242880",
        "timezone": "UTC",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "value.converter": "com.naturalint.kafka.connect.storage.SafeJsonConverter",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "s3.bucket.name": "eventstestkinesis",
        "rotate.schedule.interval.ms": "60000"
    }
}
  • If you have 60 partitions, you'll want to have more tasks. You should also install more connector workers in the cluster so one machine isn't doing all the work – OneCricketeer Nov 04 '20 at 16:10
  • I found that for my use case reducing the number of partitions was sufficient. I like the idea of splitting the work between multiple machines when data throughput is much larger, but doesn't this also require the events of the same partition (partitioner, not kafka) to be processed on the same machine? – Oded Rosenberg Nov 05 '20 at 07:19
  • Not sure I understand the question. Each task will pull a Kafka partition, which builds up a multipart S3 request for a single file to be placed at the connector partitioner location... Regarding throughput, if you only had one task pulling data / uploading data at a time, then I would be surprised that you got an OOM – OneCricketeer Nov 05 '20 at 13:22
  • As I understood it: I had 1 task collecting data from 60 partitions, which partitioned the consumed events into a few dozen files, each with a minimum of 5 MB, ending up at 60 * ~50 * 5 ≈ 15,000 MB of allocated memory for the files. I'm still not sure I got it right, but reducing the number of Kafka partitions seems reasonable anyway and it made things stable – Oded Rosenberg Nov 06 '20 at 11:36
  • Haven't personally tested, but I don't think it actually consumes and allocates all 60 partitions / files at once. – OneCricketeer Nov 06 '20 at 18:18
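For illustration, here is a minimal sketch of the change suggested in the first comment above, not a tested configuration: the value 6 for tasks.max is an arbitrary example and assumes the Connect cluster has enough workers to spread those tasks across machines. Only the keys relevant to the change are shown; in practice the full config from the question would be resubmitted with tasks.max updated.

{
    "name": "test-processed-to-fake-dlk-6",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "raw-events-dev",
        "tasks.max": "6"
    }
}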

1 Answer


Based on this memory allocation explanation and this article on optimizing the number of partitions:

My topic's throughput is less than 1 Mb/s, so 60 partitions seemed like massive overkill.

I reduced the number of partitions to 6 and things look stable now.
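If the allocation formula from the question holds, the same back-of-the-envelope estimate after the change would be roughly:

    6 kafka partitions * ~50 output partitions * 5 MB per part ≈ 1,500 MB

which fits in a typical Connect worker heap. This is only a rough upper bound under the assumptions above, not a measured figure.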