elasticsearch, logstash, elastic-stack, elk, logstash-jdbc

Logstash :sql_last_value is showing a wrong junk date (a 6-month-old date as the last run time)


I am observing a very strange issue. I am using Logstash + JDBC to load data from an Oracle DB into Elasticsearch. Below is what my config file looks like:

input {
  jdbc {
    clean_run => "false"
    jdbc_driver_library => "<path_to_ojdbc8-12.1.0.jar>"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "<connection_string>"
    jdbc_user => "<username>"
    jdbc_password_filepath => ".\pwd.txt"
    statement => "SELECT * FROM customers WHERE CUSTOMER_NAME LIKE 'PE%' AND UPD_DATE > :sql_last_value"
    schedule => "*/1 * * * *"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "upd_date"
    last_run_metadata_path => "<path to logstash_metadata>"
    record_last_run => true
  }
}

filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["@version","@timestamp"]
  }
}
output {
  elasticsearch {
    hosts => ["<host>"]
    index => "<index_name>"
    document_id => "%{[@metadata][_id]}"
    user => "<user>"
    password => "<pwd>"
  }

  stdout {
    codec => dots
  }
}

Now, I am triggering this file every minute, today being March 8th 2021. When I load for the first time, all is good: :sql_last_value is '1970-01-01 00:00:00.000000 +00:00'.

But after this first load, logstash_metadata should ideally show '2021-03-08 <HH:MM:ss>'. Strangely, though, it is getting updated as --- 2020-09-11 01:05:09.000000000 Z in logstash_metadata (:sql_last_value).
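
For reference, the last-run metadata file holds a single one-line YAML timestamp, so after a run today I would expect contents like the following (the exact time is illustrative):

--- 2021-03-08 10:15:00.000000000 Z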

As you can see, the difference is roughly 180 days.

I have tried multiple times, but it keeps updating the same way, and this is breaking my incremental load.

My Logstash version is 7.10.2.

Help is much appreciated!

NOTE: I am not using pagination because the number of results in the result set is always very low for my query.


Solution

  • The recorded date is the date of the last processed row.

    Looking at your query, you don't impose a specific order on the records read from the DB. The Logstash jdbc input plugin wraps your query in one that orders rows by [1], 1 being the ordinal of the column it orders by; a sketch of that shape is shown below.
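
    The following is only an illustration of that shape, under the assumption that the wrapping looks roughly like this (the exact SQL varies by plugin version and dialect; check your logs for the real query):

    -- illustrative wrapper only, not the plugin's literal SQL:
    -- ORDER BY 1 sorts by the first column of your SELECT
    SELECT * FROM (
      SELECT * FROM customers WHERE CUSTOMER_NAME LIKE 'PE%' AND UPD_DATE > :sql_last_value
    ) "T1" ORDER BY 1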

    So, to process records in the correct order and track the latest upd_date value, you need upd_date to be the first column in the SELECT statement:

    input {
      jdbc {
        clean_run => "false"
        jdbc_driver_library => "<path_to_ojdbc8-12.1.0.jar>"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "<connection_string>"
        jdbc_user => "<username>"
        jdbc_password_filepath => ".\pwd.txt"
        statement => "SELECT c.UPD_DATE, c.CUSTOMER_NAME, c.<Other field>
                      FROM customers c
                      WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value
                      ORDER BY c.UPD_DATE ASC"
        schedule => "*/1 * * * *"
        use_column_value => true
        tracking_column_type => "timestamp"
        tracking_column => "upd_date"
        last_run_metadata_path => "<path to logstash_metadata>"
        record_last_run => true
      }
    }

    Also note that this approach will exhaust the table the first time Logstash runs, even if you set jdbc_page_size. If that is what you want, fine.

    But if you want Logstash to run one batch of X rows every minute and then stop until the next execution, you must combine jdbc_page_size with a limit in the query, so that Logstash retrieves exactly the number of records you want, in the correct order. In SQL Server it works like this:

    input {
      jdbc {
        jdbc_driver_library => ...
        jdbc_driver_class => ...
        jdbc_connection_string => ...
        jdbc_user => ...
        jdbc_password_filepath => ...
        statement => "SELECT TOP 10000 c.UPD_DATE, c.CUSTOMER_NAME
                      FROM customers c
                      WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value
                      ORDER BY c.UPD_DATE ASC"
        schedule => "*/1 * * * *"
        use_column_value => true
        tracking_column_type => "timestamp"
        tracking_column => "upd_date"
        jdbc_page_size => 10000
        last_run_metadata_path => "<path to logstash_metadata>"
        record_last_run => true
      }
    }

    For Oracle DB you'll have to change the query depending on the version: FETCH FIRST x ROWS ONLY with Oracle 12c and later, or ROWNUM for older versions.
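
    A hedged sketch of both variants, reusing the column list and the 10000-row limit from the SQL Server example above:

    -- Oracle 12c and later: FETCH FIRST applies after the ORDER BY
    SELECT c.UPD_DATE, c.CUSTOMER_NAME
    FROM customers c
    WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value
    ORDER BY c.UPD_DATE ASC
    FETCH FIRST 10000 ROWS ONLY

    -- Older versions: ROWNUM is assigned before ORDER BY is applied,
    -- so order in a subquery first and limit in the outer query
    SELECT * FROM (
      SELECT c.UPD_DATE, c.CUSTOMER_NAME
      FROM customers c
      WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value
      ORDER BY c.UPD_DATE ASC
    ) WHERE ROWNUM <= 10000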

    In any case, I suggest you take a look at the logs to check the queries Logstash actually runs.
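
    For example, one minimal way to surface them, assuming a standard install (--log.level is Logstash's stock CLI flag; <your_pipeline>.conf is a placeholder):

    # run the pipeline with debug logging and watch for the executed SQL
    bin/logstash -f <your_pipeline>.conf --log.level=debug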