防止ES的的I/O的压力过大,使用redis/kafka进行缓冲。
对redis的要求
Redis input plugin | Logstash Reference [8.17] | Elastic
一般企业要求的架构
我实现的架构
filebeat把数据传给logstash
配置好filebeat把收集到的数据输入到redis
然后执行命令,filebeat就开始往redis中写数据
cd /etc/filebeat
/usr/share/filebeat/bin/filebeat -e -c filebeat.yml
logstash配置,从redis中读数据输出到elasticsearch
当我使用filebeat收集数据,传到redis,然后logstash从redis读取数据的时候有一个大问题就是
filebeat收集的是json格式的nginx访问日志,它被filebeat收集到redis的时候,会对这个收集的日志和与日志相关的元数据进行封装然后输出到redis。等logstash从redis里面取出来的时候,得到的数据是 收集的日志(message) 和 元数据字段的集合。
(就变成了一个字段message,其他都是没用的元数据字段。)
我收集的nginx日志格式
要解决的问题
1.json格式的日志数据message未正确解析
这是在filebeat直接向logstash传送数据的时候发送的错误,因为传送过去的数据没有设置正确的字符集,导致收集到的日志乱码
kibana显示
改正后基本上就没有乱码格式了,一个event是一条日志
2.删除无用的元数据字段
Filebeat直接连接logtsash的时候,filebeat通过output.logstash发送数据,默认不会添加完整元数据,输出到redis的时候,filebeat使用output.redis将日志包装成完整事件结构,包含所有的元数据。
Redis 作为缓冲队列:数据存储到 Redis 时,Filebeat 会保留完整的 Beat 事件结构(包括 @metadata
、host
、agent
等)。
所以当logstash从redis取日志数据的时候,会收集无用的元数据(只有message是收集的访问日志),我们要把无用的元数据过滤掉
然后会得到一整个大的message字段
3.过滤出日志里面的单个信息
所以删除无用的元数据之后如果我们不仅想得到访问日志,还想得到访问日志里面得具体数据,使用这个具体数据进行画图等等(比如我们想得到remote_addr字段,就需要对message进行解析)
input {redis {host => "10.8.0.23"port => 6379password => "Pu@1uC2016"db => 0data_type => "list"key => "nginx-accesslog"codec => json # 自动解析外层 JSON}
}
#nginx的原始的json格式的数据是这样的
#{"time_local":"2025-03-28T05:55:57+08:00","remote_addr":"45.90.163.37","remote_user":"","request":"PUT /v1/agent/service/register
#HTTP/1.1","status":"400","body_bytes_sent":"173","request_time":"0.263","http_referer":"","http_user_agent":"","http_x_forwarded_for":"",
#"upstream_addr":"","upstream_response_time":""}#但是message里面的数据是这样的,我们要把message中的数据转化为json格式,然后json过滤才能把字段(比如ip)过滤出来,然后可以对数据做图形分析
# "message" => "{\"time_local\":\"2025-03-28T18:12:23+08:00\",\"remote_addr\":\"10.8.0.23\",\"request\":\"HEAD / HTTP/1.1\",\"status\":\"200\",
#\"body_bytes_sent\":\"0\",\"request_time\":\"0.000\",\"http_referer\":\"\",\"http_user_agent\":\"curl/7.61.1\",}"#要把下面转化为上面的这样的json格式(上面是标准的json格式),filter里面的json工具才能把message这个大字段里面的小字段取出来
filter {# ========== 第一步:处理 message 字段中的内层 JSON ==========mutate { gsub => ["message", "\\\"", "\"" # 将 \" 替换为普通引号] }mutate {# 删除末尾多余逗号(如 ",}" → "}")gsub => ["message", ",}", "}","message", ",]", "]"]}# 解析 message 字段中的业务日志 这个json过滤,它只作用于json格式的数据json {source => "message"target => "nginx_log" # 解析结果存入 nginx_log 子对象remove_field => ["message"]tag_on_failure => ["_json_parse_failure"] # 标记解析失败的日志}# ========== 第二步:提升业务字段到根层级 ==========mutate {rename => {"[nginx_log][time_local]" => "time_local""[nginx_log][remote_addr]" => "remote_addr""[nginx_log][request]" => "request""[nginx_log][status]" => "status""[nginx_log][body_bytes_sent]" => "body_bytes_sent""[nginx_log][http_referer]" => "http_referer""[nginx_log][http_user_agent]" => "http_user_agent"}}#把一些无用的元数据过滤掉不要# ========== 第三步:清理所有元数据字段 ==========mutate {remove_field => ["host", "agent", "ecs", "log", "input", "fields","@version", "event", "nginx_log"]}# ========== 第四步:修正时间戳 ==========date {match => [ "time_local", "ISO8601" ]target => "@timestamp" # 覆盖默认时间戳}
}
output {elasticsearch {hosts => ["https://10.8.0.23:9200"]index => "dami-logs-%{+YYYY.MM.dd}"user => "elastic"password => "sxm@325468"ssl => truecacert => "/logstash/http_ca.crt" }# 调试时开启 stdout,查看完整字段stdout {codec => rubydebug {metadata => true # 显示元数据(确认字段结构)}}
}
最后可以得到message里面的单个小字段信息,以前只能在message中进行查看,不能对数据进行分析和画图,现在可以了
filter过滤器的其他用法
Filter plugins | Logstash Reference [8.17] | Elastic
1.grok
也可以康康这个人的文章
logstash过滤器插件filter详解及实例 - 峰哥ge - 博客园
官方文档
Grok filter plugin | Logstash Reference [8.17] | Elastic
grok %{语法:语义}
#语法在配置文件里已经定义好了
#语义是自己定义的,表示要将获得的字段放在哪个key里,例如下面ip就是key值,取出的字段值是value
match => {"message" => "%{IPV4:ip}"} #message是收集到的每一条数据
/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-patterns-core-4.3.4/patterns/ecs-v1/grok-patterns #语法在配置文件里已经定义好了
match => {"message" => "%{IPV4:ip}"} #message是收集到的每一条数据
match => {"message" => "%{@HTTPDATE:time}"} #
match => {"message" => "%{LOGLEVEL:level}"} #
2.groip 通过ip定位物理位置
Geoip 过滤器插件 |Logstash 参考 [8.17] |弹性的
logstash上要安装这个数据库(要先注册才能看)
Download GeoIP Databases | MaxMind
我的geoip配置写错了但是我懒得改了,我的阿里云服务器要释放了。
logstash收集到的数据还可以存放到数据库中,然后可以自己公司开发一个前端工具连接到数据库进行数据分析