I am using the ReactiveElasticsearchClient from spring-data-elasticsearch 3.2.3 with spring-boot 2.2.0. When upgrading to spring-boot 2.2.2 i have got org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144.
It's indicated to fixe that to use spring.codec.max-in-memory-size but i still got the same exception.
Bellow the whole exception:
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoCollect] :
reactor.core.publisher.Flux.collect(Flux.java:3273)
org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
Error has been observed at the following site(s):
|_ Flux.collect ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
|_ Mono.filter ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:554)
|_ Mono.map ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:555)
|_ Mono.map ⇢ at org.springframework.core.codec.AbstractDataBufferDecoder.decodeToMono(AbstractDataBufferDecoder.java:96)
|_ checkpoint ⇢ Body from POST http://localhost:9200/_bulk?timeout=1m [DefaultClientResponse]
|_ Mono.map ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:669)
|_ Mono.doOnNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:670)
|_ Mono.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:671)
|_ Mono.flatMapMany ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.sendRequest(DefaultReactiveElasticsearchClient.java:591)
|_ Flux.publishNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.bulk(DefaultReactiveElasticsearchClient.java:448)
|_ Flux.flatMap ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:32)
|_ Flux.map ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:33)
|_ Flux.reduce ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:34)
|_ Mono.zip ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:178)
|_ Mono.map ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:179)
Stack trace:
at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
at org.springframework.core.io.buffer.LimitedDataBufferList.updateCount(LimitedDataBufferList.java:94)
at org.springframework.core.io.buffer.LimitedDataBufferList.add(LimitedDataBufferList.java:59)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:119)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:571)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:89)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:313)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:427)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Can anyone tell me what i am doing wrong or is that a bug?
Thank you