
My Kafka Connect sink is running out of heap space. There are other threads on this, such as "Kafka Connect running out of heap space", where the cause was simply running with the default memory setting. Previously, raising the heap to 2g fixed my issue. However, after I added a new sink, the heap error came back. I raised Xmx to 12g, and I still get the error.

In my systemd service file, I have:

Environment="KAFKA_HEAP_OPTS=-Xms512m -Xmx12g"

I'm still getting the heap errors even with a very high Xmx setting. I also lowered my flush.size to 1000, which I thought would help. FYI, this connector is targeting 11 different Kafka topics. Does that impose unique memory demands?

How can I fix or diagnose further?

FYI, this is with Kafka 0.10.2.1 and Confluent Platform 3.2.2. Do more recent versions provide any improvements here?

java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at io.confluent.connect.s3.storage.S3OutputStream.<init>(S3OutputStream.java:67)
at io.confluent.connect.s3.storage.S3Storage.create(S3Storage.java:197)
at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:67)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:393)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:197)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-03-13 20:31:46,398] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:450)
[2018-03-13 20:31:46,401] ERROR Task avro-s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
clay

2 Answers

1

Currently, the memory requirements of the S3 connector depend on the number of outstanding partitions and the s3.part.size. Try setting the latter to 5MB (the minimum allowed). The default is 25MB.
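As a rough illustration of the setting suggested above, here is a minimal sketch that builds a connector config with s3.part.size lowered to 5 MB. The connector name is taken from the task name in the question's log (avro-s3-sink-0) and flush.size from the question; the topic list and bucket name are hypothetical placeholders, and your real config will have more keys.

```python
import json

# 5 MB, the minimum allowed part size mentioned above (default is 25 MB).
MIN_PART_SIZE_BYTES = 5 * 1024 * 1024

connector = {
    "name": "avro-s3-sink",  # inferred from the task name avro-s3-sink-0 in the log
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "topics": "topic-a,topic-b",         # hypothetical placeholder
        "s3.bucket.name": "example-bucket",  # hypothetical placeholder
        "flush.size": "1000",                # value from the question
        "s3.part.size": str(MIN_PART_SIZE_BYTES),
    },
}

# JSON body in the shape the Kafka Connect REST API accepts for creating a connector.
payload = json.dumps(connector, indent=2)
print(payload)
```

The same two keys can be set in a .properties file if you run the connector in standalone mode.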

Also read here, for a more detailed explanation of sizing suggestions:

https://github.com/confluentinc/kafka-connect-storage-cloud/issues/29

  • Each topic has three partitions. There are 11 topics, actually 14 if I count other sinks. 14 * 3 * 25MB is about 1GB? 12GB should be plenty – clay Mar 14 '18 at 01:42
  • Depends on the partitioner you are using too. Partitions are the outstanding partitions to S3. There's not necessarily a 1-to-1 mapping with Kafka partitions. – Konstantine Karantasis Mar 14 '18 at 04:48
  • @clay Did changing this config solve your problem? – nish Nov 30 '20 at 16:31
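To make the arithmetic in the comments above concrete, here is a back-of-the-envelope sketch. It assumes one s3.part.size buffer per outstanding output partition, which is consistent with the ByteBuffer.allocate call in S3OutputStream in the stack trace but is only a lower bound on heap use. The fan-out of 12 output partitions per Kafka partition is a made-up number, chosen only to show how a partitioner can multiply the count.

```python
MIB = 1024 * 1024

def estimate_buffer_bytes(outstanding_partitions: int, part_size_bytes: int) -> int:
    """Rough lower bound: one part-size buffer per outstanding output partition."""
    return outstanding_partitions * part_size_bytes

kafka_partitions = 14 * 3    # 14 topics x 3 partitions, from the comment above
default_part = 25 * MIB      # default s3.part.size
min_part = 5 * MIB           # minimum allowed s3.part.size

# Case 1: one outstanding S3 partition per Kafka partition.
one_to_one = estimate_buffer_bytes(kafka_partitions, default_part)

# Case 2: the partitioner keeps several output partitions open per Kafka partition.
fan_out = 12                 # hypothetical, for illustration only
fanned = estimate_buffer_bytes(kafka_partitions * fan_out, default_part)
fanned_min = estimate_buffer_bytes(kafka_partitions * fan_out, min_part)

for label, value in [
    ("1-to-1, 25 MiB parts", one_to_one),
    ("12x fan-out, 25 MiB parts", fanned),
    ("12x fan-out, 5 MiB parts", fanned_min),
]:
    print(f"{label}: {value / MIB:.0f} MiB")
```

Under these assumptions the 1-to-1 case needs about 1 GiB, as estimated in the comment, while a modest fan-out already exceeds a 12 GiB heap at the default part size and drops to roughly 2.5 GiB at the 5 MB minimum.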
-1

Firstly, I know nothing about Kafka.

However, as a general rule, when a process hits a capacity limit that you can't raise, you must throttle the process somehow. I suggest exploring a periodic pause, for example a 10-millisecond sleep every 100 milliseconds.

Another thing you can try is to pin your Kafka process to one specific CPU. This can sometimes have amazingly beneficial effects.

Aethelbald
  • Adding a periodic pause isn't exactly possible in a vendor-provided product without forking it and adding code to do so... Also, Kafka is *streaming messages*, so pausing anything prevents that "real time" use case – OneCricketeer Mar 14 '18 at 05:38
  • If you pin the process to a CPU, you can pin another one to another CPU, and so on. I did this with a real-time messaging server once. Massive improvement just by stopping the thread thrashing. Leave 25% of CPUs for the OS. – Aethelbald Mar 14 '18 at 20:39