spring-integration spring-integration-dsl

Spring Integration flow not invoked from handle


I have the following two components, which should delete a document first from Mongo and then from Elastic.

Main flow:

@Component
public class DeleteDocumentFlow {

    @Autowired
    private StoreInMongoFlow storeInMongoFlow;

    @Bean
    public IntegrationFlow deleteDocument() {
        return IntegrationFlows.from(Channels.METADATA_DELETE_STATUS.name())
                .handle(storeInMongoFlow.deleteDocumentInMongo())
                .channel("deleteDocumentInES.input")
                .get();
    }
}

Service:

@Component
public class StoreInMongoFlow {
    @Bean
    public IntegrationFlow deleteDocumentInMongo() {
        return flow -> flow
                .<Metadata>handle((p, h) -> {
                    DBObject obj = BasicDBObjectBuilder.start("i", p.getId()).get();
                    DeleteResult documentEntry = this.mongoTemplate.remove(obj, "docs");
                    return documentEntry.getDeletedCount();
                })
                .log(LoggingHandler.Level.INFO, m -> "Number of documents deleted: " + m.getPayload());
    }
}

Unfortunately, deleteDocumentInMongo is never invoked. The bean is properly registered, as I can see it in the logs.

Am I doing something fundamentally wrong or would you need some more debugging info? If I wiretap the handle, then the deleteDocumentInES.input is executed but the mongo flow is simply ignored.


Solution

  • You are indeed doing something fundamentally wrong: you are trying to treat an IntegrationFlow as a service to call from handle(). That is not what an IntegrationFlow is designed for. See the docs for more info: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl

    The DSL provides an IntegrationFlow component to define a composition of channels and endpoints between them, but now IntegrationFlow plays only the configuration role to populate real beans in the application context and is not used at runtime.

    If you declare the logic as a separate IntegrationFlow, you don't need handle() at all: just use channel("deleteDocumentInMongo.input") to send the message from that point in the main flow to the first channel of the MongoDB sub-flow.
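
    A minimal sketch of that channel-based hand-off, assuming Spring Integration 5.x (where IntegrationFlows is the DSL entry point) and assuming the lambda flow bean is registered under the name deleteDocumentInMongo, so that its input channel is deleteDocumentInMongo.input. The string literal "METADATA_DELETE_STATUS" stands in for Channels.METADATA_DELETE_STATUS.name() from the question so the fragment needs no extra types:

    ```java
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;

    @Configuration
    public class DeleteDocumentFlow {

        @Bean
        public IntegrationFlow deleteDocument() {
            // Same value as Channels.METADATA_DELETE_STATUS.name() in the question.
            return IntegrationFlows.from("METADATA_DELETE_STATUS")
                    // Forward the message to the input channel of the lambda flow
                    // bean named deleteDocumentInMongo instead of calling it via handle().
                    .channel("deleteDocumentInMongo.input")
                    .get();
        }
    }
    ```

    This only covers the Mongo leg. It is a one-way hand-off, so nothing comes back to the main flow for a following Elastic step.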

    If you want to do the same operation with Elastic, you should think about having a PublishSubscribeChannel to send a message and two flows starting from this channel.

    Since you end the deleteDocumentInMongo flow with log(), you can't get any reply back, so your .channel("deleteDocumentInES.input") is never going to be reached.

    Please read more of the docs to understand pub-sub, request-reply, service activators, and flows in general.

    UPDATE

    The code might look like this:

    @Bean
    public IntegrationFlow deleteDocument() {
        return IntegrationFlows.from(Channels.METADATA_DELETE_STATUS.name())
                // Each subscriber receives its own copy of every message sent here.
                .publishSubscribeChannel(c -> c
                        .subscribe(storeInMongoFlow.deleteDocumentInMongo())
                        .subscribe(subFlow -> subFlow.channel("deleteDocumentInES.input")))
                .get();
    }

    If you are not interested in replies from the deleteDocumentInMongo flow, it must end with nullChannel() after the log(). With publishSubscribeChannel, the subscribers are not expected to reply at all, so nullChannel() has to be at the end of that flow.
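
    A sketch of what the Mongo sub-flow could look like with that terminator, assuming Spring Integration 5.1 or later (where nullChannel() is available on the flow definition). The Metadata interface and the deleteFromMongo helper are hypothetical stand-ins for the question's own type and its MongoTemplate call:

    ```java
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.handler.LoggingHandler;

    @Configuration
    public class StoreInMongoFlow {

        /** Hypothetical stand-in for the question's Metadata payload type. */
        public interface Metadata {
            String getId();
        }

        @Bean
        public IntegrationFlow deleteDocumentInMongo() {
            return flow -> flow
                    // The returned count becomes the payload of the next message.
                    .<Metadata>handle((p, h) -> deleteFromMongo(p.getId()))
                    .log(LoggingHandler.Level.INFO,
                            m -> "Number of documents deleted: " + m.getPayload())
                    // Discard the message explicitly: this flow produces no reply.
                    .nullChannel();
        }

        // Hypothetical placeholder; the real code would call MongoTemplate here
        // and return the number of deleted documents.
        private long deleteFromMongo(String id) {
            return 0L;
        }
    }
    ```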