I'm learning Spring Integration and have a problem getting data from a reactive service and using it in an IntegrationFlow. My code below is a simple demo of the idea:
@Autowired
private MockReactiveService mockReactiveService; // return Mono

@Bean
public IntegrationFlow webFluxFlow() {
    return IntegrationFlow.from(WebFlux
                    .inboundGateway("/demo")
                    .requestMapping(m -> m.methods(HttpMethod.POST))
                    .replyChannel("replyChannel")
            )
            .enrich(enricherSpec -> enricherSpec.headerFunction(IntegrationMessageHeaderAccessor.PRIORITY, message -> {
                String priorityMessageType = mockReactiveService
                        .getMonoWrapper(message.getPayload())
                        .map(myModel -> myModel.getMessageType())
                        .block();
                return message.getHeaders().get("MessageType", String.class).equals(priorityMessageType) ? 1 : 0;
            }))
            .channel(c -> c.priority())
            .channel("nextChannel")
            .get();
}
How can I do this without calling the block() method?
I'm looking for the right way to combine Spring Integration with reactive code.
An explanation of how the threads are involved would also make this easier for me to understand.