Tags: spring, spring-boot, spring-integration, project-reactor, reactor-kafka

Main processing flow programmatic approach when using Spring Integration with Project Reactor


I want to define a flow that consumes from Kafka with Reactor Kafka, writes to MongoDB, and only on success writes the IDs to Kafka. I'm using Project Reactor with the Spring Integration Java DSL, and I'd like to have a FlowBuilder class that defines my pipeline at a high level. This is my current direction:

public IntegrationFlow buildFlow() {
    return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
            .publishSubscribeChannel(c -> c
                    .subscribe(sf -> sf
                            .handle(MongoDb.reactiveOutboundChannelAdapter())))
            .handle(writeToKafka)
            .get();
}

I've seen in the docs that there is support for a different approach that also works with Project Reactor and doesn't use IntegrationFlows. It looks like this:

@MessagingGateway
public static interface TestGateway {

    @Gateway(requestChannel = "promiseChannel")
    Mono<Integer> multiply(Integer value);

}

    ...

@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
    return value * 2;
}

    ...

Flux.just("1", "2", "3", "4", "5")
        .map(Integer::parseInt)
        .flatMap(this.testGateway::multiply)
        .collectList()
        .subscribe(integers -> ...);

I'd like to know which approach is recommended when working with these two libraries. I also wonder how I can use the reactive MongoDB adapter in the second example; I'm not sure the second approach is even possible without an IntegrationFlows wrapper.


Solution

  • The @MessagingGateway was designed as a high-level end-user API that hides the messaging underneath as much as possible, so the target service stays free of any messaging abstraction when you develop its logic.

    It is possible to use such an interface adapter from an IntegrationFlow. Treat it as a regular service activator, so it would look like this:

    .handle("testGateway", "multiply", e -> e.async(true))
    

    The async(true) makes this service activator subscribe to the returned Mono. You may omit it, but then you are on your own to subscribe to the Mono downstream, since that Mono itself becomes the payload of the next message in the flow.
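
For context, here is a minimal sketch of where that handle(...) line could sit in a complete flow definition. The class name and the channel names numbersChannel and multipliedChannel are hypothetical, and the gateway is assumed to be registered under the bean name testGateway:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class GatewayCallingFlowConfig {

    @Bean
    public IntegrationFlow multiplyFlow() {
        return IntegrationFlows.from("numbersChannel")   // hypothetical input channel
                // Invoke the gateway like any other service. With async(true) the
                // endpoint subscribes to the returned Mono and emits its value as the reply.
                .handle("testGateway", "multiply", e -> e.async(true))
                .channel("multipliedChannel")            // hypothetical output channel
                .get();
    }
}
```

Without the async(true) option, the message arriving on multipliedChannel would carry the Mono<Integer> itself as its payload.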

    If you want the opposite, calling an IntegrationFlow from a Flux (as in that flatMap()), consider using the toReactivePublisher() operator in the flow definition to return a Publisher<?> and declare it as a bean. In this case it is better not to use MongoDb.reactiveOutboundChannelAdapter(), but a plain ReactiveMongoDbStoringMessageHandler, so that its returned Mono is propagated to that Publisher.

    On the other hand, if you want to keep the @MessagingGateway with the Mono return type but still call a ReactiveMongoDbStoringMessageHandler from it, declare the handler as a bean and mark it with @ServiceActivator.

    There is also an ExpressionEvaluatingRequestHandlerAdvice to catch errors (or success) on a particular endpoint and handle them accordingly: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#expression-advice
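
A minimal sketch of attaching such an advice to one endpoint follows. The class name, the channel names storeChannel, storeSuccessChannel and storeFailureChannel, and the storeService bean with a void store method are all hypothetical. The advice wraps the handler invocation, so for a handler that only returns a Mono it is worth verifying that the outcome it observes is the one you care about:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;

@Configuration
public class AdvisedFlowConfig {

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice storeAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        // On success, send the original payload to a (hypothetical) success channel.
        advice.setSuccessChannelName("storeSuccessChannel");
        advice.setOnSuccessExpressionString("payload");
        // On failure, send a description to a (hypothetical) failure channel.
        advice.setFailureChannelName("storeFailureChannel");
        advice.setOnFailureExpressionString("payload + ' failed: ' + #exception.cause.message");
        // Do not rethrow the exception to the caller once it has been handled here.
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow advisedFlow() {
        return IntegrationFlows.from("storeChannel")     // hypothetical input channel
                // storeService.store(...) is a hypothetical void service method.
                .handle("storeService", "store", e -> e.advice(storeAdvice()))
                .get();
    }
}
```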

    I think what you are looking for is something like this:

    public IntegrationFlow buildFlow() {
       return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
          .handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
          .handle(writeToKafka)
          .get();
    }
    

    Pay attention to the .handle(reactiveMongoDbStoringMessageHandler): this is not MongoDb.reactiveOutboundChannelAdapter(), because that one wraps a ReactiveMessageHandler into a ReactiveMessageHandlerAdapter for automatic subscription. What you need looks more like having that Mono<Void> returned to your own control, so that you can use it as the input to your writeToKafka service, subscribe there yourself, and handle success or error as you described. The point is that with Reactive Streams we cannot provide imperative error handling; the approach is the same as with any async API. So we send errors to the errorChannel for Reactive Streams, too.
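
To illustrate the remark about async APIs in general, here is a minimal sketch in plain JDK terms, with CompletableFuture standing in for the Mono<Void> (unlike a Mono, a CompletableFuture starts eagerly, without a subscription). The store method and the two lists are hypothetical stand-ins for the MongoDB write, the Kafka output, and the errorChannel. The outcome is handled in a callback rather than in a try/catch around the call:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

final class AsyncOutcomeSketch {

    // Hypothetical stand-ins for the Kafka output topic and the errorChannel.
    static final List<String> publishedIds = new CopyOnWriteArrayList<>();
    static final List<String> errors = new CopyOnWriteArrayList<>();

    // Simulated asynchronous store: fails for an empty id, succeeds otherwise.
    static CompletableFuture<Void> store(String id) {
        return CompletableFuture.runAsync(() -> {
            if (id.isEmpty()) {
                throw new IllegalArgumentException("empty id");
            }
        });
    }

    // Publish the id only after the store completed successfully;
    // route a failure to the error list instead of throwing to the caller.
    static CompletableFuture<Void> storeThenPublish(String id) {
        return store(id).handle((ignored, failure) -> {
            if (failure == null) {
                publishedIds.add(id);
            } else {
                // Async failures usually arrive wrapped in a CompletionException.
                Throwable root = (failure instanceof CompletionException && failure.getCause() != null)
                        ? failure.getCause()
                        : failure;
                errors.add(root.getMessage());
            }
            return null;
        });
    }
}
```

Calling storeThenPublish("42") and waiting for it leaves "42" in publishedIds, while calling it with an empty id leaves "empty id" in errors and publishes nothing.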

    We could probably improve MongoDb.reactiveOutboundChannelAdapter() with something like returnMono(true/false) to make a use case like yours available out of the box.