I am trying to do change data capture from an Oracle DB using Spring Cloud Data Flow with Kafka as the broker. I use a polling mechanism: I poll the database with a basic select query at regular intervals to capture any updated data. To make the system more fail-safe, I persist my last poll time in the Oracle DB and use it to fetch only the data updated after the last poll.
public MessageSource<Object> jdbcMessageSource() {
    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery());
    jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate());
    return jdbcPollingChannelAdapter;
}
@Bean
public IntegrationFlow pollingFlow() {
    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),
            spec -> spec.poller(Pollers.fixedDelay(3000)));
    flowBuilder.channel(this.source.output());
    flowBuilder.transform(trans, "transform");
    return flowBuilder.get();
}
My queries in the application properties are as follows:
query: select * from kafka_test where LAST_UPDATE_TIME >(select LAST_POLL_TIME from poll_time)
update : UPDATE poll_time SET LAST_POLL_TIME = CURRENT_TIMESTAMP
This works perfectly for me; I am able to capture the changes from the DB with this approach.
The problem I am now looking at is this:
Creating a table just to maintain the poll time is unnecessary overhead. I would like to maintain the last poll time in a Kafka topic and retrieve it from that topic when I make the next poll.
I have modified the jdbcMessageSource method as below to try that:
public MessageSource<Object> jdbcMessageSource() {
    String query = "select * from kafka_test where LAST_UPDATE_TIME > '"+<Last poll time value read from kafka comes here>+"'";
    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(this.dataSource, query);
    return jdbcPollingChannelAdapter;
}
But Spring Cloud Data Flow instantiates the pollingFlow() bean (see the code above) only once, so whatever query is built first stays the same. I want the query to use the new poll time on each poll.
Is there a way to write a custom IntegrationFlow so that this query is updated every time I poll?
I have tried IntegrationFlowContext for that but wasn't successful.
Thanks in advance !!!
See Artem's answer for the mechanism for a dynamic query in the standard adapter; an alternative, however, would be to simply wrap a JdbcTemplate in a bean and invoke it with
IntegrationFlows.from(myPojo(), "runQuery", e -> ...)
...
or even a simple lambda
.from(() -> jdbcTemplate...)
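To make the POJO variant a little more concrete, here is a minimal sketch of what such a bean might look like. The class name `LastPollQuery`, the `queryRunner` function, and the in-memory `lastPollTime` field are all hypothetical and not from the question. `queryRunner` is a plain-JDK stand-in for a call such as `jdbcTemplate.queryForList(sql, arg)`, so the snippet compiles without Spring on the classpath. Where the last poll time is actually stored (a Kafka topic, as the question wants, or somewhere else) is left open; the point is only that the bind parameter is re-read on every poll instead of being baked into the query string once.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

// Hypothetical POJO whose runQuery() method is meant to be called by the poller.
class LastPollQuery {

    // Same table and column as in the question, but with a bind parameter
    // instead of a timestamp concatenated into the SQL string.
    static final String QUERY = "select * from kafka_test where LAST_UPDATE_TIME > ?";

    // Assumption: stands in for something like
    // (sql, ts) -> jdbcTemplate.queryForList(sql, ts)
    private final BiFunction<String, Timestamp, List<Map<String, Object>>> queryRunner;

    // Assumption: kept in memory here; it could instead be read from and
    // written to an external store.
    private final AtomicReference<Instant> lastPollTime;

    LastPollQuery(BiFunction<String, Timestamp, List<Map<String, Object>>> queryRunner,
                  Instant initialPollTime) {
        this.queryRunner = queryRunner;
        this.lastPollTime = new AtomicReference<>(initialPollTime);
    }

    // Called once per poll: the current last-poll value is read at call time,
    // so every poll runs with a fresh parameter.
    public List<Map<String, Object>> runQuery() {
        Instant pollStart = Instant.now();
        List<Map<String, Object>> rows =
                queryRunner.apply(QUERY, Timestamp.from(lastPollTime.get()));
        lastPollTime.set(pollStart);
        // Returning null tells a method-invoking message source that there is
        // nothing to emit for this poll.
        return rows.isEmpty() ? null : rows;
    }
}
```

An instance of this class could then be passed to the flow in the same way as `myPojo()` above, with `"runQuery"` as the method name and the `Pollers.fixedDelay(3000)` poller from the question. One caveat of this sketch: it takes the poll time from the application clock rather than from the database's `CURRENT_TIMESTAMP`, so clock skew between the two could cause rows to be missed or re-read.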