
We are running a simple GCP Pub/Sub publisher that uses the Java Spring library (pubSubTemplate):

pubSubTemplate.publish(topic, message);

When sending lots of messages (about a million, each with a small payload of 6 primitive fields), the following error message appears:

ERROR [bsub-publisher4] i.g.n.s.i.n.util.ResourceLeakDetector    : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.

(By the way, the message payload is stored in our DB, and to avoid loading everything into memory we paginate and send the messages in batches of 1000. The complete process takes a little more than a minute, amazing!)
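A minimal sketch of the paginated publishing described above, assuming a hypothetical `PageSource` for the paginated DB query and a hypothetical `Publisher` interface standing in for `pubSubTemplate.publish(topic, message)`. As far as I know, recent Spring Cloud GCP versions return a `CompletableFuture<String>` (the message ID) from `publish`, and older ones a `ListenableFuture`. Waiting for each page's futures before fetching the next page keeps at most one page of messages in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public final class BatchedPublisher {

    /** Hypothetical stand-in for pubSubTemplate.publish(topic, message). */
    public interface Publisher {
        CompletableFuture<String> publish(String topic, String message);
    }

    /** Hypothetical stand-in for a paginated DB query. */
    public interface PageSource {
        List<String> fetchPage(int page, int pageSize);
    }

    private BatchedPublisher() {
    }

    /**
     * Fetches one page at a time and waits for that page's publishes to finish
     * before fetching the next one. Returns the number of messages published.
     */
    public static long publishAll(PageSource source, Publisher publisher, String topic, int pageSize) {
        long published = 0;
        for (int page = 0; ; page++) {
            List<String> batch = source.fetchPage(page, pageSize);
            if (batch.isEmpty()) {
                return published; // no more rows in the DB
            }
            List<CompletableFuture<String>> futures = new ArrayList<>(batch.size());
            for (String message : batch) {
                futures.add(publisher.publish(topic, message));
            }
            // join() rethrows (wrapped in a CompletionException) if any publish failed
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            published += batch.size();
        }
    }
}
```

Blocking per page trades a little throughput for a bounded number of outstanding messages; with pages of 1000 that bound matches the batch size mentioned above.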

Several Spring Boot starters:

  • org.springframework.boot:spring-boot-starter-actuator
  • com.google.cloud:spring-cloud-gcp-starter-pubsub
  • com.google.cloud:spring-cloud-gcp-starter-logging
  • com.google.cloud:spring-cloud-gcp-starter-metrics

pull in and use the library io.grpc.netty.shaded.io.netty (and grpc-netty), and that is where the problem lies!

Our microservice, which contains the Pub/Sub publisher, does not use Java Reactor (Mono/Flux) async streams. We use Tomcat (the Spring Boot web starter) for our REST services, and there is no Netty server in the code we developed!

Should we consider this error a bug? It is caused by the Java garbage collector and the Netty reference count getting out of sync somehow, right?
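To see what role the garbage collector plays in this message: Netty's leak detector keeps a weak reference to each sampled buffer, and if the GC collects a buffer whose reference count never reached 0, it logs the LEAK line. The toy detector below illustrates that idea using only the JDK; `ToyLeakDetector`, `track` and `reportLeaks` are hypothetical names, and this is not Netty's implementation.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Toy leak detector illustrating the idea behind Netty's ResourceLeakDetector (not Netty code). */
public final class ToyLeakDetector {

    /** Handle returned by track(); close() plays the role of the final release(). */
    public static final class Tracker extends WeakReference<Object> {
        private final String label;
        private final Set<Tracker> live;

        private Tracker(Object resource, ReferenceQueue<Object> queue, String label, Set<Tracker> live) {
            super(resource, queue);
            this.label = label;
            this.live = live;
        }

        public void close() {
            live.remove(this); // released properly: no longer a leak candidate
            clear();           // cleared references are never enqueued
        }
    }

    private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
    // Strongly holds the trackers (not the resources) until they are closed or reported
    private final Set<Tracker> live = ConcurrentHashMap.newKeySet();

    public Tracker track(Object resource, String label) {
        Tracker tracker = new Tracker(resource, queue, label, live);
        live.add(tracker);
        return tracker;
    }

    /** Counts resources the GC collected while still tracked, i.e. never closed. */
    public int reportLeaks() {
        int leaks = 0;
        Reference<?> ref;
        while ((ref = queue.poll()) != null) {
            Tracker tracker = (Tracker) ref;
            if (live.remove(tracker)) {
                System.err.println("LEAK: " + tracker.label + " was garbage-collected before close()");
                leaks++;
            }
        }
        return leaks;
    }

    public int tracked() {
        return live.size();
    }
}
```

If close() is skipped, the tracker stays in `live`, and once the GC clears the weak reference, reportLeaks() reports it. The GC only decides *when* the leak is noticed (which is why the message can show up much later on some unrelated thread); the missing release happened earlier, in whichever code last owned the buffer.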

The question is: how do we solve this problem? We need some help here.

I did not find any solution browsing the net!

I did set this property

spring.netty.leak-detection=paranoid

which gave me the following error:

    Recent access records: 
Created at:
    io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
    io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
    io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
    io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:2246)
    io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1337)
    io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
    io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
    io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
    io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
    io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
    io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
    io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Thread.java:829)
2023-02-08 16:54:10.495  INFO [sub-subscriber1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
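A possible reason the paranoid setting adds so little here: as far as I can tell, Spring Boot applies `spring.netty.leak-detection` to the regular `io.netty.util.ResourceLeakDetector`, while the leak above is reported by the copy relocated into grpc-netty-shaded (`io.grpc.netty.shaded.io.netty.util.ResourceLeakDetector`, abbreviated `i.g.n.s.i.n.util.ResourceLeakDetector` in the log), which is a separate class with its own static level. A minimal sketch, assuming both copies expose the static `setLevel(Level)` method of upstream Netty 4.1, sets PARANOID on whichever copies are on the classpath. It uses reflection so it compiles without either dependency; the class name `LeakDetectionLevels` is hypothetical. Ideally call it early, before the Pub/Sub channels are created.

```java
import java.lang.reflect.Method;

/** Hypothetical helper: enables PARANOID leak detection on every Netty copy found on the classpath. */
public final class LeakDetectionLevels {

    private static final String[] DETECTOR_CLASSES = {
        "io.netty.util.ResourceLeakDetector",                     // plain Netty
        "io.grpc.netty.shaded.io.netty.util.ResourceLeakDetector" // copy relocated into grpc-netty-shaded
    };

    private LeakDetectionLevels() {
    }

    /** Returns how many detector classes were found and updated. */
    public static int enableParanoid() {
        int updated = 0;
        for (String className : DETECTOR_CLASSES) {
            try {
                Class<?> detector = Class.forName(className);
                Class<?> levelType = Class.forName(className + "$Level");
                Object paranoid = null;
                for (Object constant : levelType.getEnumConstants()) {
                    if (((Enum<?>) constant).name().equals("PARANOID")) {
                        paranoid = constant;
                    }
                }
                // Reflective equivalent of ResourceLeakDetector.setLevel(Level.PARANOID)
                Method setLevel = detector.getMethod("setLevel", levelType);
                setLevel.invoke(null, paranoid);
                updated++;
            } catch (ClassNotFoundException e) {
                // This copy of Netty is not on the classpath: nothing to configure
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not set leak detection level on " + className, e);
            }
        }
        return updated;
    }
}
```

With grpc-netty-shaded on the classpath this should return at least 1; in a project that depends on the shaded artifact directly, calling the shaded `ResourceLeakDetector.setLevel` without reflection is simpler.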
bob tang
artsgard

1 Answer


The error log gives you a good explanation of the issue: somewhere in your code (possibly Spring code called by your application, perhaps a WebClient backed by Netty?) a ByteBuf is used, and it requires a call to ByteBuf.release() to free the memory it allocated. The way to resolve this is described in the Troubleshooting buffer leaks section: https://netty.io/wiki/reference-counted-objects.html#troubleshooting-buffer-leaks I recommend reading the docs above, but in short: use this VM parameter when running your application:

-Dio.netty.leakDetection.level=paranoid

(advanced should work as well, but you need to wait a bit longer for the error) and, if the error pops up, check the stack trace to find the code that caused it. Note that your code might be well hidden in the stack trace between Flux operators and Netty calls, so look carefully. In my case it looked like this:

16-02-2023 10:38:59.268 reactor-http-nio-3 trace: [ERROR]  ResourceLeakDetector  - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
        io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92)
        io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
        reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:340)
        reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358)
...
#2:
        io.netty.buffer.AdvancedLeakAwareByteBuf.nioBuffer(AdvancedLeakAwareByteBuf.java:712)
        org.springframework.core.io.buffer.NettyDataBuffer.asByteBuffer(NettyDataBuffer.java:273)
        // there it is: com....CallerClass.lambda$responseFromWebClientExchange$12(CallerClass.java:150)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
        reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:385)
        reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
...

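Sample code that releases the DataBuffer and fixes the issue:

import java.nio.ByteBuffer;

import com.google.common.primitives.Bytes; // Guava, assumed to be the Bytes used here
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientResponse;
import reactor.core.publisher.Mono;

// called from webClient.get().uri(uri).exchange().flatMap(this::responseFromWebClientExchange)
private Mono<byte[]> responseFromWebClientExchange(ClientResponse response) {
    return response
            .body(BodyExtractors.toDataBuffers())
            .map(dataBuffer -> {
                try {
                    ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
                    byte[] byteArray = new byte[byteBuffer.remaining()];
                    byteBuffer.get(byteArray); // copy the response bytes out of the pooled buffer
                    return byteArray;
                } finally {
                    // RELEASE the buffer even if the copy fails; without this call memory leaks
                    DataBufferUtils.release(dataBuffer);
                }
            })
            .reduce(Bytes::concat); // join all chunks into one byte[]
}

// call site (exchange() is deprecated since Spring Framework 5.3)
Mono<byte[]> body = webClient
        .get()
        .uri(uri)
        .exchange()
        .flatMap(this::responseFromWebClientExchange);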

Note that it is DataBufferUtils.release that calls ByteBuf.release(); without it, the code would log this error from time to time. By default only about 1% of ByteBufs are checked for leaks; changing io.netty.leakDetection.level makes the checks more frequent (paranoid checks every buffer). You just seem to be missing a call to DataBufferUtils.release (or ByteBuf.release directly).
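The reference-counting contract behind ByteBuf.release() can be sketched with a toy pool using only the JDK. `ToyBufferPool`, `ToyBuffer` and `copyAndRelease` are hypothetical names, `IllegalStateException` stands in for Netty's `IllegalReferenceCountException`, and this is not Netty's implementation: a buffer starts with a count of 1, `retain()` increments it, `release()` decrements it and returns the memory to the pool when the count hits 0.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical toy pool illustrating the ByteBuf reference-counting contract (not Netty code). */
public final class ToyBufferPool {

    private final AtomicInteger outstanding = new AtomicInteger();

    public final class ToyBuffer {
        private final AtomicInteger refCnt = new AtomicInteger(1); // new buffers start at 1
        final byte[] data;

        private ToyBuffer(int size) {
            data = new byte[size];
            outstanding.incrementAndGet(); // memory taken from the pool
        }

        public ToyBuffer retain() {
            int before = refCnt.getAndUpdate(c -> c > 0 ? c + 1 : c);
            if (before <= 0) {
                throw new IllegalStateException("refCnt: 0, cannot retain a released buffer");
            }
            return this;
        }

        /** Returns true when this call dropped the count to 0 and the memory went back to the pool. */
        public boolean release() {
            int before = refCnt.getAndUpdate(c -> c > 0 ? c - 1 : c);
            if (before <= 0) {
                throw new IllegalStateException("refCnt: 0, buffer already released");
            }
            if (before == 1) {
                outstanding.decrementAndGet(); // memory handed back to the pool
                return true;
            }
            return false;
        }
    }

    public ToyBuffer allocate(int size) {
        return new ToyBuffer(size);
    }

    /** Buffers allocated but not yet fully released, i.e. potential leaks. */
    public int outstanding() {
        return outstanding.get();
    }

    /** Same pattern as the fix above: copy what you need, then release in finally. */
    public static byte[] copyAndRelease(ToyBuffer buffer) {
        try {
            return buffer.data.clone();
        } finally {
            buffer.release();
        }
    }
}
```

A buffer that is allocated and never released keeps `outstanding()` above 0 forever; in real Netty the pooled memory is not reclaimed either, and only the sampled leak detector notices, once the buffer object itself is garbage-collected.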

Piotr Cierpich
  • The problem is that I am not interacting with Netty directly. All these classes and methods are called by Spring Cloud GCP Pub/Sub. When debugging, I can see Spring entering all of the above! – artsgard Feb 16 '23 at 10:15
  • @artsgard isn't there a nested exception, maybe? It looks like only Netty is on the stack. Can you paste the LEAK error with all the nested exceptions? – Piotr Cierpich Feb 16 '23 at 13:26
  • For the moment I cannot reproduce the error anymore (running in the cloud, not locally), hmmmmm! But in previous cases the above listing was all I got, even with the paranoid property added. When I get more info I will let you know, thanks! – artsgard Feb 16 '23 at 15:21
  • The io.grpc:grpc-netty-shaded artifact contains classes that the Spring Pub/Sub template (publish) uses heavily (it runs on the shaded Netty). I can see that when debugging. There is no way to hack that code by extending and overriding it; it is internal GCP-Spring code, right? – artsgard Feb 20 '23 at 08:46
  • But there is another package, which we bring in ourselves: io.netty:netty-codec:4.1.86.Final. This package has the same classes as the shaded one! If we could make Pub/Sub use this package, we could override things and maybe repair that strange leak error. Anyway, since the error is part of the internal, untouchable GCP-Spring code, I think the leak error really is a bug and we should report it on GitHub? – artsgard Feb 20 '23 at 08:48
  • It's hard to judge whether this is a bug or not without seeing the code. I had the impression you used Spring Integration (MessagingMethodInvokerHelper is on the stack) to configure communication with Pub/Sub; maybe something is missing there that does the cleanup (just guessing). After checking your code, asking the question on GitHub is also an option worth considering. I'm not sure whether the code could be replaced without seeing it; if the troublesome piece is a Spring bean, or is called from a Spring bean, it should be possible to replace it. Actually, Netty itself can be replaced too. – Piotr Cierpich Feb 20 '23 at 09:59