ELK安装、部署、调试 (八)logstash配置语法详解

input {#输入插件
}filter {#过滤插件
}output {#输出插件
}

1.读取文件。
 使用filewatch的ruby gem库来监听文件变化,并通过.sincedb的数据库文件记录被监听日志we年的读取进度(时间
搓)
。sincedb数据文件的默认路径为<path.data>/plugins/inputs/file下面,文件名类似
于.sincedb_234534534sdfgsfd23,<path.data>为logstash的插件存储目录默认是LOGSTASH_HOME/data实验一:本机/var/log/secure为输入日志,标准输出

vi /usr/local/logstash/2logstash-1.confinput {file {path => ["/var/log/messages"],[]type => "ly_system"start_position => "beginning"       
#从beginning也就是文件开头进行读取,如果不写,默认是从文件最后开始读取。                                #如果不想把文件全部作为输入,就不配置此属性。}
}output {stdout {codec => rubydebug}
}


1.保存10.10.10.74 f-kafka-logs-es.conf的配置信息

[root@localhost logstash]# cat f-kafka-logs-es.conf
input {kafka {bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"topics => ["osmessages"]}
}
output {elasticsearch {hosts => ["10.10.10.65:9200","10.10.10.66:9200","10.10.10.67:9200"]index => "osmessageslog-%{+YYYY-MM-dd}"}
}

2.停止logstash服务
 kill -9 13508 
3.使用2logstash-1.conf作为配置文件启动logstash
nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/2logstash-1.conf &4.查看

tail -f nohup.out{"type" => "ly_system","path" => "/var/log/secure","@version" => "1","host" => "localhost.localdomain","message" => "Aug 31 08:43:56 localhost sshd[6920]: Accepted password for root from
172.16.17.234 port 1909 ssh2","@timestamp" => 2023-08-31T00:52:13.054Z
}

实验二:input插件添加域和标签

[root@localhost logstash]# cat 2logstash-1.conf
input {file {path => ["/var/log/secure"]type => "ly_system"start_position => "beginning"add_field => {"I'm " => "10.10.10.74"}tags => ["74","logstash1"]


#从beginning也就是文件开头进行读取,如果不写,默认是从文件最后开始读取。
#如果不想把文件全部作为输入,就不配置此属性。

  }
}
output {stdout {codec => rubydebug}
}

***************************
add_field => {"I'm " => "10.10.10.74"}  添加一个新的域,自己定义的。
tags => ["74","logstash1"]  tags是内置的域,可以用来定义标签。
***************************
输出结果

{"@version" => "1","host" => "localhost.localdomain","I'm " => "10.10.10.74","path" => "/var/log/secure","message" => "Aug 31 09:03:33 localhost sshd[14339]: Accepted password for root from
172.16.17.234 port 2684 ssh2","@timestamp" => 2023-08-31T01:03:53.586Z,"type" => "ly_system","tags" => [[0] "74",[1] "logstash1"]
}

实验三:input读取syslog日志。
需要完成2个步骤的操作,
1,vi /etc/rsyslog.conf
*.* @@10.10.10.74:5514       #10.10.10.74本机logstash服务器的IP地址,这个配置时使用rsyslog客户端把本机
的日志信息传输到10.10.10.74服务器的5514端口上去。
2.重启rsyslog
systemctl restart rsyslog

logstash配置文件如下:需要先启动,启动后会开启5514端口,用来侦听。

[root@localhost logstash]# cat rsyslog-logstash.conf
input {syslog {port => "5514"}
}
output {stdout {codec => rubydebug}
}
[root@localhost logstash]#

查看日志

tail -f nohup.out[2023-08-31T09:51:20,356][INFO ][logstash.inputs.syslog   ][main]
[1ac4f1a43da057380f8444a587ee7cb01fe84a0702afb9d46abc9667eeb0ea0c] Starting syslog tcp listener
{:address=>"0.0.0.0:5514"}
[2023-08-31T09:51:20,390][INFO ][logstash.inputs.syslog   ][main]
[1ac4f1a43da057380f8444a587ee7cb01fe84a0702afb9d46abc9667eeb0ea0c] Starting syslog udp listener
{:address=>"0.0.0.0:5514"}

日志源服务器 10.10.10.56 启动rsyslog客户端
1,vi /etc/rsyslog.conf
*.* @@10.10.10.74:5514       #10.10.10.74本机logstash服务器的IP地址,这个配置时使用rsyslog客户端把本机
的日志信息传输到10.10.10.74服务器的5514端口上去。
2.重启rsyslog

systemctl restart rsyslog[root@node1 ~]# systemctl restart rsyslog[root@node1 ~]# service status rsyslog
The service command supports only basic LSB actions (start, stop, restart, try-restart, reload, force-
reload, status). For other actions, please try to use systemctl.
[root@node1 ~]# systemctl status rsyslog
● rsyslog.service - System Logging ServiceLoaded: loaded (/usr/lib/systemd/system/rsyslog.service; enabled; vendor preset: enabled)Active: active (running) since 四 2023-08-31 10:00:47 CST; 31s ago

logstash信息输出

{"facility_label" => "system","@version" => "1","timestamp" => "Aug 31 10:02:01","facility" => 3,"host" => "10.10.10.56","logsource" => "node1","priority" => 30,"@timestamp" => 2023-08-31T02:02:01.000Z,"severity" => 6,"severity_label" => "Informational","message" => "Removed slice User Slice of liuyang.\n","program" => "systemd"
}
{"facility_label" => "security/authorization","@version" => "1","timestamp" => "Aug 31 10:02:01","facility" => 10,"host" => "10.10.10.56","logsource" => "node1","priority" => 87,"@timestamp" => 2023-08-31T02:02:01.000Z,"severity" => 7,"severity_label" => "Debug","pid" => "17805","message" => "pam_limits(crond:session): unknown limit item 'noproc'\n","program" => "crond"

通过上面的日志输入,发现logstash把接收到的日志进行了详细的划分。会把日志中的时间,主机名,程序,具体信
息拆分成多个字段进行存储。
"timestamp" 为源日志的时间
"@timestamp" 为logstash抓取日志的时间,与上面的时间差了8个小时,这个是时区的配置问题。

**********************************************实验四:读取tcp网络数据
下面的时间配置文件是通过“LogStash::Inputs::TCP”和"LogStash::Filters::Grok"相配合实现实验三rsyslog功能
的日志读取

[root@localhost logstash]# cat tcp-logstash.conf
input {tcp {port => "5514"}
}filter {grok {match => {"message" => "%{SYSLOGLINE}"}}
}output {stdout {codec => rubydebug}
}


[root@l
启动logstash服务
 nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/tcp-logstash.conf &
查看日志
[2023-08-31T10:08:39,596][INFO ][logstash.inputs.tcp      ][main]
[e17c63be3a5b12883f975a9f5eaf27f19639714f6267583b2142379ed6c8f22a] Starting tcp input listener
{:address=>"0.0.0.0:5514", :ss                          l_enable=>"false"}

5514端口已启动

客户端同样适用rsyslog,同上一样的配置

logstash 日志查询

{"port" => 58526,"message" => [[0] "<30>Aug 31 10:17:01 node1 systemd: Started Session 351785 of user liuyang.",[1] "Started Session 351785 of user liuyang."],"program" => "systemd","logsource" => "node1","host" => "10.10.10.56","timestamp" => "Aug 31 10:17:01","@version" => "1","@timestamp" => 2023-08-31T02:10:24.759Z
}
{"port" => 58526,"message" => [[0] "<85>Aug 31 10:17:04 node1 polkitd[1172]: Registered Authentication Agent for unix-
process:20549:1866610077 (system bus name :1.703886 [/usr/bin/pkttyagent --notify-fd 5 --fallback],
object path /org/freedesktop/PolicyKit1/AuthenticationAgent, locale zh_CN.UTF-8)",[1] "Registered Authentication Agent for unix-process:20549:1866610077 (system bus name
:1.703886 [/usr/bin/pkttyagent --notify-fd 5 --fallback], object path
/org/freedesktop/PolicyKit1/AuthenticationAgent, locale zh_CN.UTF-8)"],"program" => "polkitd","logsource" => "node1","pid" => "1172","host" => "10.10.10.56","timestamp" => "Aug 31 10:17:04","@version" => "1","@timestamp" => 2023-08-31T02:10:26.949Z
}


我们可以看出【0】是完整的信息输出
【1】是经过拆分的,如pid 、logsource、 时间、port都进行拆分出来了 
tcp方式和rsyslog类似。


******************************************************
实验五 适用nc 的方式将日志导入到logstash

客户端(日志源)10.10.10.56 
服务器(logstash)10.10.10.74服务器logstash配置方法如实验四

[root@localhost logstash]# cat tcp-logstash.conf
input {tcp {port => "5514"}
}filter {grok {match => {"message" => "%{SYSLOGLINE}"}}
}output {stdout {codec => rubydebug}
}

客户端命令行窗口输入:
 nc 10.10.10.74 5514 </var/log/messages

在logstash上查看日志

{"port" => 59322,"message" => [[0] "Aug 27 12:34:57 node1 supervisord: 2023-08-27 12:34:57,594 INFO supervisord started with
pid 4760",[1] "2023-08-27 12:34:57,594 INFO supervisord started with pid 4760"],"program" => "supervisord","logsource" => "node1","host" => "10.10.10.56","timestamp" => "Aug 27 12:34:57","@version" => "1","@timestamp" => 2023-08-31T02:16:27.986Z
}
{"port" => 59322,"message" => [[0] "Aug 27 12:34:58 node1 supervisord",[1] "supervisord"],"logsource" => "node1","host" => "10.10.10.56","timestamp" => "Aug 27 12:34:58","@version" => "1","@timestamp" => 2023-08-31T02:16:27.992Z
}


完成实验。


实验六  编码插件codec
此插件可以放到输入和输出时来处理数据
input -> decode -->  filter --> decode --->output  decode就是使用codec进行编码
codec支持plain 、json、json_lines等格式。
1.codec插件之plain
 plain是一个空解释器,输入什么格式,输出就是什么格式

[root@localhost logstash]# vi codec1-logstash.log
input {stdin { }
}output {stdout {codec => "plain"
#前面的测试我们都使用rubydebug编码,此编码会以json的格式进行输出}
}


[root@localhost logstash]# vi codec1-logstash.log
[root@localhost logstash]# /usr/local/logstash/bin/logstash -f /usr/local/logstash/codec1-logstash.log 
此处不能用nohup,否则抓取不到


hello             #我用键盘输入的,下面的信息是logstash的输出。
2023-08-31T02:28:31.068Z localhost.localdomain hello
nihao
2023-08-31T02:28:37.572Z localhost.localdomain nihao
仅增加了2个字段一个是时间戳,一个是主机名

和以前使用rubydebug的日志来对比一下

hello             #我用键盘输入的,下面的信息是logstash的输出。

{"message" => "hello","@version" => "1","host" => "localhost.localdomain","@timestamp" => 2023-08-29T02:22:53.965Z
}

2.codec插件之json
发送给logstash的数据如果是json格式的,那必须在input字段加入codec=> json来解析进来的数据,
如果想让logstash输出为json的格式,可以在output字段加入codec=>json,

[root@localhost logstash]# vi codec2-logstash.log
input {stdin { }
}output {stdout {codec => "json"   #以json的格式 输出}
}


json模式就是key:values格式


3.codec插件之json_lines
若果json文件比较长,需要换行的话,就会使用json_lines编码格式。


实验七  过滤器插件filter
1.grok正则捕获
grok是一个强大的filter插件,通过正则解析任意文本文件,将非结构化的数据弄成结构化的数据,方便查询。

https://help.aliyun.com/zh/sls/user-guide/grok-patterns
GROK的模式参考及示例

grok的语法规则
%{语法:语义}
语法指的就是匹配模式,例如使用number模式可以匹配数字,ip模式会匹配出127.0.0.1样式的IP地址

例如1.输入内容为:172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
那么,%{IP:clientip} IP就是语法,要匹配IP地址,  clientip为内容
匹配的结果为clientip:172.16.213.132

例如2:
%{HTTPDATE:timestamp}结果为07/feb/2018:16:24 +800


例如3:
%{QS:referrer}匹配的结果
GET / HTTP/ 1.1

以上IP\ Httpdate 、QS都是grok内部定义好的模式,
/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns
[root@localhost patterns]# ls
aws     bind  exim       grok-patterns  httpd  junos         maven        mcollective-patterns  nagios  
    rails  ruby
bacula  bro   firewalls  haproxy        java   linux-syslog  mcollective  mongodb              
postgresql  redis  squid
[
这个目录下,有很多匹配模式,我们可以直接拿来应用,其中grok-patterns使我们使用的基础匹配模式

vi grok-patterns
显示一小段内容如下:

IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-
5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0
-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d))
{3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|
1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]
{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1
-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-
5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:
[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-
4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-
4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])
[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])
IP (?:%{IPV6}|%{IPV4})

grok在线调试工具,
网址:grokdebug.herokuapp.com 可能需要翻墙
https://www.5axxw.com/tools/v2/grok.html
以上两个都不好使
自己在docker上搭建一个
10.10.10.56上安装了docker
docker pull epurs/grokdebugger:latest
docker images
docker run -d --name grokdebugger -p 8082:80 epurs/grokdebugger
http://10.10.10.56:8082

input输入日志
pattern为模式

[root@localhost logstash]# vi grok1-logstash.log
input {stdin { }
}filter {grok {match => ["message","%{IP:clientip}"]}
}output {stdout {codec => "rubydebug"   }
}


[root@localhost logstash]# /usr/local/logstash/bin/logstash -f /usr/local/logstash/grok1-logstash.log

172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039        #输入
{输出为:"clientip" => "172.16.213.132","@timestamp" => 2023-08-31T07:36:18.564Z,"@version" => "1","message" => "172.16.213.132 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039","host" => "localhost.localdomain"


测试2:
[root@localhost logstash]# vi grok2-logstash.log

input {stdin { }
}filter {grok {match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu}
%{NUMBER:bytes1}"]}
}output {stdout {codec => "rubydebug"   }
}


输出为:

172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
{"host" => "localhost.localdomain","timestamp1" => "07/Feb/2018:16:24:19 +0800","refer" => "\"GET / HTTP/ 1.1\"","nu" => "403","@version" => "1","message" => "172.16.213.132 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039","@timestamp" => 2023-08-31T07:46:51.778Z,"bytes1" => "5039","client-ip" => "172.16.213.132"


我们已将看到message已经分成5部分了,原有的message可以去掉了,系统中存在连个timestamp,其实@timestamp也
不需要了,这个时间是收集日志的时间。而kibana使用@timestamp这个字段来排序。我们可以将timestamp的值付给
@timestamp

[root@localhost logstash]# vi grok-delete-logstash.log
input {stdin {}
}filter {grok {match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu}
%{NUMBER:bytes1}"]remove_field => ["message"]}date {match => ["timestamp1","dd/mmm/yyyy:HH:mm:ss Z"]}mutate {remove_field => ["timestamp1"]}}output {stdout {codec => "rubydebug"}
}

输出结果

172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
{"@timestamp" => 2023-08-31T08:16:31.119Z,"refer" => "\"GET / HTTP/ 1.1\"","nu" => "403","host" => "localhost.localdomain","client-ip" => "172.16.213.132","tags" => [[0] "_dateparsefailure"],"bytes1" => "5039","@version" => "1"

以上使用了grok、date、mutate插件

时间处理模式 DATE
date插件 就是将值以什么格式赋值给@timestamp
date {
    match => ["timestamp1","dd/mmm/yyyy:HH:mm:ss Z"]
   }

将timestamp1按照后边dd/mmm/yyyy:HH:mm:ss Z的格式赋值给@timestamp

数据修改插件  mutate
1.正则表达式替换匹配字段
 gsub可以通过正则表达式替换字段中匹配到的值,只对字符串段有效,例子
filter {
  mutate {
    gsub => ["filed_name_1","/","_"]
#表示将field_name_1属性的字段中所有"/"字符替换成"_"
  }
}
实例:[root@localhost logstash]# cat grok-mutate-logstash.log                                       input {

  stdin {}
}filter {grok {match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu}
%{NUMBER:bytes1}"]}date {match => ["timestamp1","dd/mmm/yyyy:HH:mm:ss Z"]}mutate {gsub => ["message","/","_"]}}output {stdout {codec => "rubydebug"}
}

/usr/local/logstash/bin/logstash -f /usr/local/logstash/grok-mutate-logstash.log

172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
{"refer" => "\"GET / HTTP/ 1.1\"","tags" => [[0] "_dateparsefailure"],"client-ip" => "172.16.213.132","bytes1" => "5039","message" => "172.16.213.132 [07_Feb_2018:16:24:19 +0800]\"GET _ HTTP_ 1.1\" 403 5039","@timestamp" => 2023-08-31T08:35:00.815Z,"@version" => "1","host" => "localhost.localdomain","nu" => "403","timestamp1" => "07/Feb/2018:16:24:19 +0800"
}


看到"/"都替换成了"_"

2.分隔字符串为数组
split用分隔符分隔字符串为数组

filter {mutate {split => ["filed_name_2","|"]
#表示将field_name_1属性的字段中所有"/"字符替换成"_"}
}

172.16.213.132|[07/Feb/2018:16:24:19 +0800]|"GET / HTTP/ 1.1"|403|5039
实例:

[root@localhost logstash]# cat grok-mutate2-logstash.log
input {stdin {}
}filter {grok {match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu}
%{NUMBER:bytes1}"]}date {match => ["timestamp1","dd/mmm/yyyy:HH:mm:ss Z"]}mutate {split => ["message","|"]}}output {stdout {codec => "rubydebug"}
}


结果:

/usr/local/logstash/bin/logstash -f /usr/local/logstash/grok-mutate2-logstash.log172.16.213.132|[07/Feb/2018:16:24:19 +0800]|"GET / HTTP/ 1.1"|403|5039
{"message" => [[0] "172.16.213.132",[1] "[07/Feb/2018:16:24:19 +0800]",[2] "\"GET / HTTP/ 1.1\"",[3] "403",[4] "5039"],"@timestamp" => 2023-08-31T08:39:17.562Z,"host" => "localhost.localdomain","@version" => "1","tags" => [[0] "_grokparsefailure"]
}


我们发现message的信息分成了5部分,以后调用以数组的形式调用

3.重命名字段rename
mutate {
    rename => {"message","message_new"}
  }
}
实例略

4.删除字段remove_field

mutate {
    remove_field => ["message"]
  }
}

综合实例:

  mutate {
    rename => {"nu","number"}
    gsub => ["refer","/","_"]
    remove_field => ["timestamp1"]
    split => ["client-ip","."]
    }
重命名  替换  删除 分隔都可以写在一起。

Geoip地址查询归类
geoIP是免费的ip地址归类查询库,可以通过IP地址提供对应的地域信息,包括国别,省市,经纬度等,此插件对可视
化地图和区域统计非常有用。
filter {
  geoip {
    source => "ip_field"
# ip_field字段是输出ip地址的一个字段

  }
}
实例:
logstash配置[root@localhost logstash]# cat grok-geoip.log

input {stdin {}
}filter {grok {match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu}
%{NUMBER:bytes1}"]}date {match => ["timestamp1","dd/mmm/yyyy:HH:mm:ss Z"]}geoip {source => "client-ip"}
}output {stdout {codec => "rubydebug"}
}

172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
114.114.114.114 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
输出结果:

202.97.224.68 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
{"timestamp1" => "07/Feb/2018:16:24:19 +0800","bytes1" => "5039","geoip" => {"latitude" => 45.75,"region_name" => "Heilongjiang","country_code2" => "CN","country_name" => "China","longitude" => 126.65,"location" => {"lon" => 126.65,"lat" => 45.75},"country_code3" => "CN","region_code" => "HL","continent_code" => "AS","ip" => "202.97.224.68","timezone" => "Asia/Shanghai"},"message" => "202.97.224.68 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039","@timestamp" => 2023-09-01T01:27:01.443Z,"client-ip" => "202.97.224.68","refer" => "\"GET / HTTP/ 1.1\"","nu" => "403","@version" => "1","host" => "localhost.localdomain","tags" => [[0] "_dateparsefailure"]
}

longitude  latitude 经纬度 

以上信息有些多,想精简一些

精简geoip信息
geoip {
  source => "client-ip"
  fields => ["ip","country_code3","longitude","latitude","region_name"]
#仅将需要保留的域显示出来
  }[root@localhost logstash]# cat grok-geoip2.log

input {stdin {}
}filter {grok {match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu}
%{NUMBER:bytes1}"]}date {match => ["timestamp1","dd/mmm/yyyy:HH:mm:ss Z"]}geoip {source => "client-ip"fields => ["ip","country_code3","longitude","latitude","region_name"]}
}output {stdout {codec => "rubydebug"}
}

输出结果:

202.97.224.68 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039

{"nu" => "403","@timestamp" => 2023-09-01T01:30:39.227Z,"@version" => "1","tags" => [[0] "_dateparsefailure"],"geoip" => {"ip" => "202.97.224.68","country_code3" => "CN","latitude" => 45.75,"region_name" => "Heilongjiang","longitude" => 126.65},"message" => "202.97.224.68 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039","client-ip" => "202.97.224.68","host" => "localhost.localdomain","refer" => "\"GET / HTTP/ 1.1\"","timestamp1" => "07/Feb/2018:16:24:19 +0800","bytes1" => "5039"
}

logstash的输出插件output

file  将数据写入磁盘文件
elasticsearch :把日志数据发送到es集群
graphite:用于存储和绘制数据指标
还支持输出到redis,email,exec,ngios等等

1.标准输出

output {
  stdout {
    codec => "rubydebug"
  }
}


2.保存到文件

output {
  file {
    path => "/data/log3/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
  }
}

例子:
[root@localhost logstash]# cat file-log.log

input {stdin {}
}
output {file {path => "/data/log3/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"}
}


/usr/local/logstash/bin/logstash -f /usr/local/logstash/file-log.log
标准输入信息后
114.114.114.114 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039
asdfasdf

/data/log3/2023-09-01/下文件内容:

[root@localhost 2023-09-01]# cat localhost.localdomain_01.log
{"@timestamp":"2023-09-01T01:57:05.295Z","message":"114.114.114.114 [07/Feb/2018:16:24:19 +0800]\"GET /
HTTP/ 1.1\" 403 5039","host":"localhost.localdomain","@version":"1"}
[root@localhost 2023-09-01]# tail -f localhost.localdomain_01.log
{"@timestamp":"2023-09-01T01:57:05.295Z","message":"114.114.114.114 [07/Feb/2018:16:24:19 +0800]\"GET /
HTTP/ 1.1\" 403 5039","host":"localhost.localdomain","@version":"1"}
{"@timestamp":"2023-09-
01T01:57:53.440Z","message":"asdfasdf","host":"localhost.localdomain","@version":"1"}

我们发现输出的内容会在输入的内容上加了一些信息,如@timestamp @version host等属性
如果要让输入和输出一样。我们需要使用codec来格式编码

logstash配置
[root@localhost logstash]# cat file2-log.log

input {stdin {}
}
output {file {path => "/data/log3/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"codec => line { format => "%{message}"}}
}


[root@localhost logstash]# /usr/local/logstash/bin/logstash -f /usr/local/logstash/file2-log.log
标准输入
adfasdfasdf
ELK大规模日志实时处理系统从入门到企业应用实战视频课程

查看输出:

[root@localhost 2023-09-01]# tail -f localhost.localdomain_02.log
adfasdfasdf
ELK大规模日志实时处理系统从入门到企业应用实战视频课程

[root@localhost 2023-09-01]# pwd
/data/log3/2023-09-01
[root@localhost 2023-09-01]#

输出与输入一致了。


八  ELK手机apache访问日志的案例
elk收集日子的几种方法
1.不修改源日志的输出格式,而是通过logstash的grok方式进行过滤、清晰,然后输出
  优点,对业务系统无影响,缺点是logstash可能会有瓶颈。
2.修改源日志的输出格式,按要求的格式改变源日志格式进行输出,logstash仅收集和传输。
  优点:减轻了logstash的压力,但是需要一定的工作量去处理源日志格式。

elk收集apache日志应用架构

apache(filebeat)  --   kafka(zookeeper) --  logstash  --  ES集群
使用第二种方式,用改变源日志输出格式来处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/67973.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sentinel-core

引入依赖<dependencies><dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-core</artifactId></dependency><dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-anno…

Revit SDK 介绍:DWGFamilyCreation 导入DWG

前言 这个例子介绍了如何导入 DWG。 内容 核心逻辑代码&#xff1a; // 设置导入选项 DWGImportOptions options new DWGImportOptions(); options.Placement Autodesk.Revit.DB.ImportPlacement.Origin; options.OrientToView true; ElementId elementId null; // 导入…

【Vue2.0源码学习】生命周期篇-初始化阶段(initInjections)

文章目录 1. 前言2. initInjections函数分析resolveInject函数分析 3. 总结 1. 前言 本篇文章介绍生命周期初始化阶段所调用的第四个初始化函数——initInjections。从函数名字上来看&#xff0c;该函数是用来初始化实例中的inject选项的。说到inject选项&#xff0c;那必然离…

labview卸载重装碰到的问题

目录 labeiw卸载重装过程当中总是碰到一些问题&#xff0c;记录一下解决办法&#xff0c;碰到后好查找。 个人推荐卸载 用laview自带的卸载软件进行卸载&#xff0c;卸载的比较干净。 卸载完全第一步 启动 labview自带的卸载软件进行卸载。一般进行完这一步&#xff0c;就…

设计模式-9--迭代器模式(Iterator Pattern)

一、什么是迭代器模式 迭代器模式&#xff08;Iterator Pattern&#xff09;是一种行为型设计模式&#xff0c;用于提供一种统一的方式来访问一个聚合对象中的各个元素&#xff0c;而不需要暴露该聚合对象的内部结构。迭代器模式将遍历集合的责任从集合对象中分离出来&#xf…

【深入解析spring cloud gateway】07 自定义异常返回报文

Servlet的HttpResponse对象&#xff0c;返回响应报文&#xff0c;一般是这么写的&#xff0c;通过输出流直接就可以将返回报文输出。 OutputStream out response.getOutputStream(); out.write("输出的内容"); out.flush();在filter中如果发生异常&#xff08;例如…

Java中的网络编程------基于Socket的TCP编程和基于UDP的网络编程,netstat指令

Socket 在Java中&#xff0c;Socket是一种用于网络通信的编程接口&#xff0c;它允许不同计算机之间的程序进行数据交换和通信。Socket使得网络应用程序能够通过TCP或UDP协议在不同主机之间建立连接、发送数据和接收数据。以下是Socket的基本介绍&#xff1a; Socket类型&…

1775_树莓派3B键盘映射错误解决

全部学习汇总&#xff1a; GitHub - GreyZhang/little_bits_of_raspberry_pi: my hacking trip about raspberry pi. 入手树莓派3B之后用了没有多长时间&#xff0c;最初的这段时间感觉想让它代替我的PC机是不肯能的。性能先不说&#xff0c;我完全没有找到当初在我的笔记本上使…

css网格布局

css网格布局 常用属性 display: grid; //开启网格grid-template-columns: 2fr 1fr 1fr 1fr 1fr; //设置多少列每列宽度grid-gap: 10px; // 设置表格之间间距grid-template-rows: 50px 50px 50px 50px; // 设置多少行 每行的高度grid-column : 1 //占据位置 占据1格grid-colu…

ChatGPT分析日本排放核污水对世界的影响

文章目录 1 背景2 环境影响3 健康影响4 国际关系影响5 应对措施 近段时间被日本排放核污水到海里的消息刷屏了&#xff0c;这一举措引发了广泛的关注和担忧。本文结合ChatGPT来分析这件事的前因后果、会对世界造成的影响、以及应对措施。 1 背景 受2011年发生的大地震及海啸影响…

机器人中的数值优化(九)——拟牛顿方法(下)、BB方法

本系列文章主要是我在学习《数值优化》过程中的一些笔记和相关思考&#xff0c;主要的学习资料是深蓝学院的课程《机器人中的数值优化》和高立编著的《数值最优化方法》等&#xff0c;本系列文章篇数较多&#xff0c;不定期更新&#xff0c;上半部分介绍无约束优化&#xff0c;…

某物联网数智化园区行业基于 KubeSphere 的云原生实践

公司简介 作为物联网 数智化园区一体化解决方案提供商&#xff0c;我们致力于为大中型园区、停车场提供软硬件平台&#xff0c;帮助园区运营者实现数字化、智能化运营。 在使用 K8s 之前我们使用传统的方式部署上线&#xff0c;使用 spug&#xff08;一款轻量级无 Agent 的自…

网络版五子棋C++实现

目录 1.项目介绍 2.开发环境 3.核心技术 4.环境搭建 5.WebSocketpp介绍 5.1WebSocketpp是什么 5.2为什么使用WebSocketpp 5.3原理解析&#xff1a; 5.4WebSocketpp主要特性 6.WebSocketpp使用 7.JsonCpp使用 8.MySQL API 9.项目模块设计以及流程图 10.封装日志宏…

NewStarCTF 2022 web方向题解 wp

----------WEEK1---------- BUU NewStarCTF 公开赛赛道 WEEK1 [NotPHP] 先看题目&#xff0c;要传参加绕过。 分析一下代码&#xff1a;首先get一个datadata://test/plain,Wel…。然后key1和2用数组可以绕过。num2077a可以绕过弱类型。eval()中的php语句被#注释了&#xff0c…

【jvm】jvm系统线程

目录 一、虚拟机线程二、周期任务线程三、GC线程四、编译线程五、信号调度线程 一、虚拟机线程 1.这种线程的操作是需要jvm达到安全点才会出现 2.这些操作必须在不同的线程中发生的原因是它们都需要jvm达到安全点&#xff0c;这样堆才不会变化 3.这种线程的执行类型包括stop-th…

并发编程的故事——JUC

JUC 文章目录 JUC一、Semaphore二、CountDownLatch三、线程安全类 一、Semaphore 为什么需要用到Semaphore? 限流 Semaphore的场景&#xff1f; 秒杀商品的时候&#xff0c;不能够让那些没有秒杀成功的线程进入&#xff0c;只有占了坑位的才可以使用&#xff0c;这里可以用re…

【Hello Algorithm】二叉树相关算法

本篇博客介绍&#xff1a;介绍二叉树的相关算法 二叉树相关算法 二叉树结构遍历二叉树递归序二叉树的交集非递归方式实现二叉树遍历二叉树的层序遍历 二叉树难题二叉树的序列化和反序列化lc431求二叉树最宽的层二叉树的后继节点谷歌面试题 二叉树结构 如果对于二叉树的结构还有…

Linux命令执行完成提醒

有些命令任务执行时间较长&#xff0c;可以让其执行完成时发出声音来提示。 如下&#xff1a; ls && echo -e "\a" 前一条命令成功执行后&#xff0c;会发出声音。 如果当前不在Iterm2窗口里&#xff0c;还会弹窗提示。

2023新版医保目录明细(药品查询)

查询医保目录的主要目的是为了了解医保政策对于特定医疗服务、药品和医疗器械的覆盖范围和支付标准。大众可以通过查看医保目录可以确定哪些药品可以被医保支付以及报销的比例和限额&#xff1b;医药从业者可通过查看医保目录可以即使了解医保政策的变化&#xff0c;便于做出相…

15种下载文件的方法文件下载方法汇总超大文件下载

15种下载文件的方法&文件下载方法汇总&超大文件下载 15种下载文件的方法Pentesters经常将文件上传到受感染的盒子以帮助进行权限提升&#xff0c;或者保持在计算机上的存在。本博客将介绍将文件从您的计算机移动到受感染系统的15种不同方法。对于那些在盒子上存在且需要…