
I have recently enabled SSL and tried to start Kafka Connect in distributed mode. When running

connect-distributed connect-distributed.properties

I get the following errors:

[2018-10-09 16:50:57,190] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:106)
[2018-10-09 16:50:55,471] ERROR WorkerSinkTask{id=sink-mariadb-test} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:344)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:305)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

and

java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:694)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
        at sun.nio.ch.IOUtil.read(IOUtil.java:195)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:344)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:305)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I have also tried increasing the initial and maximum heap sizes by setting the KAFKA_HEAP_OPTS environment variable, by running

KAFKA_HEAP_OPTS="-Xms4g -Xmx6g" connect-distributed connect-distributed.properties

but it still doesn't work.
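
One way to rule out the variable simply not reaching the JVM (assuming `jcmd` from a JDK is on the PATH; `<PID>` is a placeholder) is to inspect the live flags of the worker process:

jcmd | grep ConnectDistributed          # note the worker's PID
jcmd <PID> VM.flags | grep -i heapsize  # MaxHeapSize should reflect -Xmx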

My questions are:

  1. Can SSL authentication affect memory usage by any chance?
  2. How can I fix the issue?

EDIT:
I have tried disabling SSL, and everything works without any problems.

  • I'm running Connect with `10g`, but without actively monitoring the heap, not sure what to tell you to set it to – OneCricketeer Oct 09 '18 at 18:59
  • @cricket_007 I am running Confluent CLI on my dev environment, having about 12GB of memory in total so setting Xmx to 10g is not an option for me. – Giorgos Myrianthous Oct 09 '18 at 19:01
  • If using the Confluent CLI, then you would be running `confluent start` and `confluent load` rather than `connect-distributed` directly, no? In which case, I don't think `KAFKA_HEAP_OPTS` is applied the same way for those scripts unless you actually add an export to your `bashrc`, for example – OneCricketeer Oct 09 '18 at 21:47
  • @cricket_007 Indeed. Although I am running the Confluent CLI, I'm running Kafka Connect in distributed mode. I have also exported `KAFKA_HEAP_OPTS` in my `bashrc` file, but it doesn't work either. – Giorgos Myrianthous Oct 10 '18 at 07:45
  • As I mentioned, it would be useful if you could enable JMX monitoring and get a heap dump when such errors occur. That's the only way we can really determine what causes the problem – OneCricketeer Oct 10 '18 at 13:34
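
For anyone following the heap-dump suggestion above, a minimal sketch using the JDK's standard tools (`<PID>` and the output file path are placeholders):

jcmd | grep ConnectDistributed
jmap -dump:live,format=b,file=/tmp/connect-heap.hprof <PID>

The resulting `.hprof` file can then be opened in a heap analyzer such as Eclipse MAT or VisualVM to see which allocations dominate.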

1 Answer


I ran into this issue when enabling SASL_SSL in Kafka Connect:

[2018-10-12 12:33:36,426] ERROR WorkerSinkTask{id=test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) 
java.lang.OutOfMemoryError: Java heap space

Checking the ConsumerConfig values logged at startup showed me that my configuration was not being applied:

[2018-10-12 12:33:35,573] INFO ConsumerConfig values: 
...
security.protocol = PLAINTEXT

It turns out that you have to prefix these configs with producer. or consumer. in the worker properties file so that they reach the embedded clients. Without the prefix, the consumer keeps talking PLAINTEXT to an SSL listener; the raw TLS handshake bytes it reads back are then interpreted as an enormous message length, and it is the attempt to allocate a receive buffer of that size that surfaces as the OutOfMemoryError.

consumer.security.protocol=SASL_SSL
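
For completeness, a sketch of how the relevant worker properties might look with both prefixes. The mechanism, truststore path, and password below are illustrative placeholders, and any sasl.jaas.config line would need the same prefixing:

# Embedded consumer (used by sink tasks)
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.ssl.truststore.location=/path/to/truststore.jks
consumer.ssl.truststore.password=changeit

# Embedded producer (used by source tasks)
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.ssl.truststore.location=/path/to/truststore.jks
producer.ssl.truststore.password=changeit

# Unprefixed settings still apply to the worker's own connections
# (the internal config, offset, and status topics)
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=changeit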
Gery
  • Also see my answer here: https://stackoverflow.com/a/61362427/1446479 – peedee Aug 03 '20 at 09:29
  • In my case (AWS MSK Kafka, protected by IAM auth), this was happening due to a missing client.properties file while running the kafka-console-producer.sh script: `--command-config //client.properties`. See here: https://www.youtube.com/watch?v=r12HYxWAJLo&t=477s – Anum Sheraz Aug 02 '23 at 00:18