I am observing a very strange issue. I am using Logstash + JDBC to load data from an Oracle DB into Elasticsearch. Below is what my config file looks like:
input {
  jdbc {
    clean_run => "false"
    jdbc_driver_library => "<path_to_ojdbc8-12.1.0.jar>"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "<connection_string>"
    jdbc_user => "<usename>"
    jdbc_password_filepath => ".\pwd.txt"
    statement => "SELECT * FROM customers WHERE CUSTOMER_NAME LIKE 'PE%' AND UPD_DATE > :sql_last_value"
    schedule => "*/1 * * * *"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "upd_date"
    last_run_metadata_path => "<path to logstash_metadata>"
    record_last_run => true
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]" }
    remove_field => ["@version", "@timestamp"]
  }
}
output {
  elasticsearch {
    hosts => ["<host>"]
    index => "<index_name>"
    document_id => "%{[@metadata][_id]}"
    user => "<user>"
    password => "<pwd>"
  }
  stdout {
    codec => dots
  }
}
Now, I am triggering this file every minute, today being March 8th 2021. When I load for the first time, it's all good: :sql_last_value is '1970-01-01 00:00:00.000000 +00:00'.
After this first load, logstash_metadata should ideally show '2021-03-08 <HH:MM:ss>'. Strangely, it is instead getting updated to --- 2020-09-11 01:05:09.000000000 Z in logstash_metadata (:sql_last_value).
As you can see, the difference is roughly 180 days.
I have tried multiple times, but it keeps updating the same way, and because of this my incremental load is getting messed up.
My Logstash version is 7.10.2.
Help is much appreciated!
NOTE: I am not using pagination, as the number of results in the result set is always very low for my query.
The recorded date is the date of the last processed row.
Looking at your query, you don't specify an order for the records read from the DB. The Logstash JDBC input plugin wraps your query in one that orders rows by [1], 1 being the ordinal of the column it orders by.
So, to process records in the correct order and pick up the latest upd_date value, you need upd_date to be the first column in the SELECT statement:
input {
  jdbc {
    clean_run => "false"
    jdbc_driver_library => "<path_to_ojdbc8-12.1.0.jar>"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "<connection_string>"
    jdbc_user => "<usename>"
    jdbc_password_filepath => ".\pwd.txt"
    statement => "SELECT c.UPD_DATE, c.CUSTOMER_NAME, c.<Other field>
                  FROM customers c
                  WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value
                  ORDER BY c.UPD_DATE ASC"
    schedule => "*/1 * * * *"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "upd_date"
    last_run_metadata_path => "<path to logstash_metadata>"
    record_last_run => true
  }
}
Also note that this approach will exhaust the table the first time Logstash runs, even if you set jdbc_page_size. If that's what you want, that's fine.
But if you want Logstash to process one batch of X rows every minute and then stop until the next execution, you must combine jdbc_page_size with a query that limits the result set, so that Logstash retrieves exactly the number of records you want, in the correct order. In SQL Server it works like this:
input {
  jdbc {
    jdbc_driver_library => ...
    jdbc_driver_class => ...
    jdbc_connection_string => ...
    jdbc_user => ...
    jdbc_password_filepath => ...
    statement => "SELECT TOP 10000 c.UPD_DATE, c.CUSTOMER_NAME
                  FROM customers c
                  WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value
                  ORDER BY c.UPD_DATE ASC"
    schedule => "*/1 * * * *"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "upd_date"
    jdbc_page_size => 10000
    last_run_metadata_path => "<path to logstash_metadata>"
    record_last_run => true
  }
}
For Oracle you'll have to adapt the query depending on the version: FETCH FIRST x ROWS ONLY with Oracle 12c and later, or ROWNUM for older versions.
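Here is a rough sketch of both variants, reusing the columns from the examples above and a 10000-row limit to match jdbc_page_size (adjust names and limits to your actual schema):

-- Oracle 12c and later: limit with FETCH FIRST after the ORDER BY
SELECT c.UPD_DATE, c.CUSTOMER_NAME
FROM customers c
WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value
ORDER BY c.UPD_DATE ASC
FETCH FIRST 10000 ROWS ONLY

-- Older Oracle versions: apply ROWNUM to the already-ordered subquery
SELECT * FROM (
  SELECT c.UPD_DATE, c.CUSTOMER_NAME
  FROM customers c
  WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value
  ORDER BY c.UPD_DATE ASC
) WHERE ROWNUM <= 10000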
In any case, I suggest you take a look at the logs to check the queries logstash runs.
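If the statements don't show up at your current log level, the jdbc input also exposes a sql_log_level option that controls the level at which the executed SQL is logged. A minimal sketch, keeping the other settings from the examples above and picking "warn" only so the SQL stands out in the default Logstash log:

input {
  jdbc {
    jdbc_driver_library => ...
    jdbc_driver_class => ...
    jdbc_connection_string => ...
    jdbc_user => ...
    jdbc_password_filepath => ...
    statement => ...
    schedule => "*/1 * * * *"
    # Log the generated SQL at a more prominent level so it is easy to spot
    # in the Logstash log (accepted values are the usual log levels).
    sql_log_level => "warn"
  }
}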