Problem statement : Logstash is not loading all records from Database to elasticsearch correctly and everytime I hit same api gets different results (However sometimes correct but changes on every hit and shows only subset of records under salutations nested field). The logstash mechanism looks sporadic and loading results are not consistent especially in One to Many scenario . http://localhost:9200/staffsalutation/_search I am observing a weird behaviour of logstash logstash-7.8.0 while loading records from 2 tables with query and configuration as below
Query :
select s.update_time, s.staff_id as staff_id, birth_date, first_name, last_name, gender, hire_date,
st.title AS title_nm, st.from_date AS title_frm_dt, st.to_date AS title_to_dt
from staff s
LEFT JOIN salutation st ON s.staff_id = st.staff_id
order by s.update_time
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
jdbc_driver_library => "C:\\Users\\NS\\.m2\\repository\\org\\postgresql\\postgresql\\42.2.11\\postgresql-42.2.11.jar"
jdbc_user => "postgres"
jdbc_password => "postgres"
jdbc_driver_class => "org.postgresql.Driver"
schedule => "* * * * *"
statement => "select e.update_time, e.emp_no as staff_id, birth_date, first_name, last_name, gender, hire_date, t.title AS title_nm, t.from_date AS title_frm_dt, t.to_date AS title_to_dt
from employees e
LEFT JOIN titles t
ON e.emp_no = t.emp_no
order by e.update_time"
add_field => { "doctype" => "employee" }
tracking_column_type => "timestamp"
use_column_value =>true
tracking_column => update_time
jdbc_fetch_size => "50000"
}
}
filter {
aggregate {
task_id => "%{staff_id}"
code => "
map['staff_id'] = event.get('staff_id')
map['birth_date'] = event.get('birth_date')
map['first_name'] = event.get('first_name')
map['last_name'] = event.get('last_name')
map['gender'] = event.get('gender')
map['hire_date'] = event.get('hire_date')
map['salutations'] ||= []
map['salutations'] << {
'title_nm' => event.get('title_nm'),'title_frm_dt' => event.get('title_frm_dt'),
'title_to_dt' => event.get('title_to_dt')
}
event.cancel()
"
push_previous_map_as_event => true
timeout => 30
}
}
output {
elasticsearch {
document_id => "%{staff_id}"
index => "staffsalutation"
}
file {
path => "test.log"
codec => line
}
}
Found the solution !
select e.update_time, e.emp_no as staff_id, birth_date, first_name, last_name, gender, hire_date, t.title AS title_nm, t.from_date AS title_frm_dt, t.to_date AS title_to_dt from employees e LEFT JOIN titles t ON e.emp_no = t.emp_no order by e.emp_no
logstash -f logstash_config.conf -w 1