是否清除 last_run_metadata_path 的记录,如果为true那么每次都相当于从头开始查询所有的数据库记录
clean_run => false
SQL文件的位置或者SQL语句
statement_filepath => "/usr/local/logstash-7.9.2/bin/jdbcconfig/car.sql"
SQL:
SELECT * FROM car WHERE ID > :sql_last_value
设置列名小写,默认为true,Kibana区分大小写,ES区分大小写
lowercase_column_names => false
是否记录上次执行同步操作数据结果 true表示记录上次操作到的位置 就是将tracking_column 中的数据写入到last_run_metadata_path 文件中
record_last_run => true
是否记录某个column 的值,如果 record_last_run 为 true,可以自定义我们需要表的字段名称,
此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
use_column_value => true
如果 use_column_value 为 true ,需配置此参数. 这个参数就是数据库给出的一个字段名称。该字段必须是递增的,可以是主键ID或者时间列(作用为做为一个标识当下次执行同步操作的时候 根据这个列中的数据查起 )
tracking_column => ID
是否开启分页
jdbc_paging_enabled => true
分页大小
jdbc_page_size => 1000
标识存储文件地址
last_run_metadata_path => "/usr/local/logstash-7.9.2/bin/jdbcconfig/fgq_dbh_data_ID.txt"
input {
stdin {
}
jdbc {
jdbc_driver_library => "/usr/local/logstash-7.9.2/bin/jdbcconfig/mssql-jdbc-6.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.5.103:1433;DatabaseName=CRM;"
jdbc_user => "sa"
jdbc_password => "123456"
schedule => "* * * * *"
jdbc_paging_enabled => true
jdbc_page_size => 1000
clean_run => false
use_column_value => true
#设置查询条件的字段
tracking_column => ID
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.9.2/bin/jdbcconfig/id1.txt"
#设置列名小写
lowercase_column_names => false
statement_filepath => "/usr/local/logstash-7.9.2/bin/jdbcconfig/sql1.sql"
#索引的类型
type => "list1"
}
jdbc {
jdbc_driver_library => "/usr/local/logstash-7.9.2/bin/jdbcconfig/sqljdbc.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.5.103:1433;DatabaseName=CRM;"
jdbc_user => "sa"
jdbc_password => "123456"
schedule => "* * * * *"
jdbc_paging_enabled => true
jdbc_page_size => 1000
clean_run => false
use_column_value => true
#设置查询条件的字段
tracking_column => ID
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.9.2/bin/jdbcconfig/id2.txt"
#设置列名小写
lowercase_column_names => false
statement_filepath => "/usr/local/logstash-7.9.2/bin/jdbcconfig/sql2.sql"
#索引的类型
type => "list2"
}
}
#在数据库导入到es中的数据的时候 es会自动创建出一些字段就是下面的这几个字段 要是不想要这几个字段就添加过滤器
filter {
mutate {
remove_field => ["@timestamp","@version","create_time","update_time"]
}
}
output {
if[type]=="list1"{
elasticsearch {
hosts => ["192.168.5.101:9200"]
index => "advancelist"
document_id => "%{ID}"
}
}
if[type]=="list2"{
elasticsearch {
hosts => ["192.168.5.101:9200"]
index => "list2"
document_id => "%{ID}"
}
}
stdout {
codec => json_lines
#设置输出的格式
#codec => line {
# format => "ID: %{[ID]} 名称: %{[name]} 用户账号: %{[用户账号]} 时间: %{[时间]}"
#}
}
}