FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)

文章目录

  • FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)
    • 下载地址
    • 目的
    • 日志数据
    • 模拟Pipeline
    • 创建pipeline
      • 查看Pipeline是否创建成功
    • 创建FileBeat配置文件 filebeat.yml
    • 创建自定义字段 FileBeat fields.yml
    • 执行 FileBeat
    • filebeat 启动命令说明
    • 测试
  • Pipeline 配置详解
    • 1. 根据日志数据指定索引 _id
  • FileBeat 配置详解
    • 1.设置Filebeat保存到ElasticSearch索引副本、分片数量
  • 异常处理
    • 提示 ERROR instance/beat.go:802 Exiting: error initializing processors:

FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)

下载地址

https://www.elastic.co/cn/downloads/past-releases#filebeat

目的

使用FileBeat收集日志,Pipeline解析日志,最终写入ES

日志数据

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X

模拟Pipeline

注意:如果同时通过setscript设置字段,会以script为准。

POST /_ingest/pipeline/_simulate
{"pipeline": {"processors" : [{"dissect": {"field": "message","pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"}},{"split": {"field": "logdata","separator": "\\|","target_field": "logdata"}},{"set": {"field": "actionOrFunction","value": "{{logdata.0}}"}},{"set": {"field": "businessType","value": "{{logdata.1}}"}},{"set": {"field": "callMethod","value": "{{logdata.2}}"}},{"set": {"field": "requestMethod","value": "{{logdata.3}}"}},{"set": {"field": "callLink","value": "{{logdata.4}}"}},{"set": {"field": "loginUserIp","value": "{{logdata.5}}"}},{"set": {"field": "userName","value": "{{logdata.6}}"}},{"set": {"field": "userId","value": "{{logdata.7}}"}},{"set": {"field": "paramOrInputData","value": "{{logdata.8}}"}},{"set": {"field": "resultOrOutputData","value": "{{logdata.9}}"}},{"set": {"field": "exceptionInfo","value": "{{logdata.10}}"}},{"set": {"field": "systemEnv","value": "{{logdata.11}}"}},{"set": {"field": "status","value": "{{logdata.12}}"}},{"set": {"field": "fullLinkId","value": "{{logdata.13}}"}},{"set": {"field": "subFullLinkId","value": "{{logdata.14}}"}},{"set": {"field": "currentTimeMillisecond","value": "{{logdata.15}}"}},{"convert": {"field": "currentTimeMillisecond","type": "long"}},{"set": {"field": "detail","value": "{{logdata.16}}"}},{"set": {"field": "other","value": "{{logdata.17}}"}},{"set": {"field": "errorData","value": "{{logdata.18}}"}},{"set": {"field": "errorDataSource","value": "{{logdata.19}}"}},{"set": {"field": "errorDataDetail","value": "{{logdata.20}}"}},{"set": {"field": "logTime","value": "{{logdata.21}}"}},{"set": {"field": "processTime","value": "{{logdata.22}}"}},{"convert": {"field": "processTime","type": "long"}},{"set": {"field": "orgCode","value": "{{logdata.23}}"}},{"set": {"field": "orgName","value": "{{logdata.24}}"}},{"set": {"field": "exceptionDetailInfo","value": "{{logdata.25}}"}},{"set": {"field": "message","value": ""}},{"set": {"field": "logdata","value": ""}},{"script": {"lang": "painless","source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8);  """}}]},"docs": [{"_source": {"message": "2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询运营商宽带用户|4|com.bjga.internet.operator.controller.OperatorBroadbandController.list()|GET|http://127.0.0.1:8080/operator2/broadband/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAccount=%E5%8C%97%E4%BA%AC1%E5%B8%8256&installedPhone=639857&accountHolderName=%E4%B8%9C%E7%A5%A5%E6%9E%97&operatorCreditCode=91110108101909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{\"code\":200,\"msg\":\"查询成功\",\"rows\":[],\"took\":2,\"total\":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X"}}]
}

创建pipeline

PUT _ingest/pipeline/logdatapipeline
{"description" : "outer pipeline","processors" : [{"dissect": {"field": "message","pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"}},{"split": {"field": "logdata","separator": "\\|","target_field": "logdata"}},{"set": {"field": "actionOrFunction","value": "{{logdata.0}}"}},{"set": {"field": "businessType","value": "{{logdata.1}}"}},{"set": {"field": "callMethod","value": "{{logdata.2}}"}},{"set": {"field": "requestMethod","value": "{{logdata.3}}"}},{"set": {"field": "callLink","value": "{{logdata.4}}"}},{"set": {"field": "loginUserIp","value": "{{logdata.5}}"}},{"set": {"field": "userName","value": "{{logdata.6}}"}},{"set": {"field": "userId","value": "{{logdata.7}}"}},{"set": {"field": "paramOrInputData","value": "{{logdata.8}}"}},{"set": {"field": "resultOrOutputData","value": "{{logdata.9}}"}},{"set": {"field": "exceptionInfo","value": "{{logdata.10}}"}},{"set": {"field": "systemEnv","value": "{{logdata.11}}"}},{"set": {"field": "status","value": "{{logdata.12}}"}},{"set": {"field": "fullLinkId","value": "{{logdata.13}}"}},{"set": {"field": "subFullLinkId","value": "{{logdata.14}}"}},{"set": {"field": "currentTimeMillisecond","value": "{{logdata.15}}"}},{"convert": {"field": "currentTimeMillisecond","type": "long"}},{"set": {"field": "detail","value": "{{logdata.16}}"}},{"set": {"field": "other","value": "{{logdata.17}}"}},{"set": {"field": "errorData","value": "{{logdata.18}}"}},{"set": {"field": "errorDataSource","value": "{{logdata.19}}"}},{"set": {"field": "errorDataDetail","value": "{{logdata.20}}"}},{"set": {"field": "logTime","value": "{{logdata.21}}"}},{"set": {"field": "processTime","value": "{{logdata.22}}"}},{"convert": {"field": "processTime","type": "long"}},{"set": {"field": "orgCode","value": "{{logdata.23}}"}},{"set": {"field": "orgName","value": "{{logdata.24}}"}},{"set": {"field": "exceptionDetailInfo","value": "{{logdata.25}}"}},{"set": {"field": "message","value": ""}},{"set": {"field": "logdata","value": ""}},{"script": {"lang": "painless","source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8);  """}}]
}

查看Pipeline是否创建成功

GET _ingest/pipeline/logDataPipeline?pretty

创建FileBeat配置文件 filebeat.yml

读取 /var/log2/*.log 文件写入ES

filebeat.inputs:
- type: logenabled: true
#读取的文件paths:- /var/log2/*.log
# 标记,在后面用于判断写入的索引fields:type: logDataPipelinesource: common
- type: logenabled: truepaths:- /var/log/1.log- /var/log/2.logfields:source: exception
- type: logenabled: truepaths:- /var/log/3.logfilebeat.config.modules:path: ${path.config}/modules.d/*.ymlreload.enabled: false# ======================= Elasticsearch template setting =======================setup.template.settings:# 索引默认分片数index.number_of_shards: 1# 索引默认副本数index.number_of_replicas: 1#index.codec: best_compression#_source.enabled: false# # 生成index模板的名称
#允许自动生成index模板
setup.template.enabled: true
# # 如果存在模块则覆盖
setup.template.overwrite: true
# # # 生成index模板时字段配置文件
setup.template.fields: fields.yml
setup.template.name: "logdata" 
# # # 生成index模板匹配的index格式       
setup.template.pattern: "logdata-*" 
setup.ilm.enabled: auto
# 这里一定要注意 会在alias后面自动添加-*
setup.ilm.rollover_alias: "park-ssm"
setup.ilm.pattern: "{now/d}"
# # # 生成kibana中的index pattern,便于检索日志
# #setup.dashboards.index: myfilebeat-7.0.0-*
# #filebeat默认值为auto,创建的elasticsearch索引生命周期为50GB+30天。如果不改,可以不用设置
setup.ilm.enabled: false# =================================== Kibana ===================================
setup.kibana:# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:# Array of hosts to connect to.hosts: ["10.8.10.12:9200"]index: "logdata-%{+yyyy.MM.dd}"indices:- index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"when.equals: fields: source: "common"- index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"when.equals:fields:source: "exception"pipelines:- pipeline: logDataPipelinewhen.equals:fields.type: logDataPipeline# ================================= Processors =================================
processors:- add_host_metadata:when.not.contains.tags: forwarded- add_cloud_metadata: ~- add_docker_metadata: ~- add_kubernetes_metadata: ~

创建自定义字段 FileBeat fields.yml

# 我们自定义的
- key: rbttitle: rbtdescription: rbt log data fields fields:- name: logdatatype: keyword- name: actionOrFunctiontype: keyword- name: businessTypetype: keyword- name: callMethodtype: keyword- name: requestMethodtype: keyword- name: callLinktype: keyword- name: loginUserIptype: keyword- name: userNametype: keyword- name: userIdtype: keyword- name: paramOrInputDatatype: keyword- name: resultOrOutputDatatype: keyword- name: exceptionInfotype: keyword- name: systemEnvtype: keyword- name: statustype: long- name: fullLinkIdtype: keyword- name: subFullLinkIdtype: keyword- name: currentTimeMillisecondtype: long- name: detailtype: keyword- name: othertype: keyword- name: errorDatatype: keyword- name: errorDataSourcetype: keyword- name: errorDataDetailtype: keyword- name: logTimetype: keyword- name: processTimetype: long- name: orgCodetype: keyword- name: orgNametype: keyword- name: exceptionDetailInfotype: keyword- name: insertTimetype: date# FileBeat自带的
- key: ecstitle: ECSdescription: ECS Fields.fields:- name: '@timestamp'level: corerequired: truetype: datedescription: 'Date/time when the event originated.This is the date/time extracted from the event, typically representing whenthe event was generated by the source.If the event source has no original timestamp, this value is typically populatedby the first time the event was received by the pipeline.Required field for all events.'example: '2016-05-23T08:05:34.853Z'

执行 FileBeat

[root@test13 filebeat-7.9.3-linux-x86_64]# ls
data        fields.yml.bak  filebeat.reference.yml  filebeat.yml.bak  LICENSE.txt  modules.d   README.md
fields.yml  filebeat        filebeat.yml            kibana            module       NOTICE.txt  s.log
[root@test13 filebeat-7.9.3-linux-x86_64]# ./filebeat -e 

filebeat 启动命令说明

-c 指定配置文件
-d "*" 报错时候,查看具体的错误原因。

测试

新增数据到 vim /var/log2/test.log

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X

查询结果发现日志已经进入到ES
在这里插入图片描述

个人公众号(大数据学习交流): hadoopwiki

Pipeline 配置详解

1. 根据日志数据指定索引 _id

每个文档都会有一些元数据字段信息(metadata filed),比如_id,_index,_type 等,我们在 processors 中也可以直接访问这些信息的,比如下面的例子:

{"set": {"field": "_id","value": "{{logdata.6}}"}
}

FileBeat 配置详解

注意:首次创建的时候FileBeat会在ElasticSearch设置我们再FileBeat配置的_template索引模板,后续重启服务即便配置改了都不会更新该模板,比如下面的分片副本数量,首次启动后,该配置会写入索引模板中,后续修改不起作用。需要重新配置修改,需要删除filebeat目录下的data目录。

1.设置Filebeat保存到ElasticSearch索引副本、分片数量

修改 filebeat.yml 文件中下面参数

setup.template.settings:# 索引默认分片数index.number_of_shards: 1# 索引默认副本数index.number_of_replicas: 1

异常处理

提示 ERROR instance/beat.go:802 Exiting: error initializing processors:

异常内容如下

2022-01-20T14:39:22.441+0800    ERROR   instance/beat.go:802    Exiting: error initializing processors: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?
Exiting: error initializing processors: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?

解决方法
注释掉filebeat.yml文件中的add_docker_metadataadd_kubernetes_metadata

# ================================= Processors =================================
processors:- add_host_metadata:when.not.contains.tags: forwarded- add_cloud_metadata: ~
#  - add_docker_metadata: ~
#  - add_kubernetes_metadata: ~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509647.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络编程中的关键问题总结

内容目录: 连接建立连接断开消息到达发送消息消息发送完毕其它问题参考 网络编程中的关键问题总结 总结下网络编程中关键的细节问题,包含连接建立、连接断开、消息到达、发送消息等等; 连接建立 包括服务端接受 (accept) 新连接和客户端成功发…

List实现类性能和特点分析

面向接口编程: 接口类型 变量 new 实现类(); List list new ArrayList(); List实现类特点和性能分析: 三者共同的特点(共同遵循的规范): 1):允许元素重复. 2):记录元素的先后添加顺序. Vector类: 底层才有数组结构算法,方法都使用了synchronized修饰,线程安全,但是性能…

Java集合框架-重构设计

根据Vector类,ArrayList类,LinkedList类所有具有的存储特点以及拥有的方法入手,发现共性就往上抽取. 共同的特点: 1):允许元素重复的. 2):会记录先后添加的顺序. 共同的方法: 如下图. 根据他们的特点,我就可以指定规范: 遵循该规范的实现类,无论底层算法如何,都必须保证允…

Canal Mysql binlog 同步至 Hbase ES

文章目录一、Canal介绍工作原理canal 工作原理二、下载三、安装使用Mysql准备canal 安装解压缩 canal-deployer配置修改启动查看server日志查看instance日志服务停止canal-client使用Canal Adapter数据同步Hbase数据同步ElasticSearch一、Canal介绍 早期阿里巴巴因为杭州和美国…

java中集合的迭代操作

集合的迭代操作: 把集合做的元素一个一个的遍历取出来. 迭代器对象: Iterator: 迭代器对象,只能从上往下迭代. boolean hasNext(); 判断当前指针后是否有下一个元素 Object next():获取指针的下一个元素,并且移动指针. ListIterator: 是Iterator接口的子接口,支持双向迭代…

用C++11的std::async代替线程的创建

转自:http://www.cnblogs.com/qicosmos/p/3534211.html c11中增加了线程,使得我们可以非常方便的创建线程,它的基本用法是这样的: void f(int n); std::thread t(f, n 1); t.join(); 但是线程毕竟是属于比较低层次的东西&#xf…

HashSet类

Set是Collection子接口,模拟了数学上的集的概念。 Set集合存储特点: 1):不允许元素重复. 2):不会记录元素的先后添加顺序. Set只包含从Collection继承的方法,不过Set无法记住添加的顺序,不允许包含重复的元素。当试图添加两个相同元素进Se…

Canal Mysql同步至ES/Hbase只有新增时生效,修改删除不生效

问题描述 新增Mysql数据时,ES、Hbase数据会同步成功;当删除Mysql数据,或者修改Mysql数据时同步ES、Hbase数据无变化(PS:修改和删除加上LIMIT xxx 就可以成功。) 问题分析 通过查看日志发现新增和删除记录的日志区别:新增data有…

LinkedHashSet类

List接口: 允许元素重复,记录先后添加顺序. Set接口: 不允许元素重复,不记录先后添加顺序. 需求: 不允许元素重复,但是需要保证先后添加的顺序. LinkedHashSet:底层才有哈希表和链表算法. 哈希表:来保证唯一性,.此时就是HashSet,在哈希表中元素没有先后顺序. 链表: 来记录…

Canal Mysql binlog 同步至 ElasticSearch 详细介绍

文章目录数据同步ElasticSearch单表基本配置适配器映射文件详细介绍(单表、多表映射介绍)单表映射索引示例sql单表映射索引示例sql带函数或运算操作多表映射(一对一, 多对一)索引示例sql多表映射(一对多)索引示例sql其它类型的sql示例注意事项本文详细介…

基于C++11的线程池

背景 在传统的收到任务即创建线程的情况下,我们每收到一个任务,就创建一个线程,执行任务,销毁线程, 我们把这三个过程所用的时间分别记做T1,T2,T3 任务本身所用的时间仅占T2/(T1T2T3),这在任务本身所用时间很短的情况下…

集合的工具类

集合操作的工具类: 1):Arrays类: 2):Collections类. Arrays类: 在Collection接口中有一个方法叫toArray把集合转换为Object数组. 把集合转换为数组: Object[] arr 集合对象.toArray(); 数组也可以转换为集合(List集合): public static List asList(T… a) 等价于public …

Docker入门到精通开发指南(一文搞懂)

文章目录安装官方安装文档具体安装步骤1.卸载之前的版本(如果之前未用过忽略该步骤)2.安装相关依赖3.设置docker镜像4.安装docker安装latest版本指定版本安装5.启动docker6.查看docker版本7.运行一个docker hello world8.卸载docker9.配置阿里云镜像加速地址docker常用命令dock…

如何向Maven中央仓库提交自己的Jar包(发布自己的Jar包到中央仓库)

文章目录注册账号GPG 安装安装生成密钥上传公钥Maven配置上传到Maven仓库修改项目的配置,填写基本信息执行编译命令登录网站配置发布项目中应用遇到的问题解决方法本文将介绍如何将自己的jar包发布至公共的中央仓库,通过maven方式进行引用 注册账号 注册…

List和Set以及Map的选用

选用哪一种容器取决于每一种容器的存储特点以及当前业务的需求: List: 单一元素集合. 允许元素重复/记录元素的添加顺序. Set:单一元素集合. 不允许元素重复/不记录元素的添加顺序. 既要不重复,又要保证先后顺序:LinkedHashSet. Map: 双元素集合. 如果存储数据的时候,还得…

Map集合类

映射的数学解释: 设A、B是两个非空集合,如果存在一个法则f,使得对A中的每个元素a,按法则f,在B中有唯一确定的元素b与之对应,则称f为从A到B的映射,记作f:A→B。 映射关系(两个集合):A集合和B集…

Socket select模型

Windows socket select模型开发。 套接字select模型是一种比较常用的IO模型。利用该模型可以使Windows socket应用程序可以同时管理多个套接字。 使用select模型,可以使当执行操作的套接字满足可读可写条件时,给应用程序发送通知。收到这个通知后&#x…

Set实现类性能对比

Set接口的实现类: 共同的特点: 1):都不允许元素重复. 2):都不是线程安全的类. 解决方案:Set s Collections.synchronizedSet(Set对象); HashSet: 不保证元素的先后添加顺序. 底层才有的是哈希表算法,查询效率极高. 判断两个对象是否相等的规则: 1):equals比较为true. …

HugeGraph Server/Hubble安装使用

文章目录HugeGraph Server1 概述2 依赖2.1 安装JDK-1.83 部署3.1 下载tar包4 安装启动4.1 解压4.2 配置Hbase5 访问Server5.1 服务启动状态校验6 停止Server7 多图配置HugeGraph-Hubble 基于Web的可视化图形界面1.概述2.安装3 使用3.1创建图HugeGraph Server 1 概述 HugeGrap…