I'm currently working with the Kafka Connect S3 Sink Connector 3.3.1 to copy Kafka messages over to S3, and I am hitting OutOfMemory errors when processing late data.
I know it looks like a long question, but I tried my best to make it clear and simple to understand. I highly appreciate your help.
High-level info
- The connector does a simple byte-to-byte copy of the Kafka messages and adds the length of the message at the beginning of the byte array (for decompression purposes). This is the role of the CustomByteArrayFormat class (see configs below and the first sketch after this list).
- The data is partitioned and bucketed according to the Record timestamp.
- The CustomTimeBasedPartitioner extends io.confluent.connect.storage.partitioner.TimeBasedPartitioner and its sole purpose is to override the generatePartitionedPath method to put the topic at the end of the path (see the second sketch after this list).
- The total heap size of the Kafka Connect process is 24 GB (only one node)
- The connector processes between 8,000 and 10,000 messages per second
- Each message has a size close to 1 KB
- The Kafka topic has 32 partitions
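To make the first point concrete, here is a simplified sketch of the length-prefix framing that CustomByteArrayFormat applies (the real class also has to implement the Connect storage Format / RecordWriter API, which is omitted here; the class and method names below are only illustrative):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative only: shows the framing, not the actual Format implementation.
public class LengthPrefixFraming {

    // Prepend the 4-byte big-endian length of the message to the raw bytes,
    // so the resulting file can later be split back into individual messages.
    public static byte[] frame(byte[] messageValue) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream(4 + messageValue.length);
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeInt(messageValue.length); // length header used for decompression/splitting
        out.write(messageValue);           // byte-to-byte copy of the Kafka message value
        out.flush();
        return buffer.toByteArray();
    }
}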
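And this is roughly what the partitioner override looks like (simplified sketch: the directory delimiter is hardcoded to "/", and it assumes the 3.3.x kafka-connect-storage-common API where Partitioner exposes generatePartitionedPath):

import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;

public class CustomTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

    @Override
    public String generatePartitionedPath(String topic, String encodedPartition) {
        // The default layout is "<topic>/<encodedPartition>"; invert it so that the
        // path ends with the topic, e.g. "YYYY/MM/dd/HH/mm/topic-name".
        return encodedPartition + "/" + topic;
    }
}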
Context of OutOfMemory errors
- Those errors only happen when the connector has been down for several hours and has to catch up on data
- When the connector is turned back on, it begins to catch up but fails very quickly with OutOfMemory errors
Possible but incomplete explanation
- The timestamp.extractor configuration of the connector is set to Record when those OOM errors happen
- Switching this configuration to Wallclock (i.e. the time of the Kafka Connect process) does NOT throw OOM errors and all of the late data can be processed, but the late data is no longer correctly bucketed
  - All of the late data ends up in the YYYY/MM/dd/HH/mm/topic-name bucket of the time at which the connector was turned back on
- So my guess is that while the connector is trying to correctly bucket the data according to the Record timestamp, it does too many parallel reads, leading to OOM errors
  - The "partition.duration.ms": "600000" parameter makes the connector bucket data into six 10-minute paths per hour (2018/06/20/12/[00|10|20|30|40|50] for 2018-06-20 at 12pm)
  - Thus, with 24h of late data, the connector would have to output data into 24h * 6 = 144 different S3 paths
  - Each 10-minute folder contains 10,000 messages/sec * 600 seconds = 6,000,000 messages, for a size of about 6 GB
  - If it does indeed read in parallel, that would mean 864 GB of data going into memory
- I think that I have to correctly configure a given set of parameters in order to avoid those OOM errors, but I don't feel like I see the big picture
- The "flush.size": "100000" setting implies that if more than 100,000 messages have been read, they should be committed to files (and thus free memory)
  - With messages of 1 KB, this means committing every 100 MB
  - But even if there were 144 parallel readings, that would still only give a total of 14.4 GB, which is less than the 24 GB of heap size available (see the small snippet after this list for the arithmetic written out)
  - Is "flush.size" the number of records to read per partition before committing? Or maybe per connector task?
- The way I understand the "rotate.schedule.interval.ms": "600000" config is that data is going to be committed every 10 minutes, even when the 100,000 messages of flush.size haven't been reached.
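For reference, here is the back-of-the-envelope arithmetic above written out as a small snippet (assuming a flat 10,000 messages/sec and 1 KB per message; these are rough estimates, not measurements):

public class OomBackOfEnvelope {
    public static void main(String[] args) {
        long msgsPerSec = 10_000L;        // observed upper bound of throughput
        long msgSizeBytes = 1_024L;       // ~1 KB per message
        long bucketSeconds = 600L;        // partition.duration.ms = 600000
        long bucketsFor24h = 24 * 6;      // six 10-minute buckets per hour = 144

        long bytesPerBucket = msgsPerSec * bucketSeconds * msgSizeBytes; // ~6 GB per 10-minute path
        long worstCase = bucketsFor24h * bytesPerBucket;                 // ~864 GB if every bucket were buffered at once
        long flushBytes = 100_000L * msgSizeBytes;                       // flush.size * 1 KB ~= 100 MB
        long cappedByFlush = bucketsFor24h * flushBytes;                 // ~14.4 GB if flush.size capped each open bucket

        System.out.printf("per bucket: %d B, worst case: %d B, capped by flush.size: %d B%n",
                bytesPerBucket, worstCase, cappedByFlush);
    }
}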
My main question would be: what is the math that allows me to plan for memory usage, given:
- the number of records per second
- the size of the records
- the number of Kafka partitions of the topics I read from
- the number of connector tasks (if this is relevant)
- the number of buckets written to per hour (here 6, because of the "partition.duration.ms": "600000" config)
- the maximum number of hours of late data to process
Configurations
S3 Sink Connector configurations
{
"name": "xxxxxxx",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "us-east-1",
"partition.duration.ms": "600000",
"topics.dir": "xxxxx",
"flush.size": "100000",
"schema.compatibility": "NONE",
"topics": "xxxxxx,xxxxxx",
"tasks.max": "16",
"s3.part.size": "52428800",
"timezone": "UTC",
"locale": "en",
"format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
"partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "xxxxxxxxx",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "xxxxxxx",
"rotate.schedule.interval.ms": "600000",
"path.format": "YYYY/MM/dd/HH/mm",
"timestamp.extractor": "Record"
}
}
Worker configurations
bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX
Edit:
I forgot to add an example of the errors I have:
[2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)