FileBeat + Pipeline: Parsing Logs and Writing Them to Elasticsearch (Hands-On)

Table of Contents

  • FileBeat + Pipeline: Parsing Logs and Writing Them to Elasticsearch (Hands-On)
    • Download
    • Goal
    • Log Data
    • Simulating the Pipeline
    • Creating the Pipeline
      • Verifying the Pipeline Was Created
    • Creating the FileBeat Configuration File filebeat.yml
    • Creating Custom Fields: FileBeat fields.yml
    • Running FileBeat
    • Filebeat Startup Command Options
    • Testing
  • Pipeline Configuration Details
    • 1. Setting the Document _id from the Log Data
  • FileBeat Configuration Details
    • 1. Setting the Shard and Replica Count of the Elasticsearch Index Written by Filebeat
  • Troubleshooting
    • ERROR instance/beat.go:802 Exiting: error initializing processors:
FileBeat + Pipeline: Parsing Logs and Writing Them to Elasticsearch (Hands-On)

Download

https://www.elastic.co/cn/downloads/past-releases#filebeat

Goal

Collect logs with FileBeat, parse them with an Elasticsearch ingest pipeline, and write the results to Elasticsearch.

Log Data

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X

Simulating the Pipeline

Note: if the same field is written by both a set processor and a script processor, the script's value wins — processors run in order, and in this pipeline the script processor is last.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "dissect": { "field": "message", "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}" } },
      { "split": { "field": "logdata", "separator": "\\|", "target_field": "logdata" } },
      { "set": { "field": "actionOrFunction", "value": "{{logdata.0}}" } },
      { "set": { "field": "businessType", "value": "{{logdata.1}}" } },
      { "set": { "field": "callMethod", "value": "{{logdata.2}}" } },
      { "set": { "field": "requestMethod", "value": "{{logdata.3}}" } },
      { "set": { "field": "callLink", "value": "{{logdata.4}}" } },
      { "set": { "field": "loginUserIp", "value": "{{logdata.5}}" } },
      { "set": { "field": "userName", "value": "{{logdata.6}}" } },
      { "set": { "field": "userId", "value": "{{logdata.7}}" } },
      { "set": { "field": "paramOrInputData", "value": "{{logdata.8}}" } },
      { "set": { "field": "resultOrOutputData", "value": "{{logdata.9}}" } },
      { "set": { "field": "exceptionInfo", "value": "{{logdata.10}}" } },
      { "set": { "field": "systemEnv", "value": "{{logdata.11}}" } },
      { "set": { "field": "status", "value": "{{logdata.12}}" } },
      { "set": { "field": "fullLinkId", "value": "{{logdata.13}}" } },
      { "set": { "field": "subFullLinkId", "value": "{{logdata.14}}" } },
      { "set": { "field": "currentTimeMillisecond", "value": "{{logdata.15}}" } },
      { "convert": { "field": "currentTimeMillisecond", "type": "long" } },
      { "set": { "field": "detail", "value": "{{logdata.16}}" } },
      { "set": { "field": "other", "value": "{{logdata.17}}" } },
      { "set": { "field": "errorData", "value": "{{logdata.18}}" } },
      { "set": { "field": "errorDataSource", "value": "{{logdata.19}}" } },
      { "set": { "field": "errorDataDetail", "value": "{{logdata.20}}" } },
      { "set": { "field": "logTime", "value": "{{logdata.21}}" } },
      { "set": { "field": "processTime", "value": "{{logdata.22}}" } },
      { "convert": { "field": "processTime", "type": "long" } },
      { "set": { "field": "orgCode", "value": "{{logdata.23}}" } },
      { "set": { "field": "orgName", "value": "{{logdata.24}}" } },
      { "set": { "field": "exceptionDetailInfo", "value": "{{logdata.25}}" } },
      { "set": { "field": "message", "value": "" } },
      { "set": { "field": "logdata", "value": "" } },
      { "script": { "lang": "painless", "source": """ ctx.insertTime = new Date(System.currentTimeMillis() + 1000L * 60 * 60 * 8); """ } }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询运营商宽带用户|4|com.bjga.internet.operator.controller.OperatorBroadbandController.list()|GET|http://127.0.0.1:8080/operator2/broadband/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAccount=%E5%8C%97%E4%BA%AC1%E5%B8%8256&installedPhone=639857&accountHolderName=%E4%B8%9C%E7%A5%A5%E6%9E%97&operatorCreditCode=91110108101909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{\"code\":200,\"msg\":\"查询成功\",\"rows\":[],\"took\":2,\"total\":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X"
      }
    }
  ]
}
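The precedence note above can be checked in isolation. The sketch below is a hedged, assumption-level example (curl against a local node at localhost:9200, field name demo made up for illustration), not part of the original pipeline: both processors write the same field, and because the script processor is placed last, its value is what shows up in the simulated document.

# Both processors write the field "demo"; the script runs last and its value wins.
curl -s -X POST "http://localhost:9200/_ingest/pipeline/_simulate?pretty" \
  -H 'Content-Type: application/json' -d '
{
  "pipeline": {
    "processors": [
      { "set":    { "field": "demo", "value": "from-set" } },
      { "script": { "lang": "painless", "source": "ctx.demo = \"from-script\"" } }
    ]
  },
  "docs": [ { "_source": { "message": "precedence test" } } ]
}'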

Creating the Pipeline

PUT _ingest/pipeline/logDataPipeline
{
  "description": "outer pipeline",
  "processors": [
    { "dissect": { "field": "message", "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}" } },
    { "split": { "field": "logdata", "separator": "\\|", "target_field": "logdata" } },
    { "set": { "field": "actionOrFunction", "value": "{{logdata.0}}" } },
    { "set": { "field": "businessType", "value": "{{logdata.1}}" } },
    { "set": { "field": "callMethod", "value": "{{logdata.2}}" } },
    { "set": { "field": "requestMethod", "value": "{{logdata.3}}" } },
    { "set": { "field": "callLink", "value": "{{logdata.4}}" } },
    { "set": { "field": "loginUserIp", "value": "{{logdata.5}}" } },
    { "set": { "field": "userName", "value": "{{logdata.6}}" } },
    { "set": { "field": "userId", "value": "{{logdata.7}}" } },
    { "set": { "field": "paramOrInputData", "value": "{{logdata.8}}" } },
    { "set": { "field": "resultOrOutputData", "value": "{{logdata.9}}" } },
    { "set": { "field": "exceptionInfo", "value": "{{logdata.10}}" } },
    { "set": { "field": "systemEnv", "value": "{{logdata.11}}" } },
    { "set": { "field": "status", "value": "{{logdata.12}}" } },
    { "set": { "field": "fullLinkId", "value": "{{logdata.13}}" } },
    { "set": { "field": "subFullLinkId", "value": "{{logdata.14}}" } },
    { "set": { "field": "currentTimeMillisecond", "value": "{{logdata.15}}" } },
    { "convert": { "field": "currentTimeMillisecond", "type": "long" } },
    { "set": { "field": "detail", "value": "{{logdata.16}}" } },
    { "set": { "field": "other", "value": "{{logdata.17}}" } },
    { "set": { "field": "errorData", "value": "{{logdata.18}}" } },
    { "set": { "field": "errorDataSource", "value": "{{logdata.19}}" } },
    { "set": { "field": "errorDataDetail", "value": "{{logdata.20}}" } },
    { "set": { "field": "logTime", "value": "{{logdata.21}}" } },
    { "set": { "field": "processTime", "value": "{{logdata.22}}" } },
    { "convert": { "field": "processTime", "type": "long" } },
    { "set": { "field": "orgCode", "value": "{{logdata.23}}" } },
    { "set": { "field": "orgName", "value": "{{logdata.24}}" } },
    { "set": { "field": "exceptionDetailInfo", "value": "{{logdata.25}}" } },
    { "set": { "field": "message", "value": "" } },
    { "set": { "field": "logdata", "value": "" } },
    { "script": { "lang": "painless", "source": """ ctx.insertTime = new Date(System.currentTimeMillis() + 1000L * 60 * 60 * 8); """ } }
  ]
}

Verifying the Pipeline Was Created

GET _ingest/pipeline/logDataPipeline?pretty
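The same check can be done from the shell with curl (a sketch assuming Elasticsearch is reachable at localhost:9200 without authentication); the DELETE request is useful if the pipeline needs to be dropped and recreated:

# Show the stored pipeline definition
curl -s "http://localhost:9200/_ingest/pipeline/logDataPipeline?pretty"

# Remove it if it needs to be recreated from scratch
curl -s -X DELETE "http://localhost:9200/_ingest/pipeline/logDataPipeline"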

Creating the FileBeat Configuration File filebeat.yml

Read the files matching /var/log2/*.log and write them to Elasticsearch:

filebeat.inputs:
- type: log
  enabled: true
  # Files to read
  paths:
    - /var/log2/*.log
  # Marker fields, used below to decide which index/pipeline the events go to
  fields:
    type: logDataPipeline
    source: common
- type: log
  enabled: true
  paths:
    - /var/log/1.log
    - /var/log/2.log
  fields:
    source: exception
- type: log
  enabled: true
  paths:
    - /var/log/3.log

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

# ======================= Elasticsearch template setting =======================
setup.template.settings:
  # Default number of primary shards per index
  index.number_of_shards: 1
  # Default number of replicas per index
  index.number_of_replicas: 1
  #index.codec: best_compression
  #_source.enabled: false

# Allow the index template to be generated automatically
setup.template.enabled: true
# Overwrite the template if it already exists
setup.template.overwrite: true
# Field definition file used when generating the index template
setup.template.fields: fields.yml
# Name of the generated index template
setup.template.name: "logdata"
# Index pattern the generated template applies to
setup.template.pattern: "logdata-*"
# Note: Filebeat automatically appends -* to the rollover alias
setup.ilm.rollover_alias: "park-ssm"
setup.ilm.pattern: "{now/d}"
# Kibana index pattern, to make the logs searchable
#setup.dashboards.index: myfilebeat-7.0.0-*
# Filebeat defaults to setup.ilm.enabled: auto, which creates indices with a
# 50GB + 30-day lifecycle. ILM is disabled here so the custom index names below take effect.
setup.ilm.enabled: false

# =================================== Kibana ===================================
setup.kibana:

# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["10.8.10.12:9200"]
  index: "logdata-%{+yyyy.MM.dd}"
  indices:
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals:
        fields:
          source: "common"
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals:
        fields:
          source: "exception"
  pipelines:
    - pipeline: logDataPipeline
      when.equals:
        fields.type: logDataPipeline

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
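Before starting Filebeat it is worth validating the file. A quick sketch, assuming the commands are run from the Filebeat installation directory:

# Check the YAML for syntax/structure errors
./filebeat test config -c filebeat.yml

# Check that the Elasticsearch output (10.8.10.12:9200 above) is reachable
./filebeat test output -c filebeat.yml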

Creating Custom Fields: FileBeat fields.yml

# Our custom fields
- key: rbt
  title: rbt
  description: rbt log data fields
  fields:
    - name: logdata
      type: keyword
    - name: actionOrFunction
      type: keyword
    - name: businessType
      type: keyword
    - name: callMethod
      type: keyword
    - name: requestMethod
      type: keyword
    - name: callLink
      type: keyword
    - name: loginUserIp
      type: keyword
    - name: userName
      type: keyword
    - name: userId
      type: keyword
    - name: paramOrInputData
      type: keyword
    - name: resultOrOutputData
      type: keyword
    - name: exceptionInfo
      type: keyword
    - name: systemEnv
      type: keyword
    - name: status
      type: long
    - name: fullLinkId
      type: keyword
    - name: subFullLinkId
      type: keyword
    - name: currentTimeMillisecond
      type: long
    - name: detail
      type: keyword
    - name: other
      type: keyword
    - name: errorData
      type: keyword
    - name: errorDataSource
      type: keyword
    - name: errorDataDetail
      type: keyword
    - name: logTime
      type: keyword
    - name: processTime
      type: long
    - name: orgCode
      type: keyword
    - name: orgName
      type: keyword
    - name: exceptionDetailInfo
      type: keyword
    - name: insertTime
      type: date

# Fields that ship with FileBeat
- key: ecs
  title: ECS
  description: ECS Fields.
  fields:
    - name: '@timestamp'
      level: core
      required: true
      type: date
      description: 'Date/time when the event originated. This is the date/time extracted
        from the event, typically representing when the event was generated by the source.
        If the event source has no original timestamp, this value is typically populated
        by the first time the event was received by the pipeline. Required field for all events.'
      example: '2016-05-23T08:05:34.853Z'

Running FileBeat

[root@test13 filebeat-7.9.3-linux-x86_64]# ls
data        fields.yml.bak  filebeat.reference.yml  filebeat.yml.bak  LICENSE.txt  modules.d   README.md
fields.yml  filebeat        filebeat.yml            kibana            module       NOTICE.txt  s.log
[root@test13 filebeat-7.9.3-linux-x86_64]# ./filebeat -e 

Filebeat Startup Command Options

-c       specify the configuration file to use
-d "*"   enable debug output for all selectors; useful for seeing the exact cause when something fails

Testing

Append a new log line to /var/log2/test.log (for example with vim):

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X

Querying Elasticsearch shows that the log entry has been ingested and parsed.
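This can also be confirmed from the command line. A sketch, assuming the event carried fields.source: common (so it landed in an index matching logdata-common-* as configured above) and that insertTime was mapped as a date via fields.yml:

# List the indices created by Filebeat
curl -s "http://10.8.10.12:9200/_cat/indices/logdata-*?v"

# Look at the most recently ingested parsed document
curl -s "http://10.8.10.12:9200/logdata-common-*/_search?pretty" \
  -H 'Content-Type: application/json' -d '
{
  "size": 1,
  "sort": [ { "insertTime": { "order": "desc" } } ]
}'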


Pipeline Configuration Details

1. Setting the Document _id from the Log Data

Every document carries metadata fields (_id, _index, _type, and so on), and these can be written directly from within the processors. For example, the following set processor uses logdata.6 (the userName position in this log format) as the document _id:

{"set": {"field": "_id","value": "{{logdata.6}}"}
}
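A quick way to see the effect is to simulate a trimmed-down pipeline. This is a sketch via curl against localhost:9200; the sample message is made up and only its pipe-separated layout matters. In the simulate response, the seventh value ("jast110") appears as the document _id:

curl -s -X POST "http://localhost:9200/_ingest/pipeline/_simulate?pretty" \
  -H 'Content-Type: application/json' -d '
{
  "pipeline": {
    "processors": [
      { "split": { "field": "message", "separator": "\\|", "target_field": "logdata" } },
      { "set":   { "field": "_id", "value": "{{logdata.6}}" } }
    ]
  },
  "docs": [ { "_source": { "message": "action|4|method|GET|url|127.0.0.1|jast110|9a2e23217" } } ]
}'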

FileBeat Configuration Details

Note: on the first startup, FileBeat installs the index template (_template) defined in its configuration into Elasticsearch. On subsequent restarts the stored template is not updated, even if the configuration has changed. For example, the shard and replica counts below are written into the template on the first run; changing them afterwards has no effect. To make a changed configuration take effect, delete the data directory under the Filebeat installation directory and start Filebeat again.

1. Setting the Shard and Replica Count of the Elasticsearch Index Written by Filebeat

Change the following settings in filebeat.yml:

setup.template.settings:
  # Default number of primary shards per index
  index.number_of_shards: 1
  # Default number of replicas per index
  index.number_of_replicas: 1
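To force a changed shard/replica setting to take effect after the first run, the stored template and Filebeat's local registry have to be removed. A sketch, assuming the template name logdata from the configuration above and a Filebeat install directory of /opt/filebeat-7.9.3-linux-x86_64 (the host and paths are assumptions; adjust them to your environment):

# Inspect the template Filebeat installed on its first start
curl -s "http://10.8.10.12:9200/_template/logdata?pretty"

# Delete it together with Filebeat's local data directory, then restart Filebeat
curl -s -X DELETE "http://10.8.10.12:9200/_template/logdata"
rm -rf /opt/filebeat-7.9.3-linux-x86_64/data
cd /opt/filebeat-7.9.3-linux-x86_64 && ./filebeat -e -c filebeat.yml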

Troubleshooting

ERROR instance/beat.go:802 Exiting: error initializing processors:

The full error output is:

2022-01-20T14:39:22.441+0800    ERROR   instance/beat.go:802    Exiting: error initializing processors: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?
Exiting: error initializing processors: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?

Solution
Comment out the add_docker_metadata and add_kubernetes_metadata processors in filebeat.yml:

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
#  - add_docker_metadata: ~
#  - add_kubernetes_metadata: ~
