parametersapache-camelcamel-sql

How to update only consumed sql rows with Apache Camel SQL Component?


I'm new in Camel and i try to process SQL data. If the SQL consume (select) is done, i try to update the consumed rows, but i just get a "bad SQL grammar" exception.

I use the Apache Camel SQL-Component, where the starting endpoint is a sql select statement. To mark them as consumed, i use the onConsume parameter of the SQL-Component. In the select, v_table is the view of the original table t_table, which is used afterwards in the update. So the id of a row in v_table and t_table is the same. To update not all rows in t_table, i use the where condition with where id = :#id.

String sqlSelect = "select * from v_table where camel_is_read = 0";
String sqlUpdate = "update t_table set camel_is_read = 1, date_checked = sysdate where id = :#id";
from("sql:"+sqlSelect+"?dataSource=myDataSource&onConsume="+sqlUpdate)
.process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        System.out.println(exchange.getIn().getBody().toString());
    }
})                    
.errorHandler(deadLetterChannel("direct:moveFailedOut").useOriginalMessage())
.bean("orderToJms")
.to(jmsURI)
.bean("validate")
.to(ftpOut);

If i execute this, i get the follwoing exception:

WARN  Error executing onConsume/onConsumeFailed query update t_table set camel_is_read = 1, date_checked = sysdate where id = :?id. Caused by: [org.springframework.jdbc.BadSqlGrammarException - PreparedStatementCallback; bad SQL grammar [update t_table set camel_is_read = 1, date_checked = sysdate where id = ?]; nested exception is java.sql.SQLSyntaxErrorException: ORA-00904: "YSDATEHERE": ungültiger Bezeichner
]
org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar [update t_table set camel_is_read = 1, date_checked = sysdate where id = ? exception is java.sql.SQLSyntaxErrorException: ORA-00904: "YSDATEHERE": ungültiger Bezeichner

    at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:237)
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:72)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:605)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:617)
    at org.apache.camel.component.sql.DefaultSqlProcessingStrategy.commit(DefaultSqlProcessingStrategy.java:46)
    at org.apache.camel.component.sql.SqlConsumer.processBatch(SqlConsumer.java:195)
    at org.apache.camel.component.sql.SqlConsumer$1.doInPreparedStatement(SqlConsumer.java:118)
    at org.apache.camel.component.sql.SqlConsumer$1.doInPreparedStatement(SqlConsumer.java:91)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:589)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:617)
    at org.apache.camel.component.sql.SqlConsumer.poll(SqlConsumer.java:91)
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:174)
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.sql.SQLSyntaxErrorException: ORA-00904: "YSDATEHERE": ungültiger Bezeichner

I tried to execute the update manually in the database with a test-id (e. g. "3"), which works, so the generall SQL grammar should be fine. So it seems for me, that Camel can't replace the :#id parameter.

I added a processor, to check the outcome of the select:

{ID=3, [...] CAMEL_IS_READ=0}

Here i can see, that the select successfully catched the necessary id. I don't understand why Camel is not able to replace the :#id parameter with the id value of 3. Does anyone know how to fix this problem? I used this and this as a rough example/template. Or is this approach wrong in general?


Solution

  • Finally it works for me by surrounding the sysdate statement with the to_date() function:

    String sqlSelect = "select * from v_table where camel_is_read = 0";
    String sqlUpdate = "update t_table set camel_is_read = 1, date_checked = to_date(sysdate) where id = :#id";
    from("sql:"+sqlSelect+"?dataSource=myDataSource&onConsume="+sqlUpdate)
    .process(new Processor() {
        public void process(Exchange exchange) throws Exception {
            System.out.println(exchange.getIn().getBody().toString());
        }
    })                    
    .errorHandler(deadLetterChannel("direct:moveFailedOut").useOriginalMessage())
    .bean("orderToJms")
    .to(jmsURI)
    .bean("validate")
    .to(ftpOut);
    

    Some notes, what doesn't work / is not a good idea (in my opinion) and saves you maybe time:
    In some situations onConsumeBatchComplete could be a solution. If you know the condition, you can maybe implicate, that if all rows with this condition are passed through camel, this rows can be updated. This works together with sysdate and doesn't need the id placeholder. Disadvantage of this solution is, that it is implicate and if in the same time new rows are added, which camel didnt't pass, they will be also updated after the batch is completed. So i can't recommend this.

    It also doesn't work to set the time in Java, like this:

    String sqlUpdate = "update t_table date_checked = " + today + " where id = :#id";
    

    If you don't restart your route every day, the camel route is active over some days, weeks or years. In this case, the camel route will be generated on startup and after this, there is the "todays" date always fix with this days date. So it will always use the same date for the database update.