sqlspring-integrationspring-jdbcjdbctemplate

Update query with time of select query through a int-jdbc:inbound-channel-adapter


What I am trying to perform is to consume messages with a int-jdbc:inbound-channel-adapter linked to some datasource, and then update a consumer_table (or offsets table) that would retain, for a specific key, the exact date of query

<int-jdbc:inbound-channel-adapter
...
    query="someSelect" 
    update="someUpdate" 
    update-sql-parameter-source-factory="someCustomSqlParameterSourceFactory">
</int-jdbc:inbound-channel-adapter>
group_id latest_consultation
someGroupId 2023-11-01 19h30
someOtherGroupId 2023-12-04 19h30
id priority payload creation_date
1 null [BLOB] 2023-12-01 18:20:00.000
2 1 [BLOB] 2023-12-05 18:30:00.000
3 1 [BLOB] 2023-12-10 18:27:00.000

Basically, I will have 2 queries:

SELECT NOW() as now_,id,payload,priority 
  FROM myDb.messages m
  WHERE m.creation_date > 
     (select o.latest_consultation from myDb.offsets o where o.group_id = 'someGroupId')

AND

REPLACE INTO myDb.offsets (group_id,latest_consultation) values ('someGroupId', :now_)

The problem here is that the :now_ expression will be evaluated as collection
It is well explained here , with a start of a solution.
But with the trick of the above link, it will point to a static method, that will not have access to the input data , and thus the now_ value (or not in a way I understand). Admittedly, this way I can provide a timestamp computed on the run, but it may be slightly different from the now_ value. It may be very short, but with billions of messages per hour, this may occasionnaly result in some message being missed in the meantime. Thus, I need it to be now_

Thus, I am trying to find a way to pass the exact time of the select query to the update query as a single element (and not a collection)

Any (new) idea?

Edit 1 I tried Artem Bilan's solution : using update-per-row="true" I had to replace replace into with update where... because now_ was not recognized as a Timestamp, but as an object. Basically, I had :

PreparedStatementCallback; uncategorized SQLException for SQL [replace into moba.offsets (group_id,latest_consultation) values ('valdimir', ?) ]; SQL state [null]; error code [0]; Type java.lang.Object not supported type

With an update clause, combined with update-per-row="true", it does the trick. However, I compared performances with and without update-per-row, and for 3000 records, I went from 5 mins to 1,5 mins.... This is a hudge downgrade in message rate. I'd rather find a less invasive method if possible.

(though, my use case was actually not realistic : I consumed them as once, while the polling pattern will act closer to the update-per-row="true" use case)


Solution

  • When your SELECT returns several rows and you intend to use a single UPDATE for all of them, then you need to consider to use in(:now_) clause in the UPDATE: https://docs.spring.io/spring-integration/reference/jdbc/inbound-channel-adapter.html

    The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration.

    The logic there is like this:

    "#root.![" + paramName + "]"
    

    which is a collection projection: build a new collection selecting a particular property of the original collection elements: https://docs.spring.io/spring-framework/reference/core/expressions/language-ref/collection-projection.html

    If you still cannot use that, there is an option like update-per-row="true". See the same docs about JDBC Inbound Channel Adapter.

    You also may consider to use max-rows="1" if the logic falls into a conclusion that only one item must be returned from SELECT.

    Another solution is to use stored procedure to perform everything on the DB side: https://docs.spring.io/spring-integration/reference/jdbc/stored-procedures.html