Table of Contents
- FileBeat + Pipeline: Parse Logs and Save to Elasticsearch (Hands-on)
- Download
- Goal
- Log Data
- Simulate the Pipeline
- Create the Pipeline
- Verify the Pipeline Was Created
- Create the FileBeat Configuration File filebeat.yml
- Create Custom Fields: FileBeat fields.yml
- Run FileBeat
- FileBeat Startup Options
- Test
- Pipeline Configuration Details
- 1. Set the Index _id from Log Data
- FileBeat Configuration Details
- 1. Set Replica and Shard Counts for the Indices FileBeat Writes to Elasticsearch
- Troubleshooting
- ERROR instance/beat.go:802 Exiting: error initializing processors:
FileBeat + Pipeline: Parse Logs and Save to Elasticsearch (Hands-on)
Download
https://www.elastic.co/cn/downloads/past-releases#filebeat
Goal
Collect logs with FileBeat, parse them with an ingest pipeline, and write the results to ES.
Log Data
2021-07-01 20:07:25 [XNIO-1 task-2] INFO fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X
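Before building the pipeline, it helps to see the structure of this line: everything after the literal `fileBeatLogData - ` marker is a `|`-separated record with 26 positional fields (`logdata.0` through `logdata.25`). The pipeline's dissect + split steps can be sketched locally in Python (illustration only, not part of the actual setup):

```python
# Sketch of the pipeline's dissect + split steps:
# dissect strips the "<timestamp> [<thread>] <level> fileBeatLogData - " prefix,
# then splitting on "|" yields the positional fields logdata.0 .. logdata.25.
line = ('2021-07-01 20:07:25 [XNIO-1 task-2] INFO fileBeatLogData - '
        '查询用户|4|com.internet.operator.controller..list()|GET|'
        'http://127.0.0.1:8080/list|127.0.0.1|jast110|'
        '9a2e232170744efda8c526d67f4f5405|'
        'userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|'
        '{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}'
        '|||0|||1625141245843||||||2021-07-01 20:07:25|142|'
        '91110108769392234H|测试111|X')

prefix, logdata = line.split(' fileBeatLogData - ', 1)
fields = logdata.split('|')
print(len(fields))   # 26 positional fields
print(fields[6])     # logdata.6 -> userName -> jast110
print(fields[22])    # logdata.22 -> processTime -> 142
```

Empty segments between consecutive `|` characters still count as fields, which is why optional values like `exceptionInfo` map cleanly to fixed positions.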
Simulate the Pipeline
Note: if a field is set by both `set` and `script`, the value from `script` takes precedence.
POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "dissect": { "field": "message", "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}" } },
      { "split": { "field": "logdata", "separator": "\\|", "target_field": "logdata" } },
      { "set": { "field": "actionOrFunction", "value": "{{logdata.0}}" } },
      { "set": { "field": "businessType", "value": "{{logdata.1}}" } },
      { "set": { "field": "callMethod", "value": "{{logdata.2}}" } },
      { "set": { "field": "requestMethod", "value": "{{logdata.3}}" } },
      { "set": { "field": "callLink", "value": "{{logdata.4}}" } },
      { "set": { "field": "loginUserIp", "value": "{{logdata.5}}" } },
      { "set": { "field": "userName", "value": "{{logdata.6}}" } },
      { "set": { "field": "userId", "value": "{{logdata.7}}" } },
      { "set": { "field": "paramOrInputData", "value": "{{logdata.8}}" } },
      { "set": { "field": "resultOrOutputData", "value": "{{logdata.9}}" } },
      { "set": { "field": "exceptionInfo", "value": "{{logdata.10}}" } },
      { "set": { "field": "systemEnv", "value": "{{logdata.11}}" } },
      { "set": { "field": "status", "value": "{{logdata.12}}" } },
      { "set": { "field": "fullLinkId", "value": "{{logdata.13}}" } },
      { "set": { "field": "subFullLinkId", "value": "{{logdata.14}}" } },
      { "set": { "field": "currentTimeMillisecond", "value": "{{logdata.15}}" } },
      { "convert": { "field": "currentTimeMillisecond", "type": "long" } },
      { "set": { "field": "detail", "value": "{{logdata.16}}" } },
      { "set": { "field": "other", "value": "{{logdata.17}}" } },
      { "set": { "field": "errorData", "value": "{{logdata.18}}" } },
      { "set": { "field": "errorDataSource", "value": "{{logdata.19}}" } },
      { "set": { "field": "errorDataDetail", "value": "{{logdata.20}}" } },
      { "set": { "field": "logTime", "value": "{{logdata.21}}" } },
      { "set": { "field": "processTime", "value": "{{logdata.22}}" } },
      { "convert": { "field": "processTime", "type": "long" } },
      { "set": { "field": "orgCode", "value": "{{logdata.23}}" } },
      { "set": { "field": "orgName", "value": "{{logdata.24}}" } },
      { "set": { "field": "exceptionDetailInfo", "value": "{{logdata.25}}" } },
      { "set": { "field": "message", "value": "" } },
      { "set": { "field": "logdata", "value": "" } },
      { "script": { "lang": "painless", "source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8); """ } }
    ]
  },
  "docs": [
    { "_source": { "message": "2021-07-01 20:07:25 [XNIO-1 task-2] INFO fileBeatLogData - 查询运营商宽带用户|4|com.bjga.internet.operator.controller.OperatorBroadbandController.list()|GET|http://127.0.0.1:8080/operator2/broadband/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAccount=%E5%8C%97%E4%BA%AC1%E5%B8%8256&installedPhone=639857&accountHolderName=%E4%B8%9C%E7%A5%A5%E6%9E%97&operatorCreditCode=91110108101909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{\"code\":200,\"msg\":\"查询成功\",\"rows\":[],\"took\":2,\"total\":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X" } }
  ]
}
Create the Pipeline
Note: ingest pipeline IDs are case-sensitive, so the ID here must match the one referenced in filebeat.yml (logDataPipeline).
PUT _ingest/pipeline/logDataPipeline
{
  "description": "outer pipeline",
  "processors": [
    { "dissect": { "field": "message", "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}" } },
    { "split": { "field": "logdata", "separator": "\\|", "target_field": "logdata" } },
    { "set": { "field": "actionOrFunction", "value": "{{logdata.0}}" } },
    { "set": { "field": "businessType", "value": "{{logdata.1}}" } },
    { "set": { "field": "callMethod", "value": "{{logdata.2}}" } },
    { "set": { "field": "requestMethod", "value": "{{logdata.3}}" } },
    { "set": { "field": "callLink", "value": "{{logdata.4}}" } },
    { "set": { "field": "loginUserIp", "value": "{{logdata.5}}" } },
    { "set": { "field": "userName", "value": "{{logdata.6}}" } },
    { "set": { "field": "userId", "value": "{{logdata.7}}" } },
    { "set": { "field": "paramOrInputData", "value": "{{logdata.8}}" } },
    { "set": { "field": "resultOrOutputData", "value": "{{logdata.9}}" } },
    { "set": { "field": "exceptionInfo", "value": "{{logdata.10}}" } },
    { "set": { "field": "systemEnv", "value": "{{logdata.11}}" } },
    { "set": { "field": "status", "value": "{{logdata.12}}" } },
    { "set": { "field": "fullLinkId", "value": "{{logdata.13}}" } },
    { "set": { "field": "subFullLinkId", "value": "{{logdata.14}}" } },
    { "set": { "field": "currentTimeMillisecond", "value": "{{logdata.15}}" } },
    { "convert": { "field": "currentTimeMillisecond", "type": "long" } },
    { "set": { "field": "detail", "value": "{{logdata.16}}" } },
    { "set": { "field": "other", "value": "{{logdata.17}}" } },
    { "set": { "field": "errorData", "value": "{{logdata.18}}" } },
    { "set": { "field": "errorDataSource", "value": "{{logdata.19}}" } },
    { "set": { "field": "errorDataDetail", "value": "{{logdata.20}}" } },
    { "set": { "field": "logTime", "value": "{{logdata.21}}" } },
    { "set": { "field": "processTime", "value": "{{logdata.22}}" } },
    { "convert": { "field": "processTime", "type": "long" } },
    { "set": { "field": "orgCode", "value": "{{logdata.23}}" } },
    { "set": { "field": "orgName", "value": "{{logdata.24}}" } },
    { "set": { "field": "exceptionDetailInfo", "value": "{{logdata.25}}" } },
    { "set": { "field": "message", "value": "" } },
    { "set": { "field": "logdata", "value": "" } },
    { "script": { "lang": "painless", "source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8); """ } }
  ]
}
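The final `script` processor stamps each document with an `insertTime`; the `1000l*60*60*8` term adds 8 hours of milliseconds to the epoch time, shifting UTC to Beijing time (UTC+8). A quick Python check of that offset (illustration only, not part of the pipeline):

```python
from datetime import datetime, timedelta, timezone

# the painless expression 1000l*60*60*8 is the UTC+8 offset in milliseconds
offset_ms = 1000 * 60 * 60 * 8
print(offset_ms)  # 28800000 ms == 8 hours

# currentTimeMillisecond from the sample log line, shifted the same way
epoch_ms = 1625141245843
utc = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
beijing = utc + timedelta(milliseconds=offset_ms)
print(utc.strftime('%H:%M:%S'))                        # 12:07:25 (UTC)
print(beijing.strftime('%Y-%m-%d %H:%M:%S'))           # 2021-07-01 20:07:25
```

The shifted value matches the timestamp at the start of the sample log line, confirming the source logs are in UTC+8.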
Verify the Pipeline Was Created
GET _ingest/pipeline/logDataPipeline?pretty
Create the FileBeat Configuration File filebeat.yml
Read files matching /var/log2/*.log and write them to ES.
filebeat.inputs:
- type: log
  enabled: true
  # files to read
  paths:
    - /var/log2/*.log
  # markers used later to decide which index and pipeline to use
  fields:
    type: logDataPipeline
    source: common

- type: log
  enabled: true
  paths:
    - /var/log/1.log
    - /var/log/2.log
  fields:
    source: exception

- type: log
  enabled: true
  paths:
    - /var/log/3.log

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

# ======================= Elasticsearch template setting =======================
setup.template.settings:
  # default number of primary shards
  index.number_of_shards: 1
  # default number of replicas
  index.number_of_replicas: 1
  #index.codec: best_compression
  #_source.enabled: false

# allow the index template to be generated automatically
setup.template.enabled: true
# overwrite the template if it already exists
setup.template.overwrite: true
# field definition file used when generating the index template
setup.template.fields: fields.yml
# name of the generated index template
setup.template.name: "logdata"
# index pattern the generated template matches
setup.template.pattern: "logdata-*"

# filebeat defaults to auto, which gives created indices a 50 GB + 30 day
# lifecycle; if that is acceptable, this setting can be left unset.
# Disabled below, so the auto line is commented out here.
#setup.ilm.enabled: auto
# NOTE: -* is automatically appended after the alias
setup.ilm.rollover_alias: "park-ssm"
setup.ilm.pattern: "{now/d}"
# generate the index pattern in Kibana to make log searching easier
#setup.dashboards.index: myfilebeat-7.0.0-*
setup.ilm.enabled: false

# =================================== Kibana ===================================
setup.kibana:

# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["10.8.10.12:9200"]
  index: "logdata-%{+yyyy.MM.dd}"
  indices:
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals:
        fields:
          source: "common"
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals:
        fields:
          source: "exception"
  pipelines:
    - pipeline: logDataPipeline
      when.equals:
        fields.type: logDataPipeline

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
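The `indices` and `pipelines` blocks above route each event based on the custom `fields` attached by its input. That decision logic can be sketched in Python (hypothetical helper names, not FileBeat code):

```python
def choose_index(event, date='2021.07.01'):
    # mirrors output.elasticsearch.indices: route by fields.source,
    # otherwise fall back to the default "logdata-<date>" index
    source = event.get('fields', {}).get('source')
    if source in ('common', 'exception'):
        return f'logdata-{source}-{date}'
    return f'logdata-{date}'

def choose_pipeline(event):
    # mirrors output.elasticsearch.pipelines: only events whose input set
    # fields.type == "logDataPipeline" pass through the ingest pipeline
    if event.get('fields', {}).get('type') == 'logDataPipeline':
        return 'logDataPipeline'
    return None

event = {'fields': {'type': 'logDataPipeline', 'source': 'common'}}
print(choose_index(event))     # logdata-common-2021.07.01
print(choose_pipeline(event))  # logDataPipeline
```

Note that only the first input sets `fields.type`, so logs from /var/log/1.log, 2.log, and 3.log are indexed without being parsed by the pipeline.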
Create Custom Fields: FileBeat fields.yml
# our custom fields
- key: rbt
  title: rbt
  description: rbt log data fields
  fields:
    - name: logdata
      type: keyword
    - name: actionOrFunction
      type: keyword
    - name: businessType
      type: keyword
    - name: callMethod
      type: keyword
    - name: requestMethod
      type: keyword
    - name: callLink
      type: keyword
    - name: loginUserIp
      type: keyword
    - name: userName
      type: keyword
    - name: userId
      type: keyword
    - name: paramOrInputData
      type: keyword
    - name: resultOrOutputData
      type: keyword
    - name: exceptionInfo
      type: keyword
    - name: systemEnv
      type: keyword
    - name: status
      type: long
    - name: fullLinkId
      type: keyword
    - name: subFullLinkId
      type: keyword
    - name: currentTimeMillisecond
      type: long
    - name: detail
      type: keyword
    - name: other
      type: keyword
    - name: errorData
      type: keyword
    - name: errorDataSource
      type: keyword
    - name: errorDataDetail
      type: keyword
    - name: logTime
      type: keyword
    - name: processTime
      type: long
    - name: orgCode
      type: keyword
    - name: orgName
      type: keyword
    - name: exceptionDetailInfo
      type: keyword
    - name: insertTime
      type: date

# FileBeat built-in fields
- key: ecs
  title: ECS
  description: ECS Fields.
  fields:
    - name: '@timestamp'
      level: core
      required: true
      type: date
      description: 'Date/time when the event originated.
        This is the date/time extracted from the event, typically representing when
        the event was generated by the source.
        If the event source has no original timestamp, this value is typically populated
        by the first time the event was received by the pipeline.
        Required field for all events.'
      example: '2016-05-23T08:05:34.853Z'
Run FileBeat
[root@test13 filebeat-7.9.3-linux-x86_64]# ls
data fields.yml.bak filebeat.reference.yml filebeat.yml.bak LICENSE.txt modules.d README.md
fields.yml filebeat filebeat.yml kibana module NOTICE.txt s.log
[root@test13 filebeat-7.9.3-linux-x86_64]# ./filebeat -e
FileBeat Startup Options
-c  specify the configuration file
-d "*"  enable debug output; useful for seeing the exact cause when an error occurs.
Test
Append a new line to /var/log2/test.log (e.g. with vim):
2021-07-01 20:07:25 [XNIO-1 task-2] INFO fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X
Query ES and confirm the log entry has been ingested.
Pipeline Configuration Details
1. Set the Index _id from Log Data
Every document carries metadata fields (_id, _index, _type, and so on), and these can be accessed directly in processors, for example:
{
  "set": {
    "field": "_id",
    "value": "{{logdata.6}}"
  }
}
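One consequence worth noting (an observation, not from the original text): once `_id` is derived from the data, two log lines that produce the same `logdata.6` value target the same document, so the later write replaces the earlier one instead of creating a duplicate. A minimal sketch of that semantics:

```python
# Simulate _id-based indexing: documents keyed by _id, so a repeated
# _id value overwrites the existing document rather than adding a new one.
store = {}
events = [
    {'userName': 'jast110', 'action': 'first'},
    {'userName': 'jast110', 'action': 'second'},  # same logdata.6 value
]
for ev in events:
    store[ev['userName']] = ev  # _id = {{logdata.6}} (userName)

print(len(store))                  # 1 document
print(store['jast110']['action'])  # second (last write wins)
```

This is useful for deduplication, but a poor choice if every log line must be preserved.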
FileBeat Configuration Details
Note: on first startup, FileBeat writes the index template configured in filebeat.yml into Elasticsearch. After that, restarting the service will not update the template, even if the configuration changes. For example, the shard and replica counts below are written into the index template on first startup; later modifications have no effect. To apply a changed configuration, delete the data directory under the filebeat installation directory.
1. Set Replica and Shard Counts for the Indices FileBeat Writes to Elasticsearch
Modify the following parameters in filebeat.yml:
setup.template.settings:
  # default number of primary shards
  index.number_of_shards: 1
  # default number of replicas
  index.number_of_replicas: 1
Troubleshooting
ERROR instance/beat.go:802 Exiting: error initializing processors:
The error output is as follows:
2022-01-20T14:39:22.441+0800 ERROR instance/beat.go:802 Exiting: error initializing processors: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?
Exiting: error initializing processors: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?
Solution
Comment out add_docker_metadata and add_kubernetes_metadata in filebeat.yml:
# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  # - add_docker_metadata: ~
  # - add_kubernetes_metadata: ~