spring-integration spring-integration-dsl spring-integration-ftp spring-integration-jdbc

Spring Integration Mail: Send email after all database inserts


Hello, I have an integration flow that splits a file line by line, transforms each line into a POJO, and then inserts that POJO into a database via a JDBC outbound gateway.

I want to send a single email once the whole file has been processed. Currently I send to the smtpFlow channel right after my jdbcOutboundGateway, but this sends an email after every database insert.

Here is my current flow DSL:

IntegrationFlow ftpFlow() {
    return IntegrationFlows.from(
            ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
            .split(splitFile())
            .transform(this::transformToIndividualScore)
            .handle(jdbcOutboundGateway(null))
            // Runs once per split line, so one email is sent per insert
            .channel("smtpFlow")
            .get();
}

How do I get this flow to send only one email after all lines of a file have been processed by the jdbcOutboundGateway?

Here is my splitFile() method:

@Bean
FileSplitter splitFile() {
    FileSplitter fs = new FileSplitter(true, false);
    fs.setFirstLineAsHeader("IndividualScore");
    return fs;
}

Here is my transformToIndividualScore method:

@Transformer
private IndividualScore transformToIndividualScore(String payload) {
    String[] values = payload.split(",");
    IndividualScore is = new IndividualScore();
    is.setScorecardDate(values[0]);
    is.setVnSpId(values[1]);
    is.setPrimaryCat(values[2]);
    is.setSecondaryCat(values[3]);
    is.setScore(Integer.parseInt(values[4]));
    is.setActual(values[5]);
    return is;
}
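The transformer above assumes every line has at least six comma-separated fields and a clean integer in the fifth one, so a short or malformed line would throw inside the flow. A minimal sketch of a more defensive parse, in plain Java with no Spring dependency; the `ScoreLineParser` class and its `ScoreLine` type are hypothetical stand-ins for `IndividualScore`, not part of the original flow:

```java
import java.util.Optional;

class ScoreLineParser {

    // Hypothetical stand-in for the IndividualScore POJO from the question.
    static final class ScoreLine {
        final String scorecardDate;
        final String vnSpId;
        final String primaryCat;
        final String secondaryCat;
        final int score;
        final String actual;

        ScoreLine(String scorecardDate, String vnSpId, String primaryCat,
                  String secondaryCat, int score, String actual) {
            this.scorecardDate = scorecardDate;
            this.vnSpId = vnSpId;
            this.primaryCat = primaryCat;
            this.secondaryCat = secondaryCat;
            this.score = score;
            this.actual = actual;
        }
    }

    // Returns an empty Optional instead of throwing on a malformed line.
    static Optional<ScoreLine> parse(String line) {
        // A limit of -1 keeps trailing empty fields, so "a,b,c,d,5," still has six columns.
        String[] v = line.split(",", -1);
        if (v.length < 6) {
            return Optional.empty();
        }
        try {
            int score = Integer.parseInt(v[4].trim());
            return Optional.of(new ScoreLine(v[0], v[1], v[2], v[3], score, v[5]));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```

Whether a bad line should be skipped, logged, or routed to an error channel depends on the application; the sketch only shows where the checks would go.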

Solution

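  • With the help of @ArtemBilan, I was able to use publishSubscribeChannel() and chain two subscribe() calls in sequence. With no executor configured on the channel, the subscribers run one after another on the same thread, so the email subscriber only runs after the first subscriber has split the file and finished every insert. Here is the new IntegrationFlow: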

    @Bean
    IntegrationFlow ftpFlow() {
        return IntegrationFlows.from(
                ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
                .publishSubscribeChannel(channel -> channel
                        // First subscriber: split the file and insert every line
                        .subscribe(
                            a -> a
                                    .split(splitFile())
                                    .transform(this::transformToIndividualScore)
                                    .handle(jdbcMessageHandler(null)))
                        // Second subscriber: receives the original file message, sends one email
                        .subscribe(
                            b -> b
                                    .transform(this::transformToSuccessEmail)
                                    .handle(emailHandler()))
                )
                .get();
    }
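
To illustrate why this produces exactly one email per file, here is a plain-Java model of the ordering, with no Spring classes involved. `PubSubOrderDemo`, `publish`, and `run` are hypothetical names for this sketch; it only mimics a publish-subscribe channel that has no executor, where each subscriber is called in turn on the caller's thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PubSubOrderDemo {

    // Simplified model of a publish-subscribe channel with no executor:
    // every subscriber receives the same message, in order, on the caller's thread.
    static <T> void publish(T message, List<Consumer<T>> subscribers) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    // Runs the two-subscriber flow over an in-memory "file" and returns an event log.
    static List<String> run(List<String> fileLines) {
        List<String> events = new ArrayList<>();

        // Subscriber 1: split the file and "insert" each line (stands in for the JDBC handler).
        Consumer<List<String>> insertLines = lines -> {
            for (String line : lines) {
                events.add("insert:" + line);
            }
        };

        // Subscriber 2: one "email" per file (stands in for the mail handler).
        Consumer<List<String>> sendEmail =
                lines -> events.add("email:" + lines.size() + " lines");

        publish(fileLines, List.of(insertLines, sendEmail));
        return events;
    }

    public static void main(String[] args) {
        // prints [insert:a, insert:b, insert:c, email:3 lines]
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

The same reasoning suggests two things to watch in the real flow. If an executor is set on the publish-subscribe channel, the subscribers may run concurrently and the email could go out before the inserts finish. And with the default settings, an exception in the first subscriber should prevent the second from being called, so no success email is sent for a file that failed partway.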