
Spring Integration Flow with Jdbc Message source which has dynamic query


I am trying to implement change data capture (CDC) from an Oracle DB using Spring Cloud Data Flow with Kafka as the broker. I use a polling mechanism: I poll the database with a basic select query at regular intervals to capture any updated data. To make the system more fail-safe, I persist my last poll time in the Oracle DB and use it to fetch only the data updated after the last poll.

public MessageSource<Object> jdbcMessageSource() {
    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery());
    jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate());
    return jdbcPollingChannelAdapter;
}

@Bean
public IntegrationFlow pollingFlow() {
    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),
            spec -> spec.poller(Pollers.fixedDelay(3000)));
    flowBuilder.channel(this.source.output());
    flowBuilder.transform(trans, "transform");
    return flowBuilder.get();
}

My queries in application properties are as below:

query: select * from kafka_test where LAST_UPDATE_TIME >(select LAST_POLL_TIME from poll_time)

update: UPDATE poll_time SET LAST_POLL_TIME = CURRENT_TIMESTAMP

This is working perfectly for me. I am able to get the CDC from the DB with this approach.

The problem I am now trying to solve is this:

Creating a table just to maintain the poll time is unnecessary overhead. I would like to keep the last poll time in a Kafka topic instead and read it back from that topic when I make the next poll.

I have modified the jdbcMessageSource method as below to try that:

public MessageSource<Object> jdbcMessageSource() {
    String query = "select * from kafka_test where LAST_UPDATE_TIME > '"+<Last poll time value read from kafka comes here>+"'";

    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(this.dataSource, query);
    return jdbcPollingChannelAdapter;
}

But Spring Cloud Data Flow instantiates the pollingFlow() bean (please see the code above) only once. Hence whatever query is built first remains the same for every poll. I want to update the query with the new poll time on each poll.

Is there a way to write a custom IntegrationFlow so that this query is updated every time I make a poll?

I have tried IntegrationFlowContext for that but wasn't successful.

Thanks in advance!


Solution

  • See Artem's answer for the mechanism for a dynamic query in the standard adapter. An alternative, however, would be to simply wrap a JdbcTemplate in a bean and invoke it with

    IntegrationFlows.from(myPojo(), "runQuery", e -> ...)
        ...
    

    or even a simple lambda

        .from(() -> jdbcTemplate...)
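
To make the POJO alternative more concrete, here is a minimal sketch in plain Java, written without Spring so it can be run on its own. The class name PollingQuery, the injected clock, and the executor function are all hypothetical and not part of Spring Integration. The executor stands in for a JdbcTemplate call that binds the last poll time as a parameter, and the AtomicReference stands in for wherever the last poll time is actually kept (the question mentions a Kafka topic). The point it illustrates is that the SQL text stays fixed while the bound value is read fresh on every invocation.

```java
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical POJO: a flow would call runQuery() once per poll.
class PollingQuery<T> {

    // The SQL text never changes; only the bound value differs between polls.
    static final String SQL =
            "select * from kafka_test where LAST_UPDATE_TIME > ?";

    // Stand-in for the real store of the last poll time (assumption).
    private final AtomicReference<Instant> lastPollTime;

    // Injected clock so the behaviour can be tested deterministically.
    private final Supplier<Instant> clock;

    // Stand-in for something like:
    //   since -> jdbcTemplate.queryForList(SQL, Timestamp.from(since))
    private final Function<Instant, List<T>> executor;

    PollingQuery(Instant initialPollTime,
                 Supplier<Instant> clock,
                 Function<Instant, List<T>> executor) {
        this.lastPollTime = new AtomicReference<>(initialPollTime);
        this.clock = clock;
        this.executor = executor;
    }

    // Reads the current last poll time, runs the query with it,
    // then records when this poll started for use by the next poll.
    List<T> runQuery() {
        Instant pollStart = clock.get();
        List<T> rows = executor.apply(lastPollTime.get());
        lastPollTime.set(pollStart);
        // Returning null signals "nothing to emit" for this poll.
        return rows.isEmpty() ? null : rows;
    }

    Instant lastPollTime() {
        return lastPollTime.get();
    }
}
```

A bean of this shape could then be passed to IntegrationFlows.from(myPojo(), "runQuery", e -> ...) as in the snippet above. One design caveat: this sketch takes the poll time from the application clock, whereas the original update statement used the database's CURRENT_TIMESTAMP, so clock skew between the two hosts may need to be accounted for.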