Logstash笔记

目录​​​​​​​

一、简介

二、单个输入和输出插件

三、多个输入和输出插件

四、pipeline结构

五、队列和数据弹性

六、内存队列

七、持久化队列

八、死信队列 (DLQ)

九、输入插件

1)、beats

2)、dead_letter_queue

 3)、elasticsearch 

 4)、file

5)、redis

十、输出插件

1)、elasticsearch

2)、file

 3)、kafka

4)、redis

十一、过滤插件

1)、age

2)、bytes

3)、date

4)、grok


一、简介

Logstash 是一个具有实时管道功能的开源数据收集引擎。 Logstash 可以动态地统一来自不同来源的数据,并将数据规范化到您选择的目的地。

Logstash 管道有两个必需元素(输入和输出)以及一个可选元素(过滤器)。输入插件使用来自源的数据,过滤器插件根据您的指定修改数据,输出插件将数据写入目标。
例如:

image.png


最基本的 Logstash 管道:

cd logstash-8.12.2
# 示例中的管道从标准输入 stdin 获取输入,并以结构化格式将该输入移动到标准输出 stdout。
bin/logstash -e 'input { stdin { } } output { stdout {} }'

二、单个输入和输出插件

在创建 Logstash 管道之前,您将配置 Filebeat 以将日志行发送到 Logstash。 Filebeat 客户端是一个轻量级、资源友好型工具,它从服务器上的文件收集日志并将这些日志转发到 Logstash 实例进行处理。 Filebeat 专为可靠性和低延迟而设计。 Filebeat 在主机上的资源占用很少,Beats 输入插件最大限度地减少了 Logstash 实例的资源需求。

安装Filebeat后,需要对其进行配置。打开位于 Filebeat 安装目录中的 filebeat.yml 文件:

#输出logstash中filebeat.inputs:
- type: logpaths:- /path/to/file/logstash-tutorial.log    # Filebeat 处理的一个或多个文件的绝对路径
output.logstash:hosts: ["localhost:5044"]   #输出logstash中

创建一个 Logstash 配置管道,该管道使用 Beats 输入插件从 Beats 接收事件,first-pipeline.conf 的文件:

input {beats {port => "5044"   # 使用 Filebeat 将 Apache Web 日志作为输入}
}filter {   # 解析这些日志以从日志中创建特定的命名字段grok {match => { "message" => "%{COMBINEDAPACHELOG}"}}geoip {source => "clientip"}
}
output {elasticsearch {hosts => [ "localhost:9200" ]   # 将解析后的数据写入 Elasticsearch 集群}
}

三、多个输入和输出插件

创建一个 Logstash 管道,该管道从 Twitter 源和 Filebeat 客户端获取输入,然后将信息发送到 Elasticsearch 集群以及将信息直接写入文件。

input {twitter {consumer_key => "enter_your_consumer_key_here"consumer_secret => "enter_your_secret_here"keywords => ["cloud"]oauth_token => "enter_your_access_token_here"oauth_token_secret => "enter_your_access_token_secret_here"}beats {port => "5044"}
}
output {elasticsearch {hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]}file {path => "/path/to/target/file"}
}

四、pipeline结构

input {...
}filter {...
}output {...
}

每个部分都包含一个或多个插件的配置选项。如果指定多个过滤器,它们将按照它们在配置文件中出现的顺序应用。如果指定多个输出,事件将按照它们在配置文件中出现的顺序按顺序发送到每个目标。

五、队列和数据弹性

默认情况下,Logstash 在管道阶段(输入 → 管道工作线程)之间使用内存中的有界队列来缓冲事件如果 Logstash 遇到临时机器故障,内存队列的内容将会丢失。(临时机器故障是指 Logstash 或其主机异常终止但能够重新启动的情况。)为了防止数据丢失并确保事件在管道中不间断地流动,Logstash 提供了数据弹性功能。

  • 持久队列 (PQ) 通过将事件存储在磁盘上的内部队列中来防止数据丢失。
  • 死信队列 (DLQ) 为 Logstash 无法处理的事件提供磁盘存储,以便您可以评估它们。您可以使用 dead_letter_queue 输入插件轻松地重新处理死信队列中的事件。

默认情况下,这些弹性功能处于禁用状态。要打开这些功能,您必须在 Logstash 设置文件中显式启用它们。

六、内存队列

内存队列大小不是直接配置的。相反,这取决于您如何调整 Logstash。

其上限由 pipeline.workers (默认值:CPU 数量)乘以 pipeline.batch.size (默认值:125)事件定义。该值称为“飞行计数”,确定每个内存队列中可以保存的最大事件数。

当队列已满时,Logstash 对输入施加反压,以阻止数据流入 Logstash。

七、持久化队列

Logstash 持久队列通过将运行中的消息队列存储到磁盘来帮助防止异常终止期间的数据丢失。

  • 有助于防止正常关闭期间和 Logstash 异常终止期间消息丢失。如果在事件正在进行时 Logstash 重新启动,Logstash 会尝试传送存储在持久队列中的消息,直到传送至少成功一次。
  • 可以吸收突发事件,而不需要 Redis 或 Apache Kafka 等外部缓冲机制。

默认情况下禁用持久队列。

持久队列不能解决以下问题:

  • 不使用请求-响应协议的输入插件无法防止数据丢失。 Tcp、udp、zeromq 推+拉和许多其他输入没有向发送方确认收到的机制。 (beats和http等插件确实具有确认功能,因此受到该队列的良好保护。)
  • 如果在提交检查点文件之前发生异常关闭,则可能会丢失数据。
  • 持久队列不处理永久性机器故障,例如磁盘损坏、磁盘故障和机器丢失。保留到磁盘的数据不会被复制。

八、死信队列 (DLQ)

死信队列(DLQ)被设计为临时写入无法处理的事件的地方。 DLQ 使您能够灵活地调查有问题的事件,而不会阻塞管道或丢失事件。可以使用 dead_letter_queue 输入插件处理来自 DLQ 的事件。

默认情况下,当 Logstash 遇到由于数据包含映射错误或其他问题而无法处理的事件时,Logstash 管道会挂起或删除不成功的事件。为了防止这种情况下的数据丢失,您可以配置 Logstash 将不成功的事件写入死信队列而不是丢弃它们。

默认情况下禁用死信队列。要启用死信队列,请在logstash.yml设置文件中设置dead_letter_queue_enable选项:

dead_letter_queue.enable: true

当您准备好处理死信队列中的事件时,您可以创建一个使用 dead_letter_queue 输入插件从死信队列中读取事件的管道。

input {dead_letter_queue {path => "/path/to/data/dead_letter_queue"   # 包含死信队列的顶级目录的路径commit_offsets => true   # 当 true 时,保存偏移量。当管道重新启动时,它将继续从上次停止的位置读取,而不是重新处理队列中的所有项目pipeline_id => "main"   # 写入死信队列的管道的 ID。默认为“main”clean_consumed => true   # 删除已完全消耗的段,从而在处理时释放空间start_timestamp => "2017-06-06T23:40:37"  # 使用 start_timestamp 选项在队列中的特定点开始处理事件}
}output {stdout {codec => rubydebug { metadata => true }   # 将事件(包括元数据)写入标准输出}
}

九、输入插件

1)、beats

在端口 5044 上侦听传入的 Beats 连接并为 Elasticsearch 建立索引:

input {beats {port => 5044}
}output {elasticsearch {hosts => ["http://localhost:9200"]index => "%{[@metadata][beat]}-%{[@metadata][version]}"  # %{[@metadata][beat]} 将索引名称的第一部分设置为元数据字段的值,%{[@metadata][version]} 将第二部分设置为 Beat 版本。例如:metricbeat-6.1.6。}
}

2)、dead_letter_queue

用于从 Logstash 的死信队列中读取事件: 

input {dead_letter_queue {path => "/var/logstash/data/dead_letter_queue"start_timestamp => "2017-04-04T23:40:37"}
}

 3)、elasticsearch 

input {# Read all documents from Elasticsearch matching the given queryelasticsearch {hosts => "localhost"query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'}}

 4)、file

input {file {path => "/path/log/*.log"}
}

5)、redis

input {redis {# Redis的key的类型。值可以是以下任意值:list、channel、pattern_channeldata_type => "list"# Redis 数据库编号db => 5# Redis的数据库地址,默认为127.0.0.1host => "10.0.0.10"# Redis的端口号port => 6379# Redis的认证密码password => "admin"# Redis 列表或通道的名称key => "logstash-input-redis"}
}

十、输出插件

输出插件将事件数据发送到特定目的地。输出是事件管道的最后阶段。

1)、elasticsearch

使用 mutate 过滤器和条件来添加 [@metadata] 字段来设置每个事件的目标索引:

filter {if [log_type] in [ "test", "staging" ] {mutate { add_field => { "[@metadata][target_index]" => "test-%{+YYYY.MM}" } }} else if [log_type] == "production" {mutate { add_field => { "[@metadata][target_index]" => "prod-%{+YYYY.MM.dd}" } }} else {mutate { add_field => { "[@metadata][target_index]" => "unknown-%{+YYYY}" } }}}output {elasticsearch {index => "%{[@metadata][target_index]}"}}

2)、file

output {file {path => ...codec => line { format => "custom format: %{message}"}}
}

 3)、kafka

output {kafka {codec => jsontopic_id => "mytopic"   # 生成消息的主题}}

4)、redis

output {redis {# 指定写入数据的类型data_type => "list"# Redis 数据库编号db => 5# Redis主机地址host => "10.0.0.10"# Redis的端口号port => 6379# Redis的认证密码password => "admin"# Redis写入的key值key => "logstash-input-redis"}
}

十一、过滤插件

过滤器插件对事件执行中间处理。通常根据事件的特征有条件地应用过滤器。

1)、age

用于计算事件年龄的简单过滤器。
此过滤器通过从当前时间戳减去事件时间戳来计算事件的年龄。您可以将此插件与删除过滤器插件一起使用来删除早于某个阈值的 Logstash 事件。 

filter {age {add_field => {   # 一次添加多个字段"foo_%{somefield}" => "Hello world, from %{host}""new_field" => "new_static_value"}}if [@metadata][age] > 86400 {drop {}}
}

2)、bytes

将计算机存储大小的字符串表示形式(例如“123 MB”或“5.6gb”)解析为其以字节为单位的数值。

filter {bytes {source => "my_bytes_string_field"   # 包含存储大小的源字段的名称target => "my_bytes_numeric_field"  # 将包含存储大小(以字节为单位)的目标字段的名称}}

3)、date

日期过滤器用于解析字段中的日期,然后使用该日期或时间戳作为事件的 Logstash 时间戳。 

 filter {date {match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]}}

4)、grok

解析任意文本并对其进行结构化。Grok 是将非结构化日志数据解析为结构化且可查询的数据的好方法。

虚构的 http 请求日志:

55.3.244.1 GET /index.html 15824 0.043
filter {grok {match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }}}

 在 grok 过滤器之后,事件中将有一些额外的字段:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/16935.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

字符串和字符串函数(1)

前言: 字符串在C语言中比较特别,没有单另的字符串类型,想要初始化字符串必须用字符变量的数组初始化,但是在C语言标准库函数中提供了大量能对字符串进行修改的函数,比如说可以实现字符串的的拷贝,字符串的追…

经常碰到的20个等待事件

经常碰到的20个等待事件 oracle等待事件简介 DBA团队维护的部分应用运行在oracle数据库平台,为及时了解数据库的运行情况,需要建立涵盖各个维度的监控体系,包括实例状态、空间使用率、ORA错误等数十项监控指标。这其中有一个有效判断数据库…

Nodejs+Websocket+uniapp完成聊天

前言 最近想做一个聊天,但是网上的很多都是不能实现的,要么就是缺少代码片段很难实现websocket的链接,更别说聊天了。自己研究了一番之后实现了这个功能。值得注意的是,我想在小程序中使用socket.io,不好使&#xff0…

从0.1nm到1mm:显微测量仪在抛光至粗糙表面测量中的技术突破

显微测量仪是纳米级精度的表面粗糙度测量技术。它利用光学、电子或机械原理对微小尺寸或表面特征进行测量,能够提供纳米级甚至更高级别的测量精度,这对于许多科学和工业应用至关重要。 在抛光至粗糙表面测量中,显微测量仪器具有从0.1nm到1mm…

java:程序包javax. servLet不存在

一.原因 1.项目Tomcat 服务器依赖未导入 2.项目的 SDK 版本选择错误 二.解决方法 方案一: 1.选择项目结构选项 2.导入Tomcat依赖 把tomcat里面的【jsp-api.jar】和【servlet-api.jar】这两个包导入 方案二: 1.选择项目结构选项 2.选择自己的jdk版本…

Golang | Leetcode Golang题解之第108题将有序数组转换为二叉搜索树

题目: 题解: func sortedArrayToBST(nums []int) *TreeNode {rand.Seed(time.Now().UnixNano())return helper(nums, 0, len(nums) - 1) }func helper(nums []int, left, right int) *TreeNode {if left > right {return nil}// 选择任意一个中间位置…

【Python性能优化】取最值的差异

取最值的差异 测试Windows 测试结果Linux 测试结果 测试 测试内容:从一组 x, y, z 坐标值中获得每个维度(x、y、z)的值域范围。此处不考虑将数据临时存放到内存,再整组获取值域的操作(因为对单文件这么做问题不大&…

PS系统教学01

在前面几节内容基本介绍了PS的基本作用,简单的对PS中的某些基础功能进行介绍应用。 接下来我们进行系统的分享。 本次分享内容 基础的视图操作 接下来我们是对于PS工作区域的每个图标工具进行详细的分享 抓手工具缩放工具 这个图标是将工具栏由一列变成两列 一…

SpringBoot——整合SLF4j进行日志记录

目录 日志 项目总结 新建一个SpringBoot项目 pom.xml application.properties项目配置文件 logger.xml日志配置文件 TestController控制器 SpringbootSlf4jApplication启动类 启动项目 生成logger.log日志文件 日志 在开发中,我们经常使用 System.out.prin…

物联网六大核心技术——青创智通

工业物联网解决方案-工业IOT-青创智通 物联网六大核心技术,是构建万物互联的基础,它们相互协作,共同实现物联网的广泛应用和深远影响。这六大技术分别是感知技术、网络通信技术、智能识别技术、计算技术、平台技术和安全技术,它们…

aws lakeformation注册s3位置的原因

参考资料 lakeformation底层数据的访问逻辑 向lakeformation注册s3位置的目的是让lakeformation控制对AWS S3 位置底层数据的访问(以下简称LF) 注册s3位置后可以进行两种授权 数据访问授权(SELECT、INSERT 和 DELETE) 数据位置…

不含一阶导数项的线性二阶微分方程的通解

假设这里有一个线性二阶微分等式,形式如下: (1) 其中是连续的,是在实闭区间是连续的,如果有人倾向于推广,在相对假弱的假设下,这个结果能够被发现。如果是下列其次线性方程的任意两个线性无关的…

小度推出全球首款基于文心大模型的学习机Z30,仅售价6699元

5月27日,小度科技召开新品发布会,全球首款基于文心大模型的学习机——小度学习机Z30重磅发布。 据「TMT星球」了解,该产品基于文心大模型,重新定义了“AI老师”的能力边界,不仅是一款能为孩子提供全面、有效学习辅导的…

报修新选择:一款软件搞定所有维修问题

数字化、智能化时代发展迅速,各类便捷、智能化软件应用已经深入到我们生活和工作的方方面面。尤其是在企业或学校的设备管理中,报修维修工作一直是一个重要环节。传统的报修方式,如电话报修、填写纸质报修单等,已经无法满足现代高…

Superset二次开发之柱状图自定义初始化时Data Zoom数据缩放值

Superset项目柱状图来自Echarts 首先Echarts实现柱状图数据缩放初始化默认值的效果 核心代码 设置Datazoom 的start 和end myChart.showLoading(); $.get(ROOT_PATH /data/asset/data/obama_budget_proposal_2012.list.json,function (obama_budget_2012) {myChart.hideLoa…

Python 文件操作指南:使用 open 和 with open 实现高效读写

🍀 前言 博客地址: CSDN:https://blog.csdn.net/powerbiubiu 👋 简介 本系列文章主要分享文件操作,了解如何使用 Python 进行文件的读写操作,介绍常见文件格式的读取和写入方法,包括TXT、 CS…

aws lakeformation工作流程和权限管理逻辑

lakeformation在IAM权限模型之外提供独立的更细粒度的权限,控制数据湖数据的访问 能够提供列、行和单元格级别的精细控制 lakeformation的目的是要取代s3和iam策略,主要功能为 数据摄入,LF可以将不同类型的数据统一管理安全管理&#xff0…

小程序内使用路由

一:使用组件 1)创建组件 2)在需要的页面的json/app.json可实现局部使用和全局使用 在局部的话,对象内第一层,window配置也是第一层,而在全局配置也是在第一层,window在window对象内.第二层.内部执行遍历不一样. 3)页面使用 上述所写可实现在页面内使用组件.效果是页面内可以将…

十四天学会Vue——Vue核心(理论+实战)中篇(第二天)

声明:是接着上篇讲的哦,感兴趣可以去看一看~ 这里一些代码就不写了,为了缩减代码量,大家知道就可以了: Vue.config.productionTip false //阻止 vue 在启动时生成生产提示。热身小tips,可以安装这个插件&…

阿里通义千问大模型AI接入火车头自动生成内容插件

插件特点: 可以根据采集的关键词,自动生成文章可自定义提示词 也可以分析标题重写一个标题2个提问标签 如有需要可自由增加对话标签自己可以设置TXT关键词导入,自动采集生成 安装说明: 1.需要python环境 ,具体可以…