spring reactive-programming integration

How to avoid block() when calling a reactive service in a Spring Integration flow?


I'm learning Spring Integration and I have a problem when getting data from a reactive service and using it in an IntegrationFlow. My code below is a simple demo of the idea:

@Autowired
private MockReactiveService mockReactiveService; // getMonoWrapper(...) returns a Mono

@Bean
public IntegrationFlow webFluxFlow() {
    return IntegrationFlow.from(WebFlux
                    .inboundGateway("/demo")
                    .requestMapping(m -> m.methods(HttpMethod.POST))
                    .replyChannel("replyChannel")
            )
            .enrich(enricherSpec -> enricherSpec.headerFunction(IntegrationMessageHeaderAccessor.PRIORITY, message -> {
                // block() makes the calling thread wait for the Mono; this is the call I want to avoid
                String priorityMessageType = mockReactiveService
                        .getMonoWrapper(message.getPayload())
                        .map(myModel -> myModel.getMessageType())
                        .block();
                // Priority 1 if the "MessageType" header matches the type returned by the service, else 0
                return message.getHeaders().get("MessageType", String.class).equals(priorityMessageType) ? 1 : 0;
            }))
            .channel(c -> c.priority())
            .channel("nextChannel")
            .get();
}

How can I do this without calling the block() method?

I'm looking for the right way to combine Spring Integration with reactive code.


Solution

  • This became easier once I understood how multiple threads are involved.
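
To make the threading point concrete, here is a minimal sketch on the plain JDK. It is not the Spring Integration flow from the question: CompletableFuture is used as a stand-in for Mono so the example runs without extra dependencies, and lookupPriorityType, the "urgent" rule, and the class name are hypothetical. The blocking variant parks the calling thread until the lookup finishes, which is what block() does; the non-blocking variant returns at once and the comparison runs later on whichever thread completes the lookup. In Spring Integration the analogous move would probably be to return the Mono from a handler configured with async(true) instead of blocking inside the header function, but that part is not shown here and should be checked against the reference documentation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class NonBlockingPriorityDemo {

    // Hypothetical stand-in for mockReactiveService.getMonoWrapper(payload).map(...):
    // resolves the priority message type on a worker thread.
    static CompletableFuture<String> lookupPriorityType(String payload, ExecutorService worker) {
        return CompletableFuture.supplyAsync(
                () -> payload.startsWith("urgent") ? "ALERT" : "INFO", worker);
    }

    // Blocking style, comparable to block(): the caller's thread waits for the result.
    static int priorityBlocking(String payload, String messageTypeHeader, ExecutorService worker) {
        String priorityType = lookupPriorityType(payload, worker).join();
        return priorityType.equals(messageTypeHeader) ? 1 : 0;
    }

    // Non-blocking style: returns immediately; the comparison is a callback
    // that runs once the lookup has completed.
    static CompletableFuture<Integer> priorityAsync(String payload, String messageTypeHeader,
                                                    ExecutorService worker) {
        return lookupPriorityType(payload, worker)
                .thenApply(priorityType -> priorityType.equals(messageTypeHeader) ? 1 : 0);
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            priorityAsync("urgent: disk full", "ALERT", worker)
                    .thenAccept(priority -> System.out.println("priority = " + priority))
                    .join(); // joined only so this demo does not exit before the callback runs
        } finally {
            worker.shutdown();
        }
    }
}
```

The only place this sketch waits is the join() in main, which exists purely to keep the demo alive. In a real flow the downstream step would consume the completed value instead of the caller waiting for it.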