24

I am using the ReactiveElasticsearchClient from spring-data-elasticsearch 3.2.3 with spring-boot 2.2.0. When upgrading to spring-boot 2.2.2 i have got org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144.

It's indicated to fixe that to use spring.codec.max-in-memory-size but i still got the same exception.

Bellow the whole exception:

org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
    at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoCollect] :
    reactor.core.publisher.Flux.collect(Flux.java:3273)
    org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
Error has been observed at the following site(s):
    |_     Flux.collect ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
    |_      Mono.filter ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:554)
    |_         Mono.map ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:555)
    |_         Mono.map ⇢ at org.springframework.core.codec.AbstractDataBufferDecoder.decodeToMono(AbstractDataBufferDecoder.java:96)
    |_       checkpoint ⇢ Body from POST http://localhost:9200/_bulk?timeout=1m [DefaultClientResponse]
    |_         Mono.map ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:669)
    |_    Mono.doOnNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:670)
    |_     Mono.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:671)
    |_ Mono.flatMapMany ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.sendRequest(DefaultReactiveElasticsearchClient.java:591)
    |_ Flux.publishNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.bulk(DefaultReactiveElasticsearchClient.java:448)
    |_     Flux.flatMap ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:32)
    |_         Flux.map ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:33)
    |_      Flux.reduce ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:34)
    |_         Mono.zip ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:178)
    |_         Mono.map ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:179)
Stack trace:
        at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
        at org.springframework.core.io.buffer.LimitedDataBufferList.updateCount(LimitedDataBufferList.java:94)
        at org.springframework.core.io.buffer.LimitedDataBufferList.add(LimitedDataBufferList.java:59)
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:119)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:571)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:89)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:313)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:427)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

Can anyone tell me what i am doing wrong or is that a bug?

Thank you

farid
  • 313
  • 1
  • 2
  • 7
  • 1
    Hey Farid, I had a bit different problem: I have a spring web flux HTTP server which I upgraded to 2.2.2, then I started to get the same exception (Exceeded limit on max bytes to buffer : 262144) when clients made HTTP POST request with a large body.. Configuring the spring.codec.max-in-memory-size property (in application.properties) to a large value (5242880 which are 5 MB) solved the problem. Did you make the same change? – Haimke Dec 17 '19 at 13:40
  • 1
    Hey Haimke, yes i had make the change and still get the same probleme – farid Dec 18 '19 at 20:28

6 Answers6

55

Using the plain reaction WebClient I ran into the same issue (going from 2.1.9 to 2.2.1.) I had no luck setting spring.codec.max-in-memory-size and later found a hint that this wasn't the way to go anyway:

… On the client side, the limit can be changed in WebClient.Builder.

(source, including dead link :-S )

I still haven't found out where WebClient.Builder gets the default 256K limit1. However, the following enabled me to raise the buffer size limit to 16M:

WebClient.builder()
  .…
  .exchangeStrategies(ExchangeStrategies.builder()
    .codecs(configurer -> configurer
      .defaultCodecs()
      .maxInMemorySize(16 * 1024 * 1024))
    .build())
  .build();

So, it seems to me (without knowing the intricacies of spring-data-elasticsearch) that if you can somehow get your hands on the WebClient as returned from the WebClientProvider you should be able to mutate it to include the ExchangeStrategies from above.

Perhaps you can provide your own override of DefaultWebClientProvider along the lines of (absolutely untested!):

class MyDefaultWebClientProvider extends DefaultWebClientProvider {
  @Override
  public WebClient get(InetSocketAddress endpoint) {
    return super.get(endpoint)
      .mutate() // Obtain WebClient.Builder instance.
      .exchangeStrategies(ExchangeStrategies.builder()
        .codecs(configurer -> configurer
          .defaultCodecs()
          .maxInMemorySize(16 * 1024 * 1024))
        .build())
      .build();
  }
}

YMMV.


UPDATE #1:

1) Now I found it. And it explains why setting spring.codec.max-in-memory-size has no effect; the property is hardcoded at 256K in the base class uses by all default codecs, cf. BaseDefaultCodecs.

jensgram
  • 31,109
  • 6
  • 81
  • 98
  • Yes u are right, that's what i have found and i had no idea how to fix that – farid Dec 18 '19 at 13:23
  • It is hardcoded but overridden by the config. "Limit on the number of bytes that can be buffered whenever the input stream needs to be aggregated. By default this is not set, in which case individual codec defaults apply. Most codecs are limited to 256K by default." https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html – amertkara May 07 '20 at 20:32
  • Yes, the solution for `WebClient.Builder` helped. Thanks a lot! – RichArt Jun 11 '20 at 10:42
  • The above solution doesn't solve my issue, still getting org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144 – Abdul Basith Aug 19 '20 at 09:38
  • I am having @Bean public WebClient webClient(ReactorClientHttpConnector reactorClientHttpConnector) { return WebClient.builder().defaultHeader("accept-encoding", "gzip, deflate") .exchangeStrategies(ExchangeStrategies.builder() .codecs(codecConfigurer -> { ClientCodecConfigurer.ClientDefaultCodecs clientDefaultCodecs = codecConfigurer.defaultCodecs(); clientDefaultCodecs.maxInMemorySize(1024 * 1024 * 10); clientDefaultCodecs.enableLoggingRequestDetails(true); }) .build()) .clientConnector(reactorClientHttpConnector).build(); } – Abdul Basith Aug 19 '20 at 09:43
  • @jensgram, could you help pls? – Abdul Basith Aug 19 '20 at 09:51
  • 1
    This fixed my issue. the properties file setting straight up didn't work in my batch job. The Codec setting did fix it. – Brent Thoenen Aug 03 '21 at 13:33
6

A couple of days ago I implemented the possibility to customize the WebClient, check the corresponding Jira issue. This will be available in Spring Data Elasticsearch 3.2.4 and is already in the current master branch.

Configuration code looks like this:

@Configuration
public class ReactiveRestClientConfig extends AbstractReactiveElasticsearchConfiguration {
    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        final ClientConfiguration clientConfiguration = ClientConfiguration.builder() //
                .connectedTo("localhost:9200") //
                .withWebClientConfigurer(webClient -> {
                    ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                            .codecs(configurer -> configurer.defaultCodecs()
                                    .maxInMemorySize(-1))
                            .build();
                    return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
                })
                .build();
        return ReactiveRestClients.create(clientConfiguration);

    }
}
P.J.Meisch
  • 18,013
  • 6
  • 50
  • 66
5

As of Spring Boot 2.3.0, there is now a dedicated configuration property for the Reactive Elasticsearch REST client.

You can use the following configuration property to set a specific memory limit for the client.

spring.data.elasticsearch.client.reactive.max-in-memory-size=

The already existing spring.codec.max-in-memory-size property is separate and only affects other WebClient instances in the application.

Brian Clozel
  • 56,583
  • 15
  • 167
  • 176
  • I test using 2.5 version. The already existing spring.codec.max-in-memory-size does not work. Using the builder configuration only works. Any idea ? – Amir Choubani Jan 19 '22 at 15:44
  • the reason only the builder configuration works is probably because the setting propagates itself by injecting Webclient.Builder. its a prototype scope, not a singleton scope. so if you are using WebClient.builder() method yourself, you wont get the common settings applied. but if you inject WebClient.Builder from DI container, then it will work. – Dave Ankin Jul 03 '23 at 08:33
4

or:

    final Consumer<ClientCodecConfigurer> consumer = configurer -> {
        final ClientCodecConfigurer.ClientDefaultCodecs codecs = configurer.defaultCodecs();
        codecs.maxInMemorySize(maxBufferMb * 1024 * 1024);
    };

    WebClient.builder().codecs(consumer).build();
user1016765
  • 2,935
  • 2
  • 32
  • 48
  • 1
    Neat an concise solution. I think you could event do it this way : return WebClient.builder() .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(maxBufferMb * 1024 * 1024)) .build(); – Akah Feb 05 '21 at 07:22
0

In my case, using Spring boot 2.5.6, I had to use both to solve the issue

Created a configuration class;

@Configuration
public class WebfluxConfig implements WebFluxConfigurer {

    @Override
    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        configurer.defaultCodecs().maxInMemorySize(5000 * 1024);
    }
    @Bean("webClient")
    public WebClient getSelfWebClient(WebClient.Builder builder) {
        return builder.baseUrl("url").build();
    }
}

in the .properties file;

spring.codec.max-in-memory-size=5MB

Class where I use the WebClient;

@Autowired
@Qualifier("webClient")
private WebClient webClient;

private void doSomething() {
   String response = webClient.post()
                .uri(uri)
                .accept(MediaType.APPLICATION_JSON)
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(requestJson)
                .retrieve()
                .bodyToMono(String.class).block();
}
Cugomastik
  • 911
  • 13
  • 22
0

.withWebClientConfigurer is deprecated. Had to use .withClientConfigurer, which worked for me. Below is the code -

.withClientConfigurer(
                        ReactiveRestClients.WebClientConfigurationCallback.from(webClient -> {
                                                ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                            .codecs(configurer -> configurer.defaultCodecs()
                                    .maxInMemorySize(-1))
                            .build();
                    return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
                        }))

Referernce

ollaw
  • 2,086
  • 1
  • 20
  • 33
Kappa
  • 1
  • 1