Problem: I tried to bulk insert using R2DBC (PostgreSQL) with the code below:
@Override
public Flux<Long> test(List<User> users) {
    return Mono.from(connectionFactory.create())
        .flatMapMany(c -> Mono.from(c.beginTransaction())
            .thenMany(Flux.fromIterable(users)
                .map(u -> {
                    return Flux.from(c.createStatement("INSERT INTO public.users(name, age, salary) VALUES ($1, $2, $3)")
                        .returnGeneratedValues("id")
                        .bind(0, u.getName())
                        .bind(1, u.getAge())
                        .bind(2, u.getSalary())
                        .execute());
                })
                .flatMap(result -> result)
                .map(result -> result.map((row, meta) -> {
                    return row.get("id", Long.class);
                }))
                .flatMap(Flux::from)
                .delayUntil(r -> c.commitTransaction())
                .doFinally((st) -> c.close())));
}
The code executes an INSERT statement for each user and then retrieves the generated user id. The code above works as expected when the list contains 255 users or fewer. When the list is larger (256+), the following exception occurs:
[5b38a8c6-2] There was an unexpected error (type=Internal Server Error, status=500).
Cannot exchange messages because the request queue limit is exceeded
io.r2dbc.postgresql.client.ReactorNettyClient$RequestQueueException: Cannot exchange messages because the request queue limit is exceeded
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler xwitch.org.helloworld.rest.v2.CRUDController#importUsersBatchByR2DBC() [DispatcherHandler]
|_ checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP GET "/api/v2/users/import-users-batch-by-r2dbc" [ExceptionHandlingWebHandler]
Stack trace:
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:94)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:425)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:270)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:228)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:328)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:345)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:191)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:248)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:719)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:984)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:265)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:371)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
When I investigated what happened, I saw that the exception was thrown from ReactorNettyClient.java. The implementation is:
public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests,
        Consumer<Flux<FrontendMessage>> sender, Supplier<Boolean> isConnected) {
    return Flux.create(sink -> {
        Conversation conversation = new Conversation(takeUntil, sink);
        // ensure ordering in which conversations are added to both queues.
        synchronized (this.conversations) {
            if (this.conversations.offer(conversation)) {
                sink.onRequest(value -> onRequest(conversation, value));
                if (!isConnected.get()) {
                    sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                    return;
                }
                Flux<FrontendMessage> requestMessages = Flux.from(requests).doOnNext(m -> {
                    if (!isConnected.get()) {
                        sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                    }
                });
                sender.accept(requestMessages);
            } else {
                sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));
            }
        }
    });
}
The error occurs when the conversations queue is already at its limit and Queue.offer returns false, which causes the exception to be thrown.
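As far as I can tell, the capacity of this conversations queue comes from Reactor's small buffer size, which defaults to 256. A quick check, just for illustration:

    // Queues.SMALL_BUFFER_SIZE is max(16, the reactor.bufferSize.small system
    // property) and defaults to 256 -- matching the 255-record threshold above.
    int limit = reactor.util.concurrent.Queues.SMALL_BUFFER_SIZE;
    System.out.println(limit); // 256 unless the property was overridden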
Sorry, I'm not fluent in English. Please help me figure out what happened and how to fix it. I want to batch insert more than 100,000 records per request.
Thank you.
I encountered this exception today in a @Transactional process and was able to solve it in two ways:

concatMap

Using concatMap with a prefetch set to Integer.MAX_VALUE solved it for me, and even improved heap consumption by quite a lot.
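As a rough sketch applied to the code from the question (assuming the same User entity and an open Connection c; transaction handling is left out), the map/flatMap pair becomes a pair of concatMap calls:

    // Sketch only: the insert pipeline from the question, with flatMap swapped
    // for concatMap. concatMap subscribes to one inner publisher at a time, so
    // the connection's conversation queue can no longer overflow.
    Flux<Long> ids = Flux.fromIterable(users)
        .concatMap(u -> c.createStatement(
                    "INSERT INTO public.users(name, age, salary) VALUES ($1, $2, $3)")
                .returnGeneratedValues("id")
                .bind(0, u.getName())
                .bind(1, u.getAge())
                .bind(2, u.getSalary())
                .execute(),
            Integer.MAX_VALUE) // prefetch: how many source elements to request upstream
        .concatMap(result -> result.map((row, meta) -> row.get("id", Long.class)));

The trade-off is that the statements now execute one after another, but a single PostgreSQL connection processes queries sequentially anyway, and the returned ids keep the order of the input list.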
flatMap

If you really want to keep flatMap, you can set -Dreactor.bufferSize.small to a value higher than 256. But this is a bit dangerous: doing tests locally, I got an OutOfMemoryError when trying to handle 100,000 records at once.
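For completeness, a minimal sketch of that option (the value 1024 and the DemoApplication entry point are placeholders for illustration):

    // Equivalent to starting the JVM with -Dreactor.bufferSize.small=1024.
    // reactor.util.concurrent.Queues reads this property once, statically,
    // so it must be set before the first Reactor class is loaded.
    public static void main(String[] args) {
        System.setProperty("reactor.bufferSize.small", "1024"); // placeholder value
        SpringApplication.run(DemoApplication.class, args);
    }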
Use concatMap instead, and set prefetch to the maximum number of records you want to be able to handle.