I want to define a flow that consumes from Kafka with Reactor Kafka and writes to MongoDB, and only on success writes the IDs back to Kafka. I'm using Project Reactor with the Spring Integration Java DSL, and I'd like to have a FlowBuilder class that defines my pipeline at a high level. I currently have the following draft:
public IntegrationFlow buildFlow() {
    return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
            .publishSubscribeChannel(c -> c
                    .subscribe(sf -> sf
                            .handle(MongoDb.reactiveOutboundChannelAdapter()))
                    .handle(writeToKafka))
            .get();
}
I've seen in the docs that there is support for a different approach that also works with Project Reactor and doesn't involve IntegrationFlows. It looks like this:
@MessagingGateway
public static interface TestGateway {

    @Gateway(requestChannel = "promiseChannel")
    Mono<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
    return value * 2;
}
...
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(integers -> ...);
I'd like to know which of these is the more recommended way of processing when working with these two libraries. I also wonder how I can use the reactive MongoDB adapter in the second example. I'm not sure the second approach is even possible without an IntegrationFlows wrapper.
The @MessagingGateway was designed as a high-level end-user API, to hide the messaging underneath as much as possible, so the target service is free from any messaging abstraction when you develop its logic.
It is possible to use such an interface adapter from an IntegrationFlow, and you should treat it as a regular service activator; therefore it would look like this:

.handle("testGateway", "multiply", e -> e.async(true))
The async(true) makes this service activator subscribe to the returned Mono. If you omit it, you are on your own to subscribe to it downstream, since exactly this Mono is going to be the payload of the next message in the flow.
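Putting that together, a flow calling the gateway as a service activator might be sketched like this (the flow bean and channel names here are illustrative assumptions, not part of the original):

```java
// Sketch only: calling the TestGateway from an IntegrationFlow.
// The channel names and surrounding wiring are illustrative assumptions.
@Bean
public IntegrationFlow multiplyFlow() {
    return IntegrationFlows.from("multiplyInput")
            // async(true) subscribes to the Mono<Integer> returned by the
            // gateway and passes its result on as the next message's payload.
            .handle("testGateway", "multiply", e -> e.async(true))
            .channel("multiplyOutput")
            .get();
}
```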
If you want the opposite, calling an IntegrationFlow from the Flux as in that flatMap(), then consider using the toReactivePublisher() operator at the end of the flow definition to return a Publisher<?> and declare it as a bean. In this case it is better not to use that MongoDb.reactiveOutboundChannelAdapter(), but just a ReactiveMongoDbStoringMessageHandler, to let its returned Mono be propagated to that Publisher.
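For illustration, such a Publisher bean could be sketched like this, inside a @Configuration class (the bean, channel, and collection names are assumptions):

```java
// Sketch only; names are illustrative assumptions.
@Bean
public Publisher<Message<?>> mongoStoreFlow(ReactiveMongoOperations mongoOperations) {
    ReactiveMongoDbStoringMessageHandler handler =
            new ReactiveMongoDbStoringMessageHandler(mongoOperations);
    // Assumed target collection; LiteralExpression comes from
    // org.springframework.expression.common.
    handler.setCollectionNameExpression(new LiteralExpression("documents"));
    return IntegrationFlows.from("mongoStoreInput")
            .handle(handler)
            .toReactivePublisher();
}
```

Such a bean can then be composed into reactive code, e.g. with Flux.from(mongoStoreFlow), and subscribed to from there.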
On the other hand, if you want that @MessagingGateway with the Mono return, but still want to call a ReactiveMongoDbStoringMessageHandler from it, then declare the handler as a bean and mark it with that @ServiceActivator.
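A sketch of that wiring, with assumed channel and collection names:

```java
// Sketch only; channel and collection names are illustrative assumptions.
@MessagingGateway
public interface MongoStoreGateway {

    @Gateway(requestChannel = "storeChannel")
    Mono<Void> store(Object document);
}

@Bean
@ServiceActivator(inputChannel = "storeChannel")
public ReactiveMongoDbStoringMessageHandler mongoStoreHandler(ReactiveMongoOperations mongoOperations) {
    ReactiveMongoDbStoringMessageHandler handler =
            new ReactiveMongoDbStoringMessageHandler(mongoOperations);
    handler.setCollectionNameExpression(new LiteralExpression("documents"));
    return handler;
}
```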
We also have an ExpressionEvaluatingRequestHandlerAdvice to catch errors (or success) on a particular endpoint and handle them respectively: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#expression-advice
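For example, such an advice could be configured roughly like this (the channel names and expressions are illustrative assumptions):

```java
// Sketch only: route success/failure of one endpoint to dedicated channels.
// Channel names and expressions are illustrative assumptions.
@Bean
public ExpressionEvaluatingRequestHandlerAdvice mongoAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("mongoSuccessChannel");
    advice.setOnSuccessExpressionString("payload");
    advice.setFailureChannelName("mongoFailureChannel");
    advice.setOnFailureExpressionString("exception.message");
    // Do not rethrow the exception after evaluating the failure expression.
    advice.setTrapException(true);
    return advice;
}
```

It is then attached to the target endpoint in the DSL, e.g. via the endpoint spec: .handle(..., e -> e.advice(mongoAdvice())).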
I think what you are looking for is something like this:
public IntegrationFlow buildFlow() {
    return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
            .handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
            .handle(writeToKafka)
            .get();
}
Pay attention to the .handle(reactiveMongoDbStoringMessageHandler, "handleMessage") - it is not the same as MongoDb.reactiveOutboundChannelAdapter(), because that one wraps the ReactiveMessageHandler into a ReactiveMessageHandlerAdapter for automatic subscription. What you need looks more like you want that Mono<Void> returned to your own control, so you can use it as an input into your writeToKafka service, subscribe there yourself, and handle success or error as you explained. The point is that with Reactive Streams we cannot provide imperative error handling. The approach is the same as with any async API usage. So, we send errors to the errorChannel for Reactive Streams, too.
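Under those assumptions, the writeToKafka service could receive the Mono<Void> as the message payload and react to completion itself; a sketch (the channel, topic, header name, kafkaTemplate, and logger are all illustrative assumptions):

```java
// Sketch only: the Mono<Void> returned by handleMessage() arrives here as
// the payload, so this service subscribes itself and publishes the ID to
// Kafka only when the MongoDB write succeeds.
@ServiceActivator(inputChannel = "afterMongoChannel")
public void writeToKafka(Message<Mono<Void>> message) {
    String id = (String) message.getHeaders().get("documentId");
    message.getPayload()
            .doOnSuccess(ignored -> kafkaTemplate.send("ids-topic", id))
            .doOnError(ex -> log.error("Mongo write failed for {}", id, ex))
            .subscribe();
}
```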
We could probably improve MongoDb.reactiveOutboundChannelAdapter() with something like returnMono(true/false) to make a use-case like yours available out of the box.