I have an issue of Spring Webflux project. First I made 2 pojo model classes, User and Post.
User.java
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Table("blog_user")
public class User {
@Id
@Column("user_id")
private Long id;
@Column
private String username;
@Column
@JsonIgnore
private String password;
}
Post.java
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Table
public class Post {
@Id
@Column("post_id")
private Long id;
@Column
private String title;
@Column
private String body;
@Column("user_id")
private User user;
}
and relative repository interfaces
public interface UserReactiveMysqlRepository extends ReactiveCrudRepository<User, Long> {
}
public interface PostReactiveMysqlRepository extends ReactiveCrudRepository<Post, Long> {
}
I made the webflux handler class like below, which try to extract user from Mono and put the user value into post class.
@Component
public class PostHandler {
@Autowired
private PostReactiveMysqlRepository postRepository;
@Autowired
private UserReactiveMysqlRepository userRepository;
public Mono<ServerResponse> findAll(ServerRequest request) {
Flux<Post> fluxPost = postRepository.findAll()
.filter(p -> (p.getUser() == null))
.map(p -> {
User u = userRepository.findById(p.getId()).block(); // This line throws Exception.
p.setUser(u);
return p;
});
return ServerResponse.ok().body(fluxPost, Post.class);
}
But the block() api line throws the error messages.
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.1.jar:3.5.1]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Handler com.aaa.blog.wf.router.BlogWebFluxEndpointRouter$$Lambda$1434/0x00000008006ce410@7d628cef [DispatcherHandler]
*__checkpoint ⇢ HTTP GET "/route/post/all" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Mono.block(Mono.java:1710) ~[reactor-core-3.5.1.jar:3.5.1]
at com.aaa.blog.wf.handler.PostHandler.lambda$1(PostHandler.java:47) ~[classes/:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:258) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMap$WeakScalarSubscription.request(FluxConcatMap.java:479) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:338) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.5.1.jar:3.5.1]
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:296) ~[spring-web-6.0.3.jar:6.0.3]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.netty.channel.MonoSendMany$SendManyInner.onSubscribe(MonoSendMany.java:254) ~[reactor-netty-core-1.1.1.jar:1.1.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.5.1.jar:3.5.1]
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.subscribe(ChannelSendOperator.java:358) ~[spring-web-6.0.3.jar:6.0.3]
at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:67) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Flux.subscribe(Flux.java:8660) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102) ~[reactor-netty-core-1.1.1.jar:1.1.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196) ~[reactor-netty-core-1.1.1.jar:1.1.1]
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:421) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:923) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:923) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:941) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
As you know the default web server of Spring WebFlux is netty. And the netty seems not to support block()
api function. Some replies advise to change the web server to tomcat, But I think it can not be the correct solution. How can I extract the user class value from Mono<User>
? Or does my code contain the wrong grammar?
Blocking code is not allowed on reactive schedulers. You need to define a flow using reactive API ‘map/flatMap‘ and return it from your controller.
From what I see PostReactiveMysqlRepository
is reactive and userRepository.findById()
returns Mono<User>
you can use
public Mono<ServerResponse> findAll(ServerRequest request) {
Flux<Post> fluxPost = postRepository.findAll()
.filter(p -> (p.getUser() == null))
.flatMap(p ->
userRepository.findById(p.getId()
.map(user -> {
p.setUser(user);
return p;
})
);
return ServerResponse.ok().body(fluxPost, Post.class);
}
The key points here are
flatMap
instead of map
in case operation is async (returns Mono
or Flux
)map
instead of block
to complete the reactive flow