Hello I have an integration flow that splits a file line by line, transforms each line into a POJO and then inserts that POJO into a db via JDBC outbound gateway.
I want to be able to send a single email once the process of the file has completed. I currently am sending to smtpFlow channel after my jdbcOutboundGateway, however this is sending an email after every db insert.
Here is my current flow DSL
IntegrationFlow ftpFlow() {
return IntegrationFlows.from(
ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
.split(splitFile())
.transform(this::transformToIndividualScore)
.handle(jdbcOutboundGateway(null))
.channel("smtpFlow")
.get();
How do I get this flow to only send one email after all files have been processed in the jdbcOutboundGateway
?
Here is my splitFile()
method
@Bean
FileSplitter splitFile() {
FileSplitter fs = new FileSplitter(true, false);
fs.setFirstLineAsHeader("IndividualScore");
return fs;
Here is my transformToIndividualScore
method
@Transformer
private IndividualScore transformToIndividualScore(String payload) {
String[] values = payload.split(",");
IndividualScore is = new IndividualScore();
is.setScorecardDate(values[0]);
is.setVnSpId(values[1]);
is.setPrimaryCat(values[2]);
is.setSecondaryCat(values[3]);
is.setScore(Integer.parseInt(values[4]));
is.setActual(values[5]);
return is;
}
with the help of @ArtemBilan
I was able to use the publishSubscribeChannel()
and chain 2 subscribe()
methods in sequence below is the new IntegrationFlow
@Bean
IntegrationFlow ftpFlow() {
return IntegrationFlows.from(
ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
.publishSubscribeChannel(channel -> channel
.subscribe(
a -> a
.split(splitFile())
.transform(this::transformToIndividualScore)
.handle(jdbcMessageHandler(null)))
.subscribe(
b -> b
.transform(this::transformToSuccessEmail)
.handle(emailHandler()))
)
.get();