I want to synchronize data between postgres and elasticsearch for this I am using logstash. This is how looks configure file for logstash
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://postgres:5432/onlinestore"
jdbc_user => "postgres"
jdbc_password => "1234"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_driver_library => "/usr/share/logstash/libs/postgresql-42.7.3.jar"
statement => "
SELECT *
FROM products
WHERE updated_date > :sql_last_value
ORDER BY updated_date ASC
LIMIT 1000
"
use_column_value => true
tracking_column => "updated_date"
last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
schedule => "* * * * *"
# clean_run => true
}
}
# Add extra field timestamp to indicate when we had performed an operation on the row
filter {
ruby {
code => "
event.set('sql_last_value_log', event.get('sql_last_value'))
event.set('tracking_column_log', event.get('updated_date'))
"
}
mutate {
add_field => { "timestamp" => "%{@timestamp}" }
}
}
output {
elasticsearch{
hosts => ["http://elasticsearch:9200"]
index => "products"
document_id => "%{id}"
}
}
When the query tries to execute I get this error log:
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/cronline.rb:77: warning: constant ::Fixnum is deprecated
[2024-07-07T10:06:04,030][ERROR][logstash.inputs.jdbc ][main][c96da82d0fe625ff4baf1477649ffdc670902119c45fc017f20bc336f857bcfe] Java::OrgPostgresqlUtil::PSQLException: ERROR: operator does not exist: timestamp without time zone > integer
Hint: No operator matches the given name and argument types. You might need to add explicit type casts.
Position: 82:
SELECT *
FROM products
WHERE updated_date > 0
ORDER BY updated_date ASC
LIMIT 1000
[2024-07-07T10:06:04,117][WARN ][logstash.inputs.jdbc ][main][c96da82d0fe625ff4baf1477649ffdc670902119c45fc017f20bc336f857bcfe] Exception when executing JDBC query {:exception=>Sequel::DatabaseError, :message=>"Java::OrgPostgresqlUtil::PSQLException: ERROR: operator does not exist: timestamp without time zone > integer\n Hint: No operator matches the given name and argument types. You might need to add explicit type casts.\n Position: 82", :cause=>"org.postgresql.util.PSQLException: ERROR: operator does not exist: timestamp without time zone > integer\n Hint: No operator matches the given name and argument types. You might need to add explicit type casts.\n Position: 82"}
Help me solve this problem
I have tried using conditionals and nullif statements in postgres and the query even executes but the value is always zero and not updated
statement => "
SELECT *
FROM products
WHERE updated_date > COALESCE(NULLIF(:sql_last_value::text, '0')::timestamp, '1970-01-01 00:00:00'::timestamp)
ORDER BY updated_date ASC
LIMIT 1000
"
You need to set value as timestamp
for tracking_column_type
as its default value is numeric
and due to that it is considering 0
always.
You can check this documentation.
jdbc {
jdbc_connection_string => "jdbc:postgresql://postgres:5432/onlinestore"
jdbc_user => "postgres"
jdbc_password => "1234"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_driver_library => "/usr/share/logstash/libs/postgresql-42.7.3.jar"
statement => "
SELECT *
FROM products
WHERE updated_date > :sql_last_value
ORDER BY updated_date ASC
LIMIT 1000
"
use_column_value => true
tracking_column => "updated_date"
last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
schedule => "* * * * *"
tracking_column_type => "timestamp"
# clean_run => true
}