原文地址:https://www.douyacun.com/article/8513551c1e6849ebd5b65a52cfc4a8c3
logstash最佳实践(只能入门看看):https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html
官方文档(看过logstash最佳实践以后,官方文档的大概结构就知道了):https://www.elastic.co/guide/en/logstash/current/index.html
filter grok 工具(logstash提供正则匹配工具):http://grokdebug.herokuapp.com/
logstash下载地址: https://www.elastic.co/cn/downloads/logstash
logstash.conf
input {
file {
path => "/Users/liuning/Documents/utils/datasets/movies.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
csv {
separator => ","
skip_header => true
columns => ["id","content","genres"]
}
mutate {
split => ["genres", "|"]
remove_field => ["path", "message", "@timestamp", "host"]
}
mutate {
split => ["content", "("]
add_field => { "title" => "%{[content][0]}"}
add_field => { "year" => "%{[content][1]}"}
}
mutate {
convert => {
"year" => "integer"
}
strip => ["title"]
remove_field => ["path", "host","@timestamp","message","content"]
}
}
output {
elasticsearch {
index => "movies"
document_id => "%{id}"
}
stdout { codec => rubydebug }
}
file 读取文件,还支持网络等。
-
, 老老实实放到一个简单的路径里,不知道下划线支不支持,自己试试从grouplens下载datasets是csv格式的,logstash默认值csv解析
,
默认是,
写不写都行"
表示引用格式,这里自带的csv插件是支持的。如果有解析错乱的话,不用考虑这个。mutate: 修改数据,添加字段,移除字段等等
output:输出到哪
elasticsearch
stdout:标准输出到控制台
sudo ./bin/logstash -f logstash.conf
这里用sudo,是因为要写入到/dev/null, 需要root权限。
具体 elasticsearch、kibana是如果运行的,看 elasticsearch docker的安装步骤
有需求需要把mysql中的数据同步到elasticsearch中, 按照每条记录的更新时间导入数据
看一下logstash已经有的plugins,我当前用的是7.6.0默认已经有了input jdbc了
bin/logstash-plugin list
老版本需要安装插件:
bin/logstash-plugin install logstash-input-jdbc
下在java mysql连接驱动,jdbc_driver_library: 从这里下载最新的版本
input {
jdbc {
jdbc_validate_connection => true
jdbc_driver_library => "/opt/driver/mysql-connector-java-5.1.36.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/videos_t"
jdbc_user => "root"
jdbc_password => "123456"
# 定时器
schedule => "*/1 * * * * "
# 记录 sql_last_value 值的类型,支持: number / timestamp
tracking_column_type => "timestamp"
# 记录上一次查询是最后的值
use_column_value => true
tracking_column => "updated_at"
# 时区 local 或 utc
plugin_timezone="local"
# sql_last_value logstash提供的预定义宏,上一次最后查询列的值,具体那一列由 tracking_column 指定
statement => "select * from foo where updated_at > :sql_last_value"
# 分页,防止首次导入的时候数据过多
jdbc_paging_enabled => "true"
jdbc_page_size => "10000"
}
}
filter {
}
output {
stdout { codec => json_lines }
}
这里的需要是按照更新时间导入数据,需要用到3个配置:
use_column_value
: 使用上一次查询的值,作为 sql_last_valuetracking_column
: 使用哪一列tracking_column_type
: 该列值的类型,numeric
, timestamp
如果是只有插入同步到elasticsearch的话,可以按id同步
tracking_column => "id"
tracking_column_type => "numeric"
statement
: sql语句,支持参数绑定的形式
prepared_statement_bind_values
: 数组,sql中绑定的参数
schedule
: 定时器,具体看cron的用法
plugin_timezone
: 按更新时间同步数据的话,一定要注意时区,默认是:utc,支持local和utc。否则每次查询条件都是差8个小时
jdbc_paging_enabled
: 分页,第一次同步几百万的话,一次查询数据量可能太多
jdbc_page_size
: 每页条数
logstash_jdbc_last_run
: 上一次同步的值, 默认在 ~/.logstash_jdbc_last_run
, 这个最后单独配置一下,一张表同步的话还可以,如果是多张的话就会用成同一个文件中的值
这个错误用logstash(7.5.0)报的错,github issue作者提到新版已经修复这个问题所以上面用的版本是7.6.0
com.mysql.jdbc.Driver not loaded. Are you sure you've included the correct jdbc driver in :jdbc_driver_library?