I am using Logstash to index data into Elasticsearch by pulling it from my Oracle DB. My Logstash config is as follows:
input {
  jdbc {
    jdbc_driver_library => "<path>/ojdbc10.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "connection string"
    jdbc_user => <username>
    jdbc_password => <pwd>
    statement => "SELECT * FROM my table name"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "test1"
  }
}
And I see the below result:
curl http://<ip>:9200/_cat/indices?v
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open test1 596HT2HTRyyNl79GdiXvvw 1 1 5186122 0 799.4mb 799.4mb
This works fine and the data loads. But now I am trying to add an ID to my documents so that I can prevent duplicate entries, with the following config:
input {
  jdbc {
    jdbc_driver_library => "<path>/ojdbc10.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "connection string"
    jdbc_user => <username>
    jdbc_password => <pwd>
    statement => "SELECT * FROM my table name"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "test4"
    action => "update"
    doc_as_upsert => true
    document_id => "%{col1}%-%{col2}%"
  }
}
And now when I query the indices:
curl http://<ip>:9200/_cat/indices?v
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open test4 PhkmuzRPTPCplaxQGNaelw 1 1 1 149711 31.2mb 31.2mb
I am not sure what I am missing here. I want to avoid duplicates and also be able to capture the delta on that table. My table has a composite key, which I am trying to combine into the document ID for the Elasticsearch index.
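A docs.count of 1 next to a large docs.deleted suggests that every row is being written to the same document ID. Logstash leaves a %{field} reference as literal text when the event has no field with that name, so one way to narrow this down is to print the events together with the ID they resolve to. The following is only a debugging sketch: col1 and col2 stand for the key columns as in the config above, and [@metadata][doc_id] is a field name made up for illustration.

```
filter {
  mutate {
    # Build the candidate ID in @metadata so it is never indexed.
    # If col1 or col2 is missing from the event, the %{...} text stays literal.
    add_field => { "[@metadata][doc_id]" => "%{col1}-%{col2}" }
  }
}
output {
  # Print every event, including @metadata, to the console.
  # This shows the real field names (and their case) plus the resolved ID.
  stdout { codec => rubydebug { metadata => true } }
}
```

If the printed ID still contains the literal %{col1} text, the field names in the pattern do not match the event. Note that the jdbc input lowercases column names by default (lowercase_column_names), and that the trailing % characters in the pattern above are copied into the ID as-is.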
I followed the link shared by @Val (https://stackoverflow.com/a/33591537/4604579), updated my config, and I am now able to index the documents as required.
My updated config is:
input {
  jdbc {
    jdbc_driver_library => "<path>/ojdbc10.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "connection string"
    jdbc_user => <username>
    jdbc_password => <pwd>
    statement => "SELECT * FROM my table name WHERE last_modified_date >= :sql_last_value"
    schedule => "* * * * *"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "test2"
    action => "update"
    doc_as_upsert => true
    document_id => "%{col1}-%{col2}-%{col3}"
  }
}
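With the statement above, :sql_last_value defaults to the time of the previous run. If the delta should instead follow the largest last_modified_date actually read so far, the jdbc input can track that column. This is a sketch rather than what I am running: it assumes last_modified_date is a timestamp column, and the ORDER BY clause is my addition.

```
input {
  jdbc {
    jdbc_driver_library => "<path>/ojdbc10.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "connection string"
    jdbc_user => <username>
    jdbc_password => <pwd>
    # Assumption: last_modified_date is a timestamp column on the table.
    statement => "SELECT * FROM my table name WHERE last_modified_date >= :sql_last_value ORDER BY last_modified_date"
    schedule => "* * * * *"
    # Store the column value, not the run time, in :sql_last_value.
    use_column_value => true
    tracking_column => "last_modified_date"
    tracking_column_type => "timestamp"
  }
}
```

Because the comparison is >=, rows with the last seen timestamp are read again on the next run, but with document_id set they simply overwrite the same documents instead of creating duplicates.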