
Libraries:

  1. r2dbc-postgresql-0.8.6.RELEASE
  2. r2dbc-pool-0.8.5.RELEASE
  3. r2dbc-spi-0.8.3.RELEASE
  4. postgresql-42.2.18

Problem: I tried to bulk insert using R2DBC (PostgreSQL) with the code below:

@Override
public Flux<Long> test(List<User> users) {
    return Mono.from(connectionFactory.create())
    .flatMapMany(c -> Mono.from(c.beginTransaction())
        .thenMany(Flux.fromIterable(users)
        .map(u -> {
            return Flux.from(c.createStatement("INSERT INTO public.users(name, age, salary) VALUES ($1, $2, $3)").returnGeneratedValues("id")
                .bind(0, u.getName())
                .bind(1, u.getAge())
                .bind(2, u.getSalary()).execute());
        })
        .flatMap(result -> result)
        .map(result -> result.map((row, meta) -> {
            return row.get("id", Long.class);
        }))
        .flatMap(Flux::from)
        .delayUntil(r -> c.commitTransaction())
        .doFinally((st) -> c.close())));
}

The code executes a statement to insert each user into the DB and then fetches the generated user id. The code above works as expected if the list of users contains 255 entries or fewer. When the list contains more than 255 users (256 and up), the following exception occurs:

[5b38a8c6-2] There was an unexpected error (type=Internal Server Error, status=500).
Cannot exchange messages because the request queue limit is exceeded
io.r2dbc.postgresql.client.ReactorNettyClient$RequestQueueException: Cannot exchange messages because the request queue limit is exceeded
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Handler xwitch.org.helloworld.rest.v2.CRUDController#importUsersBatchByR2DBC() [DispatcherHandler]
    |_ checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain]
    |_ checkpoint ⇢ HTTP GET "/api/v2/users/import-users-batch-by-r2dbc" [ExceptionHandlingWebHandler]
Stack trace:
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
        at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:94)
        at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49)
        at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:425)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:270)
        at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:228)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:328)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:345)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:191)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:248)
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212)
        at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
        at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
        at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784)
        at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
        at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:719)
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:984)
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860)
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767)
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118)
        at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:265)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:371)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

When I investigated what happened, I saw the exception was thrown by ReactorNettyClient.java. The implementation is:

public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Flux<FrontendMessage>> sender,
                                                Supplier<Boolean> isConnected) {

        return Flux.create(sink -> {

            Conversation conversation = new Conversation(takeUntil, sink);

            // ensure ordering in which conversations are added to both queues.
            synchronized (this.conversations) {
                if (this.conversations.offer(conversation)) {

                    sink.onRequest(value -> onRequest(conversation, value));

                    if (!isConnected.get()) {
                        sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                        return;
                    }

                    Flux<FrontendMessage> requestMessages = Flux.from(requests).doOnNext(m -> {
                        if (!isConnected.get()) {
                            sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                        }
                    });

                    sender.accept(requestMessages);
                } else {
                    sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));

                }
            }
        });
    }

The error occurs once the queue already holds its maximum of pending conversations: Queue.offer then returns false, which causes the exception to be thrown.
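The failure mechanism above can be reproduced with any bounded queue: offer() does not block when the queue is full, it simply returns false, and the driver turns that false into the RequestQueueException. A minimal sketch (capacity 2 stands in for the driver's conversation-queue capacity, which appears to be 256 here given the ~255-row threshold — an assumption on my part, not something stated in the driver docs):

```java
import java.util.concurrent.ArrayBlockingQueue;

public class QueueLimitDemo {
    public static void main(String[] args) {
        // Bounded queue standing in for the driver's internal
        // conversations queue in addConversation(...) above.
        ArrayBlockingQueue<String> conversations = new ArrayBlockingQueue<>(2);

        System.out.println(conversations.offer("stmt-1")); // true
        System.out.println(conversations.offer("stmt-2")); // true
        // Queue is full: offer() returns false instead of blocking,
        // which is the branch that raises RequestQueueException.
        System.out.println(conversations.offer("stmt-3")); // false
    }
}
```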

Sorry, I'm not familiar with English. Please help me figure out what happened and how to fix it. I want to batch insert more than 100,000 records per request.
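One common way to stay under such a limit with 100,000+ records is to split the input into bounded batches and submit them one batch at a time (in Reactor terms, something like Flux.buffer(255) followed by concatMap — a suggestion, not a verified pipeline). A plain-Java sketch of just the chunking step; the `chunk` helper, the `batchSize` parameter, and the 255 figure are illustrative assumptions:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchChunker {
    /** Split records into consecutive batches of at most batchSize each. */
    static <T> List<List<T>> chunk(List<T> records, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += batchSize) {
            batches.add(records.subList(i, Math.min(i + batchSize, records.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) records.add(i);

        // Keep each submitted batch below the observed ~255 threshold.
        List<List<Integer>> batches = chunk(records, 255);
        System.out.println(batches.size()); // prints 393 (ceil of 100000 / 255)
    }
}
```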

Thank you.

Linh Nguyen
  • I suspect you are experiencing backpressure, since a database can't process that many records at once. Maybe you should try out some buffering in combination with an overflow strategy https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#onBackpressureBuffer-int-reactor.core.publisher.BufferOverflowStrategy- https://itsallbinary.com/reactor-basics-with-example-backpressure-overflow-drop-error-latest-ignore-buffer-good-for-beginners/ – Toerktumlare Feb 27 '21 at 10:29
  • I think that too, but only 255 records are allowed; with 256 or more, the error occurs. Why is the limit that small? – Linh Nguyen Feb 27 '21 at 14:23
  • The link is very useful, thank you so much. I will research more. – Linh Nguyen Feb 27 '21 at 14:34
  • If you give a database 256 records, it will write them, but if you give it 256 more while it is already processing 256 records, then it will probably throw an exception. It is probably writing in sequential order, so whether you queue up 256 or 1,000,000 it's not going to go faster; you will just have threads waiting for things to be written. In your case there needs to be a conversation between the db and your application: the database writes 256, then asks for 256 more, then asks for 256 more, to be as efficient as possible. – Toerktumlare Feb 27 '21 at 14:39
  • When it asks for more, your app needs to have records buffered so that it can deliver. It's probably set to 256 because that is a good balance between CPU and memory consumption. – Toerktumlare Feb 27 '21 at 14:42
  • Thank you very much, I understand what happened now. But how should the code above (test(List users)) best be changed? I don't know how to make one statement.execute wait for the previous one inside the flatMap. – Linh Nguyen Feb 27 '21 at 17:01
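The sequencing the last comment asks about — starting each execute only after the previous one completes — would typically be expressed in Reactor by replacing flatMap with concatMap, which subscribes to each inner publisher only after the prior one finishes (a suggestion, not verified against this exact pipeline). A plain-Java analogue of that chaining, where `insert` is a hypothetical stand-in for statement.execute():

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SequentialInsert {
    // Hypothetical stand-in for statement.execute(): completes
    // asynchronously with a fake generated id per record.
    static CompletableFuture<Long> insert(String name) {
        return CompletableFuture.supplyAsync(() -> (long) name.hashCode());
    }

    public static void main(String[] args) {
        List<String> users = List.of("alice", "bob", "carol");

        // Chain each insert onto the previous one, so only a single
        // statement is ever pending at a time -- the analogue of
        // using concatMap instead of flatMap in the reactive pipeline.
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String u : users) {
            chain = chain
                .thenCompose(ignored -> insert(u))
                .thenAccept(id -> System.out.println("inserted id " + id));
        }
        chain.join(); // waits for the whole sequence to finish
    }
}
```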
