postgresqlspring-webfluxspring-data-r2dbcr2dbcr2dbc-postgresql

R2DBC - PostgreSQL - Cannot exchange messages because the request queue limit is exceeded


Libraries:

  1. r2dbc-postgresql-0.8.6.RELEASE
  2. r2dbc-pool-0.8.5.RELEASE
  3. r2dbc-spi-0.8.3.RELEASE
  4. postgresql-42.2.18
  5. List item

Problem: I tried to bulk insert using R2DBC (PostgreSQL) with code as below:

@Override
public Flux<Long> test(List<User> users) {
    return Mono.from(connectionFactory.create())
    .flatMapMany(c -> Mono.from(c.beginTransaction())
        .thenMany(Flux.fromIterable(users)
        .map(u -> {
            return Flux.from(c.createStatement("INSERT INTO public.users(name, age, salary) VALUES ($1, $2, $3)").returnGeneratedValues("id")
                .bind(0, u.getName())
                .bind(1, u.getAge())
                .bind(2, u.getSalary()).execute());
        })
        .flatMap(result -> result)
        .map(result -> result.map((row, meta) -> {
            return row.get("id", Long.class);
        }))
        .flatMap(Flux::from)
        .delayUntil(r -> c.commitTransaction())
        .doFinally((st) -> c.close())));
}

The code will execute statement to insert a user to DB and then get generated user id. Above code work as expected if the list of users is lesser than or equal 255. When the list of users is greater than 255 (256~), the exception occurred as below:

[5b38a8c6-2] There was an unexpected error (type=Internal Server Error, status=500).
Cannot exchange messages because the request queue limit is exceeded
io.r2dbc.postgresql.client.ReactorNettyClient$RequestQueueException: Cannot exchange messages because the request queue limit is exceeded
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Handler xwitch.org.helloworld.rest.v2.CRUDController#importUsersBatchByR2DBC() [DispatcherHandler]
    |_ checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain]
    |_ checkpoint ⇢ HTTP GET "/api/v2/users/import-users-batch-by-r2dbc" [ExceptionHandlingWebHandler]
Stack trace:
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
        at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:94)
        at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49)
        at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:425)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:270)
        at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:228)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:328)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:345)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:191)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:248)
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212)
        at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
        at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
        at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784)
        at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
        at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:719)
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:984)
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860)
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767)
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118)
        at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:265)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:371)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

When i try to investigate to detect what happened. I see the exception was thrown by ReactorNettyClient.java. The implementation is:

public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Flux<FrontendMessage>> sender,
                                                Supplier<Boolean> isConnected) {

        return Flux.create(sink -> {

            Conversation conversation = new Conversation(takeUntil, sink);

            // ensure ordering in which conversations are added to both queues.
            synchronized (this.conversations) {
                if (this.conversations.offer(conversation)) {

                    sink.onRequest(value -> onRequest(conversation, value));

                    if (!isConnected.get()) {
                        sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                        return;
                    }

                    Flux<FrontendMessage> requestMessages = Flux.from(requests).doOnNext(m -> {
                        if (!isConnected.get()) {
                            sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                        }
                    });

                    sender.accept(requestMessages);
                } else {
                    sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));

                }
            }
        });
    }

Error when the Queue is exceeded 255 and the Queue.offer method return false. that cause the exception be threw.

Sorry, i'm not familiar with English. Please help me to figure out what happend and the solution to fix it. I want to batch insert with the number of records >100000 for each request.

thank you.


Solution

  • I encountered this exception today in a @Transactional process and was able to solve it using two ways:

    Use concatMap

    Using concatMap with a prefetch set to Integer.MAX_VALUE solved it for me, and even improved heap consumption by quite a lot.

    Keeping flatMap

    If you really want to keep flatMap, you can set -Dreactor.bufferSize.small to a higher value than 256. But this is a bit dangerous, doing tests locally I got an OutOfMemoryError when trying to handle 100,000 records at once.

    TL;DR

    Use concatMap instead and set prefetch to the max number or records you want to be able to handle.