I use logstash-input-jdbc to sync my database to elasticsearch.
Env: (logstash 7.5, elasticsearch 7.5,mysql-connector-java-5.1.48.jar, logstash-input-jdbc-4.3.16)
materials.conf:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/sc_education"
jdbc_driver_library => "connector/mysql-connector-java-5.1.48.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_user => "dauser"
jdbc_password => "daname"
jdbc_paging_enabled => "true"
jdbc_page_size => "50"
statement_filepath => "./materials.sql"
schedule => "* * * * *"
last_run_metadata_path => "./materials.info"
record_last_run => true
tracking_column => updated_at
codec => plain { charset => "UTF-8"}
# parameters => { "favorite_artist" => "Beethoven" }
# statement => "SELECT * from songs where artist = :favorite_artist"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "materials"
document_id => "%{material_id}"
}
stdout {
codec => json_lines
}
}
materials.sql:
SELECT material_name,material_id,
CASE grade_id
WHEN grade_id = 1 THEN "一年级"
WHEN grade_id = 2 THEN "二年级"
WHEN grade_id = 3 THEN "三年级"
WHEN grade_id = 4 THEN "四年级"
WHEN grade_id = 5 THEN "五年级"
WHEN grade_id = 6 THEN "六年级"
WHEN grade_id = 7 THEN "初一"
WHEN grade_id = 8 THEN "初二"
WHEN grade_id = 9 THEN "初三"
WHEN grade_id = 10 THEN "高一"
WHEN grade_id = 11 THEN "高二"
WHEN grade_id = 12 THEN "高三"
ELSE "" END as grade,
CASE subject_id
WHEN subject_id = 1 THEN "数学"
WHEN subject_id = 2 THEN "物理"
WHEN subject_id = 3 THEN "化学"
WHEN subject_id = 4 THEN "语文"
WHEN subject_id = 5 THEN "英语"
WHEN subject_id = 6 THEN "科学"
WHEN subject_id = 7 THEN "音乐"
WHEN subject_id = 8 THEN "绘画"
WHEN subject_id = 9 THEN "政治"
WHEN subject_id = 10 THEN "历史"
WHEN subject_id = 11 THEN "地理"
WHEN subject_id = 12 THEN "生物"
WHEN subject_id = 13 THEN "奥数"
ELSE "" END as subject,
CASE course_term_id
WHEN course_term_id = 1 THEN "春"
WHEN course_term_id = 2 THEN "暑"
WHEN course_term_id = 3 THEN "秋"
WHEN course_term_id = 4 THEN "寒"
ELSE "" END as season,
created_at, updated_at from sc_materials where updated_at > :sql_last_value and material_id in (2025,317,2050);
./bin/logstash -f materials.conf
{"@version":"1","updated_at":"2019-08-19T02:04:54.000Z","season":"?","grade":"","created_at":"2019-08-19T02:04:54.000Z","@timestamp":"2019-12-13T01:02:01.907Z","material_name":"test material seri''al","material_id":2025,"subject":"??"}
{"@version":"1","updated_at":"2019-08-26T09:25:35.000Z","season":"","grade":"","created_at":"2019-08-26T09:25:35.000Z","@timestamp":"2019-12-13T01:02:01.908Z","material_name":"人教版高中英语必修三第10讲Unit5 Canada The True North语法篇A学生版2.pdf","material_id":2050,"subject":""}
{"@version":"1","updated_at":"2019-08-10T06:50:48.000Z","season":"?","grade":"","created_at":"2019-05-27T06:26:44.000Z","@timestamp":"2019-12-13T01:02:01.880Z","material_name":"90aca2238832143fb75dcf0fe6dbbfa9.pdf","material_id":317,"subject":""}
The chinese chars in db works well, but the chinese chars in statement becomes chars ?
.
for me, characterEncoding=utf8
was not working.
after added this,
stdin {
codec => plain { charset => "UTF-8"}
}
works well.
here is my working conf file. It's a bit of a time to post an answer, but I hope it helps someone.
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
jdbc_user => "atlas"
jdbc_password => "atlas"
jdbc_validate_connection => true
jdbc_driver_library => "/lib/postgres-42-test.jar"
jdbc_driver_class => "org.postgresql.Driver"
schedule => "* * * * *"
statement => "SELECT * from naver_city"
}
stdin {
codec => plain { charset => "UTF-8"}
}
}
output {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "2020-04-23-2"
doc_as_upsert => true
action => "update"
document_id => "%{code}"
}
stdout { codec => rubydebug }
}