1.安装
2.配置
# Single-table import: poll one MySQL table over JDBC and emit one event per row.
input {
  # Lines typed on standard input also enter the pipeline.
  stdin {}

  jdbc {
    # JDBC driver jar and driver class used to connect to MySQL.
    jdbc_driver_library => "D:\logstash-6.4.2\bin\mysql\mysql-connector-java-5.1.48.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # The path segment of a MySQL JDBC URL is the database (schema) name, not a table name.
    jdbc_connection_string => "jdbc:mysql://数据库ip:port/数据库名?characterEncoding=UTF-8&autoReconnect=true"
    jdbc_user => ""
    jdbc_password => ""

    # Connection handling: retry on failure and validate pooled connections.
    connection_retry_attempts => "3"
    jdbc_validate_connection => "true"
    jdbc_validation_timeout => "3600"

    # Fetch the result set in pages of 1000 rows (LIMIT/OFFSET queries).
    jdbc_paging_enabled => "true"
    jdbc_page_size => "1000"

    # Incremental read: only rows at or past the recorded spider_date are fetched,
    # instead of re-reading the whole table on every scheduled run.
    # ORDER BY the tracking column so that the last row processed carries the
    # highest value (that is the value persisted as sql_last_value) and so that
    # paging sees a stable row order.
    statement => 'select * from fb_ads where spider_date >= :sql_last_value order by spider_date asc'

    # Keep column names exactly as the database returns them.
    lowercase_column_names => false
    sql_log_level => warn

    # Persist the spider_date of the last processed row and expose it to the
    # statement as :sql_last_value on the next run.
    record_last_run => true
    use_column_value => true
    tracking_column => "spider_date"
    tracking_column_type => numeric
    # NOTE(review): relative path; presumably resolved against the directory
    # Logstash is started from - confirm, or switch to an absolute path.
    last_run_metadata_path => "mysql/last_id.txt"
    # false keeps the recorded value across restarts.
    clean_run => false

    # Cron-style schedule: run once every minute.
    schedule => "* * * * *"
  }
}
# Sink stage: every event is indexed into Elasticsearch and echoed to the console.
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # NOTE(review): the index is "shopify" while the template is named "fb_ads";
    # confirm the template's index pattern actually matches this index.
    index => "shopify"
    # Use the event's id field as the document _id so a re-imported row
    # overwrites its previous copy instead of creating a duplicate.
    document_id => "%{id}"
    # Install this index template under the given name, replacing any existing one.
    template => "/usr/local/logstash-6.4.2/template/fb_ads.json"
    template_name => "fb_ads"
    template_overwrite => true
  }

  # Print each event as one JSON document per line.
  stdout {
    codec => json_lines
  }
}
(3)上面的配置文件只适合单张表导入。如果需要同时导入多张表，可以使用下面的配置：
# Multi-table import: one jdbc input per table, each tagged with its own `type`
# so the output stage can route events to the matching index.
input {
  # Lines typed on standard input also enter the pipeline (they carry no type).
  stdin {}

  # ---- Table fb_ads ----
  jdbc {
    # NOTE(review): `type` does not replace a type field the event already has;
    # if the table has a column named "type", routing by [type] will break - confirm.
    type => "fb_ads"

    # JDBC driver jar and driver class used to connect to MySQL.
    jdbc_driver_library => "/usr/local/logstash-6.4.2/bin/mysql/mysql-connector-java-5.1.48.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # The path segment of a MySQL JDBC URL is the database (schema) name, not a table name.
    jdbc_connection_string => "jdbc:mysql://数据库ip:port/数据库名?characterEncoding=UTF-8&autoReconnect=true"
    jdbc_user => ""
    jdbc_password => ""

    # Connection handling: retry on failure and validate pooled connections.
    connection_retry_attempts => "3"
    jdbc_validate_connection => "true"
    jdbc_validation_timeout => "3600"

    # Fetch the result set in pages of 500 rows (LIMIT/OFFSET queries).
    jdbc_paging_enabled => "true"
    jdbc_page_size => "500"
    jdbc_default_timezone =>"Asia/Shanghai"

    # Incremental read from the recorded update_time onwards.
    # ORDER BY the tracking column so that the last row processed carries the
    # highest value (that is the value persisted as sql_last_value) and so that
    # paging sees a stable row order.
    statement => 'select * from fb_ads where update_time >= :sql_last_value order by update_time asc'

    # Keep column names exactly as the database returns them.
    lowercase_column_names => false
    sql_log_level => warn

    # Persist the update_time of the last processed row for the next run.
    record_last_run => true
    use_column_value => true
    tracking_column => "update_time"
    # NOTE(review): numeric assumes update_time stores a number (e.g. epoch);
    # if it is a DATETIME/TIMESTAMP column this should be "timestamp" - confirm.
    tracking_column_type => numeric
    # Each jdbc input needs its own metadata file so the tables do not
    # overwrite each other's position.
    last_run_metadata_path => "/usr/local/logstash-6.4.2/bin/mysql/fb_ads_last_id.txt"
    # false keeps the recorded value across restarts.
    clean_run => false

    # Cron-style schedule: run once every minute.
    schedule => "* * * * *"
  }

  # ---- Table fb_homepage ----
  jdbc {
    # NOTE(review): same caveat as above if the table has a column named "type".
    type => "fb_homepage"

    jdbc_driver_library => "/usr/local/logstash-6.4.2/bin/mysql/mysql-connector-java-5.1.48.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # The path segment of a MySQL JDBC URL is the database (schema) name, not a table name.
    jdbc_connection_string => "jdbc:mysql://数据库ip:port/数据库名?characterEncoding=UTF-8&autoReconnect=true"
    jdbc_user => ""
    jdbc_password => ""

    connection_retry_attempts => "3"
    jdbc_validate_connection => "true"
    jdbc_validation_timeout => "3600"

    jdbc_paging_enabled => "true"
    jdbc_page_size => "500"
    jdbc_default_timezone =>"Asia/Shanghai"

    # Incremental read from the recorded update_stamp onwards, ordered by the
    # tracking column for the same reasons as the fb_ads statement.
    statement => 'select * from fb_homepage where update_stamp >= :sql_last_value order by update_stamp asc'

    lowercase_column_names => false
    sql_log_level => warn

    # Persist the update_stamp of the last processed row for the next run.
    record_last_run => true
    use_column_value => true
    tracking_column => "update_stamp"
    tracking_column_type => numeric
    last_run_metadata_path => "/usr/local/logstash-6.4.2/bin/mysql/fb_homepage_last_id.txt"
    clean_run => false

    schedule => "* * * * *"
  }
}
# Sink stage: route each event to the index that matches its `type`,
# then echo every event to the console.
output {
  if [type] == "fb_ads" {
    # Rows read from the fb_ads table.
    elasticsearch {
      hosts => ["ip:9200"]
      index => "fb_ads"
      # Use the event's id field as the document _id so a re-imported row
      # overwrites its previous copy instead of creating a duplicate.
      document_id => "%{id}"
      # Install this index template under the given name, replacing any existing one.
      template => "/usr/local/logstash-6.4.2/template/fb_ads.json"
      template_name => "fb_ads"
      template_overwrite => true
    }
  } else if [type] == "fb_homepage" {
    # Rows read from the fb_homepage table.
    elasticsearch {
      hosts => ["ip:9200"]
      index => "fb_homepage"
      document_id => "%{id}"
      template => "/usr/local/logstash-6.4.2/template/fb_homepage.json"
      template_name => "fb_homepage"
      template_overwrite => true
    }
  }

  # Print each event, whatever its type, as one JSON document per line.
  stdout {
    codec => json_lines
  }
}
多张表导入的逻辑与单张表导入基本相同：每张表对应一个 jdbc 输入，用 type 区分，并各自使用独立的 last_run_metadata_path；最后在输出的地方根据 type 加上判断，写入对应的索引即可。
3.启动
4.性能
(1)第一次导入数据时，可能因为数据量较大导致 MySQL 性能下降，这时需要将数据按照区间划分，分多次导入。