I am trying to consume messages with an int-jdbc:inbound-channel-adapter linked to a datasource, and then update a consumer_table (or offsets table) that retains, for a specific key, the exact date of the last query:
<int-jdbc:inbound-channel-adapter
...
query="someSelect"
update="someUpdate"
update-sql-parameter-source-factory="someCustomSqlParameterSourceFactory">
</int-jdbc:inbound-channel-adapter>
The offsets table:

group_id | latest_consultation |
---|---|
someGroupId | 2023-11-01 19h30 |
someOtherGroupId | 2023-12-04 19h30 |

The messages table:

id | priority | payload | creation_date |
---|---|---|---|
1 | null | [BLOB] | 2023-12-01 18:20:00.000 |
2 | 1 | [BLOB] | 2023-12-05 18:30:00.000 |
3 | 1 | [BLOB] | 2023-12-10 18:27:00.000 |
Basically, I will have 2 queries:
SELECT NOW() as now_,id,payload,priority
FROM myDb.messages m
WHERE m.creation_date >
(select o.latest_consultation from myDb.offsets o where o.group_id = 'someGroupId')
and
REPLACE INTO myDb.offsets (group_id,latest_consultation) values ('someGroupId', :now_)
The problem is that the :now_ expression is evaluated as a collection.
It is well explained here, along with the start of a solution.
But the trick from that link points to a static method, which has no access to the input data and therefore not to the now_ value (or at least not in a way I understand).
Admittedly, this way I can provide a timestamp computed on the fly, but it may differ slightly from the now_ value. The difference may be very small, but with billions of messages per hour it may occasionally result in some messages being missed in between. So I need it to be exactly now_.
In short, I am trying to find a way to pass the exact time of the select query to the update query as a single element (and not as a collection).
Any (new) idea?
Edit 1: I tried Artem Bilan's solution, using update-per-row="true". I had to replace the "replace into" statement with an "update ... where ..." statement, because now_ was not recognized as a Timestamp but as an Object. Basically, I got:
PreparedStatementCallback; uncategorized SQLException for SQL [replace into moba.offsets (group_id,latest_consultation) values ('valdimir', ?) ]; SQL state [null]; error code [0]; Type java.lang.Object not supported type
With an update clause combined with update-per-row="true", it does the trick. However, I compared performance with and without update-per-row, and for 3000 records I measured 5 min versus 1.5 min. This is a huge downgrade in message rate. I would rather find a less invasive method if possible.
(That said, my test was not really realistic: I consumed all the records at once, whereas the polling pattern will behave closer to the update-per-row="true" use case.)
When your SELECT returns several rows and you intend to use a single UPDATE for all of them, you need to consider using an in (:now_) clause in the UPDATE: https://docs.spring.io/spring-integration/reference/jdbc/inbound-channel-adapter.html
The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration.
The logic there is like this:
"#root.![" + paramName + "]"
which is a collection projection: it builds a new collection by selecting a particular property of the original collection's elements: https://docs.spring.io/spring-framework/reference/core/expressions/language-ref/collection-projection.html
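To make the effect of that projection concrete, here is a minimal plain-Java analogue. It is not Spring Integration's code and does not use SpEL; the class `ProjectionSketch` and its `project` method are hypothetical names, and the rows are assumed to arrive as a list of column-name-to-value maps. It only shows why :now_ ends up as a collection with one entry per polled row.

```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java analogue of the SpEL projection "#root.![now_]".
// NOT the framework's implementation; it only mimics the shape of the result.
class ProjectionSketch {

    // Collects the value stored under paramName from every polled row,
    // so the result has as many elements as there are rows.
    static List<Object> project(List<Map<String, Object>> rows, String paramName) {
        List<Object> projected = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            projected.add(row.get(paramName));
        }
        return projected;
    }

    public static void main(String[] args) {
        // Assumed sample data: two rows sharing the same now_ value.
        Timestamp now = Timestamp.valueOf("2023-12-10 18:30:00");
        List<Map<String, Object>> rows = List.of(
                Map.<String, Object>of("id", 2, "now_", now),
                Map.<String, Object>of("id", 3, "now_", now));
        // The projection yields a list, not a single timestamp.
        System.out.println(project(rows, "now_"));
    }
}
```

Because every row of the same SELECT carries the same now_, the projected list holds the same value repeated, which is why an in (:now_) clause can still match a single timestamp.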
If you still cannot use that, there is an option like update-per-row="true". See the same docs about the JDBC Inbound Channel Adapter.
You may also consider using max-rows="1" if your logic leads to the conclusion that only one item must be returned from the SELECT.
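If none of these options fits, one further possibility is to collapse the polled rows to a single typed value yourself, for example inside the custom parameter source factory already referenced in the question. The following is only a sketch under assumptions: `SingleNowSketch` and `singleNow` are hypothetical names, the rows are assumed to be column-name-to-value maps, and the driver is assumed to return now_ either as a java.sql.Timestamp or as a java.time.LocalDateTime. The wiring into Spring Integration is deliberately left out.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: reduces the now_ column of all polled rows to one Timestamp.
class SingleNowSketch {

    static Timestamp singleNow(List<Map<String, Object>> rows) {
        // Gather the distinct now_ values; one SELECT should produce exactly one.
        Set<Object> distinct = new LinkedHashSet<>();
        for (Map<String, Object> row : rows) {
            distinct.add(row.get("now_"));
        }
        if (distinct.size() != 1) {
            throw new IllegalStateException(
                    "expected exactly one distinct now_ value, got " + distinct.size());
        }
        Object value = distinct.iterator().next();
        // Return an explicitly typed value so it is not bound as a plain Object.
        if (value instanceof Timestamp) {
            return (Timestamp) value;
        }
        if (value instanceof LocalDateTime) {
            return Timestamp.valueOf((LocalDateTime) value);
        }
        throw new IllegalStateException("unsupported now_ value: " + String.valueOf(value));
    }
}
```

Failing loudly when the rows disagree is a deliberate choice here: silently picking one of several timestamps could move the offset past messages that were never read.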
Another solution is to use a stored procedure to perform everything on the DB side: https://docs.spring.io/spring-integration/reference/jdbc/stored-procedures.html