We are running a simple GCP Pub/Sub publisher that uses the Java Spring library (pubSubTemplate):
pubSubTemplate.publish(topic, message);
When sending a lot of messages (about a million, each with a small payload of 6 primitive fields), the following error message appears:
ERROR [bsub-publisher4] i.g.n.s.i.n.util.ResourceLeakDetector : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
(BTW, the message payloads are stored in our DB; to avoid loading them all into memory, we paginate and send them out in batches of 1000. The complete process takes a little more than a minute, amazing!)
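For reference, the paginate-and-send flow described above looks roughly like the sketch below. It is a simplified stand-in, not our production code: `PageLoader`, `publishAll`, and the fake publisher in `main` are hypothetical names made up for illustration, and a plain `Function` returning a `CompletableFuture` takes the place of `pubSubTemplate.publish(topic, message)` so the snippet runs without any GCP dependency. Waiting for each page's futures before loading the next page is an assumption of this sketch; it keeps at most one page of messages in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class BatchedPublishSketch {

    /** Hypothetical page loader: returns up to 'limit' rows starting at 'offset', or an empty list at the end. */
    public interface PageLoader<T> {
        List<T> load(int offset, int limit);
    }

    /**
     * Loads rows page by page and hands each row to the publisher.
     * The publisher is a stand-in for pubSubTemplate.publish(topic, message).
     * Returns the number of rows that were published.
     */
    public static <T> int publishAll(PageLoader<T> loader,
                                     Function<T, CompletableFuture<String>> publisher,
                                     int pageSize) {
        int offset = 0;
        int published = 0;
        while (true) {
            List<T> page = loader.load(offset, pageSize);
            if (page.isEmpty()) {
                break; // no more rows in the DB
            }
            List<CompletableFuture<String>> pending = new ArrayList<>(page.size());
            for (T row : page) {
                pending.add(publisher.apply(row));
            }
            // Assumption of this sketch: block until the whole page is acknowledged,
            // so only one page of messages is in flight at any time.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            published += page.size();
            offset += page.size();
        }
        return published;
    }

    public static void main(String[] args) {
        // Fake "DB" with 2500 rows instead of a real paginated query.
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            rows.add(i);
        }
        int count = BatchedPublishSketch.<Integer>publishAll(
                (offset, limit) -> rows.subList(Math.min(offset, rows.size()),
                                                Math.min(offset + limit, rows.size())),
                row -> CompletableFuture.completedFuture("fake-message-id-" + row),
                1000);
        System.out.println("published " + count);
    }
}
```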
Several Spring Boot starters:
- org.springframework.boot:spring-boot-starter-actuator
- com.google.cloud:spring-cloud-gcp-starter-pubsub
- com.google.cloud:spring-cloud-gcp-starter-logging
- com.google.cloud:spring-cloud-gcp-starter-metrics
pull in, and use, the library io.grpc.netty.shaded.io.netty (and grpc.netty), and that is where the problem lies!
Our microservice, which contains the Pub/Sub publisher, does not use Java Reactor (Mono/Flux) async streams. We use Tomcat (the Spring Boot web starter) for our REST services, and there is no Netty server in the code we developed!
Should we consider this error a bug? It is caused by the Java garbage collector and Netty's reference counting going out of sync somehow, right?
The question is: how can we solve this problem? We need some help here.
I did not find any solution browsing the net!
I did set this property:
spring.netty.leak-detection=paranoid
which gave me the following output:
Recent access records:
Created at:
io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:2246)
io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1337)
io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:829)
2023-02-08 16:54:10.495 INFO [sub-subscriber1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one.