Libraries:
- r2dbc-postgresql-0.8.6.RELEASE
- r2dbc-pool-0.8.5.RELEASE
- r2dbc-spi-0.8.3.RELEASE
- postgresql-42.2.18
- List item
Problem: I tried to bulk insert using R2DBC (PostgreSQL) with code as below:
@Override
public Flux<Long> test(List<User> users) {
return Mono.from(connectionFactory.create())
.flatMapMany(c -> Mono.from(c.beginTransaction())
.thenMany(Flux.fromIterable(users)
.map(u -> {
return Flux.from(c.createStatement("INSERT INTO public.users(name, age, salary) VALUES ($1, $2, $3)").returnGeneratedValues("id")
.bind(0, u.getName())
.bind(1, u.getAge())
.bind(2, u.getSalary()).execute());
})
.flatMap(result -> result)
.map(result -> result.map((row, meta) -> {
return row.get("id", Long.class);
}))
.flatMap(Flux::from)
.delayUntil(r -> c.commitTransaction())
.doFinally((st) -> c.close())));
}
The code will execute statement to insert a user to DB and then get generated user id. Above code work as expected if the list of users is lesser than or equal 255. When the list of users is greater than 255 (256~), the exception occurred as below:
[5b38a8c6-2] There was an unexpected error (type=Internal Server Error, status=500).
Cannot exchange messages because the request queue limit is exceeded
io.r2dbc.postgresql.client.ReactorNettyClient$RequestQueueException: Cannot exchange messages because the request queue limit is exceeded
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler xwitch.org.helloworld.rest.v2.CRUDController#importUsersBatchByR2DBC() [DispatcherHandler]
|_ checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP GET "/api/v2/users/import-users-batch-by-r2dbc" [ExceptionHandlingWebHandler]
Stack trace:
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:94)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:425)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:270)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:228)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:328)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:345)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:191)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:248)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:719)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:984)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:265)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:371)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
When i try to investigate to detect what happened. I see the exception was thrown by ReactorNettyClient.java. The implementation is:
public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Flux<FrontendMessage>> sender,
Supplier<Boolean> isConnected) {
return Flux.create(sink -> {
Conversation conversation = new Conversation(takeUntil, sink);
// ensure ordering in which conversations are added to both queues.
synchronized (this.conversations) {
if (this.conversations.offer(conversation)) {
sink.onRequest(value -> onRequest(conversation, value));
if (!isConnected.get()) {
sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
return;
}
Flux<FrontendMessage> requestMessages = Flux.from(requests).doOnNext(m -> {
if (!isConnected.get()) {
sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
}
});
sender.accept(requestMessages);
} else {
sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));
}
}
});
}
Error when the Queue is exceeded 255 and the Queue.offer method return false. that cause the exception be threw.
Sorry, i'm not familiar with English. Please help me to figure out what happend and the solution to fix it. I want to batch insert with the number of records >100000 for each request.
thank you.