ES8生产实践——日志清洗过滤(vector方案)

前言

什么是vector

以下描述摘自官方文档:https://vector.dev/docs/about/what-is-vector/

Vector 是一个高性能的可观测性数据管道,可帮助企业控制其可观测性数据。收集、转换和路由所有日志、度量指标和跟踪数据,并将其提供给今天需要的任何供应商和明天可能需要的任何其他供应商。Vector 可在您需要的地方,而不是在供应商最方便的地方,大幅降低成本、丰富新颖的数据并确保数据安全。开放源代码,速度比其他任何替代方案快 10 倍。

简单来说由于logstash使用java语言开发,在处理海量数据时存在性能低下,占用资源过高的问题。而vector使用Rust语言编写,除了使用极少的资源实现logstash数据处理能力外,还具备配置文件简单、处理函数强大、智能均衡kafka分区消费、自适应并发请求等特色功能。

vector架构图

与logstash管道处理类似,vector主要包含数据输入、数据处理、数据输出三部分。
image.png

vector优势

  • 超级快速可靠:Vector采用Rust构建,速度极快,内存效率高,旨在处理最苛刻的工作负载
  • 端到端:Vector 致力于成为从 A 到 B 获取可观测性数据所需的唯一工具,并作为守护程序、边车或聚合器进行部署
  • 统一:Vector 支持日志和指标,使您可以轻松收集和处理所有可观测性数据
  • 供应商中立:Vector 不偏向任何特定的供应商平台,并以您的最佳利益为出发点,培育公平、开放的生态系统。免锁定且面向未来
  • 可编程转换:Vector 的高度可配置转换为您提供可编程运行时的全部功能。无限制地处理复杂的用例

对比测试

性能对比

下图是 Vector 与其它日志收集器的性能测试结果对比,可以看到,Vector 的各项性能指标都优于 Logstash,综合性能也不错,加上其丰富的功能,完全可以满足我们的日志处理需求。

TestVectorFilebeatFluentBitFluentDLogstashSplunkUFSplunkHF
TCP to Blackhole86mib/sn/a64.4mib/s27.7mib/s40.6mib/sn/an/a
File to TCP76.7mib/s7.8mib/s35mib/s26.1mib/s3.1mib/s40.1mib/s39mib/s
Regex Parsing13.2mib/sn/a20.5mib/s2.6mib/s4.6mib/sn/a7.8mib/s
TCP to HTTP26.7mib/sn/a19.6mib/s<1mib/s2.7mib/sn/an/a
TCP to TCP69.9mib/s5mib/s67.1mib/s3.9mib/s10mib/s70.4mib/s7.6mib/s

可靠性对比

TestVectorFilebeatFluentBitFluentDLogstashSplunk UFSplunk HF
Disk Buffer Persistence
Disk Buffer Persistence
File Rotate (copytruncate)
File Truncation
Process (SIGHUP)
JSON (wrapped)

功能性对比

VectorBeatsFluentbitFluentdLogstashSplunk UFSplunk HFTelegraf
End-to-end
Agent
Aggregator
Unified
Logs
Metrics
Open
Open-source
Vendor-neutral
Reliability
Memory-safe
Delivery guarantees
Multi-core

快速上手

安装部署

官方为我们提供了安装包、docker等多种安装方式,下载地址:https://vector.dev/download/,此处以rpm包部署为例。

[root@tiaoban ~]# wget https://packages.timber.io/vector/0.34.0/vector-0.34.0-1.x86_64.rpm
[root@tiaoban ~]# rpm -ivh vector-0.34.0-1.x86_64.rpm
[root@tiaoban ~]# systemctl start vector --now

配置测试

与logstash类似,在调试数据处理规则时,通常会从文件中读取数据,经过一系列处理后最后输出至控制台。其作用是读取 /var/log/messages日志文件,然后把 syslog 格式的日志转换成 json 格式,最后输出到标准输出:

[root@tiaoban ~]# cd /etc/vector/
[root@tiaoban vector]# ls
examples  vector.yaml  vector.yaml.back
[root@tiaoban vector]# cat vector.yaml
sources:in:type: "stdin"sinks:print:type: "console"inputs: ["in"]encoding:codec: "json"
[root@tiaoban vector]# vector -c vector.yaml
2023-11-12T13:40:06.995872Z  INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=info,rdkafka=info,buffers=info,lapin=info,kube=info"
2023-11-12T13:40:06.996328Z  INFO vector::app: Loading configs. paths=["vector.yaml"]
2023-11-12T13:40:06.998137Z  INFO vector::topology::running: Running healthchecks.
2023-11-12T13:40:06.998206Z  INFO vector: Vector has started. debug="false" version="0.34.0" arch="x86_64" revision="c909b66 2023-11-07 15:07:26.748571656"
2023-11-12T13:40:06.998219Z  INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2023-11-12T13:40:06.998590Z  INFO vector::topology::builder: Healthcheck passed.
2023-11-12T13:40:06.998766Z  INFO vector::sources::file_descriptors: Capturing stdin.
hello vector
{"host":"tiaoban","message":"hello vector","source_type":"stdin","timestamp":"2023-11-12T13:40:13.368669601Z"}

当控制台开始打印日志,就说明正常采集到了数据,而且转换成了 json 并打印到了控制台,实验成功。接下来详细介绍vector各个配置段具体内容。

多配置文件

当项目、环境、规则等十分庞杂时,推荐对配置文件根据需要进行拆分,而默认的 vector.toml 文件中可以写入一些全局配置。
修改systemctl 命令管理相关配置,让 Vector 读取指定目录下所有配置文件,而非默认的 vector.toml 文件。
将 Vector systemd 配置文件第 12 行由 ExecStart=/usr/bin/vector 修改为:ExecStart=/usr/bin/vector “–config-dir” “/etc/vector”:

# /usr/lib/systemd/system/vector.service
[Unit]
Description=Vector
Documentation=https://vector.dev
After=network-online.target
Requires=network-online.target[Service]
User=vector
Group=vector
ExecStartPre=/usr/bin/vector validate
ExecStart=/usr/bin/vector "--config-dir" "/etc/vector"
ExecReload=/usr/bin/vector validate
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
AmbientCapabilities=CAP_NET_BIND_SERVICE
EnvironmentFile=-/etc/default/vector
# Since systemd 229, should be in [Unit] but in order to support systemd <229,
# it is also supported to have it here.
StartLimitInterval=10
StartLimitBurst=5
[Install]
WantedBy=multi-user.target

修改 vector 配置目录环境变量,将 VECTOR_CONFIG_DIR="/etc/vector/"追加到 /etc/default/vector 文件最后:

# /etc/default/vector
# This file can theoretically contain a bunch of environment variables
# for Vector.  See https://vector.dev/docs/setup/configuration/#environment-variables
# for details.
VECTOR_CONFIG_DIR="/etc/vector/"

重新加载 systemd 配置并重启 Vector:

systemctl daemon-reload
systemctl restart vector

配置文件详解

配置文件构成

根据架构图可知,Vector由 Sources、Transforms和Sinks三个部分构成,Vector作为一款管道处理工具,日志数据可以从多个源头(source)流入管道,比如 HTTP、Syslog、File、Kafka 等等,当日志数据流入管道后,就可以被进行一系列处理,处理的过程就是转换(Transform),比如增减日志中的一些字段,对日志进行重新格式化等操作,日志被处理成想要的样子后,就可以传输给接收器(Sink)处理,也就是日志最终流向何处,可以是 Elasticsearch、ClickHouse、AWS S3 等等,配置文件基本格式如下

sources:	# 源my_source_id:   # 数据源名称type: "***" # 数据源类型transforms: # 转换my_transform_id: # 转换名称type: remap # 转换类型inputs: # 转换操作的源- my_source_id

源(source)

Vector 提供了丰富的 Sources 供使用,常用的有控制台、模拟数据、File、http、Kafka等,并且支持的种类还在不断增加,详情参考官方文档https://vector.dev/docs/reference/configuration/sources/。

  • 控制台输入
sources:my_source_id:   # 数据源名称type: "stdin"
  • 模拟日志数据输入
sources:my_source_id:  # 数据源名称type: "demo_logs"format: "apache_common"  # 模拟数据类型count:  10 # 模拟数据条数
  • file输入示例:
sources:my_source_id:   # 数据源名称type: "file"include:			# 采集路径- /var/log/**/*.log
  • http输入示例:
sources:my_source_id:  # 数据源名称type: "http_server"address: "0.0.0.0:80" # 监听地址
  • kafka输入示例:
sources:my_source_id: # 数据源名称type: "kafka"bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092" # kafka地址group_id: "consumer-group-name"	# 消费组idtopics: # 消费主题,支持多个topic使用正则匹配- ^(prefix1|prefix2)-.+decoding: # 编码格式codec: "json"auto_offset_reset: "latest" # 消费偏移

转换(transforms)-VRL

在Vector传输数据时,可能涉及数据解析、过滤、新增等操作,跟Logstash组件类似,Vector 提供了诸多 Transforms插件来对日志进行处理。 更多transforms插件参考文档:https://vector.dev/docs/reference/configuration/transforms/
Vector推荐使用remap插件处理数据,它使用VRL语言对日志进行处理,它提供了非常丰富的函数,可以拿来即用。在线调试地址:https://playground.vrl.dev/,VRL语言快速入门参考文档:https://vector.dev/docs/reference/vrl/
接下来列举几个常用的转换案例

  • 字段增删改
# 配置文件
sources:my_source:   # 数据源名称type: "stdin"transforms:my_transform:type: remapinputs:    # 匹配输入源- my_sourcesource: |. = parse_json!(.message) # 解析json数据.@timestamp = now() # 新增字段del(.user) # 删除字段.age = del(.value) # 重命名字段sinks:my_print:type: "console"inputs: ["my_transform"] # 匹配过滤项encoding:codec: "json"
# 执行结果
{"hello":"world","user":"张三","value":18}
{"@timestamp":"2023-11-19T03:02:07.060420199Z","age":18,"hello":"world"}
  • 字段值操作
# 配置文件
sources:my_source:   # 数据源名称type: "stdin"transforms:my_transform:type: remapinputs:    # 匹配输入源- my_sourcesource: |. = parse_json!(.message) # 解析json数据.msg = downcase(string!(.msg)) # 转小写.user = replace(string!(.user), "三", "四") # 字符串替换.value = floor(float!(.value), precision: 2) # 小数保留指定长度sinks:my_print:type: "console"inputs: ["my_transform"] # 匹配过滤项encoding:codec: "json"
# 执行结果
{"msg":"Hello, World!","user":"张三","value":3.1415926}
{"msg":"hello, world!","user":"张四","value":3.14}
  • 正则解析
# 配置文件
sources:my_source:   # 数据源名称type: "stdin"transforms:my_transform:type: remapinputs:    # 匹配输入源- my_sourcesource: |. = parse_regex!(.message, r'^\[(?<logtime>[^\]]*)\] (?<name>[^ ]*) (?<title>[^ ]*) (?<id>\d*)$')sinks:my_print:type: "console"inputs: ["my_transform"] # 匹配过滤项encoding:codec: "json"
# 执行结果
[2023-11-11 12:00:00 +0800] alice engineer 1
{"id":"1","logtime":"2023-11-11 12:00:00 +0800","name":"alice","title":"engineer"}
  • 时间格式化
# 配置文件
sources:my_source:   # 数据源名称type: "stdin"transforms:my_transform:type: remapinputs:    # 匹配输入源- my_sourcesource: |. = parse_regex!(.message, r'^\[(?<logtime>[^\]]*)\] (?<name>[^ ]*) (?<title>[^ ]*) (?<id>\d*)$').logtime = parse_timestamp!((.logtime), format:"%Y-%m-%d %H:%M:%S %:z")
sinks:my_print:type: "console"inputs: ["my_transform"] # 匹配过滤项encoding:codec: "json"
# 执行结果
[2023-11-11 12:00:00 +0800] alice engineer 1
{"id":"1","logtime":"2023-11-11T04:00:00Z","name":"alice","title":"engineer"}
  • geoip解析
# 配置文件
sources:my_source:   # 数据源名称type: "stdin"transforms:my_transform:type: remapinputs:    # 匹配输入源- my_sourcesource: |. = parse_json!(.message) # 解析json数据.geoip = get_enrichment_table_record!("geoip_table",{"ip": .ip_address})sinks:my_print:type: "console"inputs: ["my_transform"] # 匹配过滤项encoding:codec: "json"enrichment_tables:geoip_table: # 指定geoip数据库文件path: "/root/GeoLite2-City.mmdb"type: geoip
# 执行结果
{"ip_address":"185.14.47.131"}
{"geoip":{"city_name":"Hong Kong","continent_code":"AS","country_code":"HK","country_name":"Hong Kong","latitude":22.2842,"longitude":114.1759,"metro_code":null,"postal_code":null,"region_code":"HCW","region_name":"Central and Western District","timezone":"Asia/Hong_Kong"},"ip_address":"185.14.47.131"}

转换(transforms)-Lua

如果 VRL 不能满足用户对日志的处理需求,Vector 也支持嵌入 Lua 语言对日志进行处理,但是这种方式要比 VRL 慢将近 60 %。具体内容可参考文档:https://vector.dev/docs/reference/configuration/transforms/lua/,此处不再做过多介绍,推荐优先使用VRL语言转换。

转换(transforms)-过滤

很多时候从数据源采集过来的数据我们并不是全部都需要,filter顾名思义便是用来解决这一问题的,例如删除debug等级的日志信息。

# 配置文件
sources:my_source:   # 数据源名称type: "stdin"transforms:transform_json:type: remapinputs:    # 匹配输入源- my_sourcesource: |. = parse_json!(.message) # 解析json数据transform_filter:type: filterinputs:- transform_jsoncondition: |.level != "debug"sinks:my_print:type: "console"inputs: ["transform_filter"] # 匹配过滤项encoding:codec: "json"
# 执行结果
{"level":"debug","msg":"hello"}
{"level":"waring","msg":"hello"}
{"level":"waring","msg":"hello"}

接收器(sinks)

接收器是事件的目的地,Vector同样提供了很多 Sinks 类型,其中有些和 Sources 是重合的,比如 Kafka、AWS S3 等,更多支持的接收器类型可参考文档:https://vector.dev/docs/reference/configuration/sinks/

  • 输出到控制台
sinks:print:type: "console"inputs: ["in"]encoding:codec: "json" # 输出为json格式,也可设置为text文件
  • 输出到文件
sinks:my_sink_id:type: fileinputs:- my-source-or-transform-idpath: /tmp/vector-%Y-%m-%d.logencoding:codec: "text"
  • 输出到kafka
sinks:my_sink_id:type: kafkainputs:- my-source-or-transform-idbootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092topic: topic-1234
  • 输出到elasticsearch
sinks:my_es:type: elasticsearchinputs: ["my_transform"] # 匹配转换配置api_version: "v8"        # ES版本,非必填mode: "data_stream" # 数据流方式写入auth: # es认证信息 strategy: "basic"user: "elastic"password: "WbZN3xfa5M4uy+UcxJeH"data_stream: # 数据流名称配置type: "logs"dataset: "vector"namespace: "default"endpoints: ["https://192.168.10.50:9200"] # es连接地址tls: # tls证书配置verify_certificate: false # 跳过证书验证# ca_file: "XXXXX" # ca证书路径

写入es中的数据流信息如下所示:
image.png

全局配置

  • 数据目录

用于持久化 Vector 状态的目录,例如 作为磁盘缓冲区、文件检查点等功能。

data_dir: "/var/lib/vector"
  • API

vector为我们提供了常用的API接口,可以很方便的进行监控检查与状态信息获取。
首先打开 Vector 的 api 功能,在 vector.toml 配置文件中加入以下内容即可:

api:enabled: trueaddress: "0.0.0.0:8686"

重启 Vector,获取 Vector 的健康状态:

$ curl localhost:8686/health
{"ok":true}

开启api后,我们还可以通过命令行vector top命令获取各个任务的性能信息
image.png

监控

指标

  • prometheus_remote_write

将 Vector 内部指标 Sink 到 Prometheus,据此建立更为详细的 Dashboard 和告警。
在 vector.toml 配置文件中加入以下内容即可,vector 内置的指标通过远程写入的方式写入指定的 Prometheus中。

sources:vector_metrics:type: internal_metrics
sinks:prometheus:type:- prometheus_remote_writeendpoint:- https://<prometheus_ip_address>:8087/inputs:- vector_metrics
  • prometheus_exporter

除了远程写入外,vector也支持通过exporter方式保留指标数据供Prometheus抓取,配置文件如下

sources:metrics:type: internal_metricsnamespace: vectorscrape_interval_secs: 30sinks:prometheus:type: prometheus_exporterinputs:- metricsaddress: 0.0.0.0:9598default_namespace: service

然后在Prometheus的job中配置地址为http://IP:9598/metrics即可。

日志

将vector的运行日志写入本地文件或者elasticsearch中存储,以本地存储为例:

sources:logs:type: "internal_logs"sinks:files:type: fileinputs:- logspath: /tmp/vector-%Y-%m-%d.logencoding:codec: text

特色功能

vector自动均衡kafka消费

在之前使用logstash消费kafka数据时,需要根据topic数据量大小配置kafka partition数、Logstash副本数、每个logstash线程数,而这些数量只能根据性能监控图和数据量逐个调整至合适的大小。例如有6台logstash机器,其中5台机器专门用于消费数据量大的topic,其他机器消费小数据量的topic,经常存在logstash节点负载不均衡的问题。
使用vector后,我们只需要让所有机器使用相同的配置,借助Kafka的Consumer Group技术,不同配置文件通过同一个group_id即可一起消费所有的topic,vector在消费的过程中会自动均衡kafka消费速率。

自适应并发请求

在0.11.0版本后默认启用了自适应并发,这是一个智能、强大的功能,官方介绍https://vector.dev/blog/adaptive-request-concurrency/#rate-limiting-problem
在之前的版本中,为了保障数据正常写入下游Elasticsearch或Clickhouse时,需要进行速率限制。但限制速率值设定存在以下问题

  • 将限制设置得太高,从而使服务不堪重负,从而损害系统可靠性。
  • 设置的限制太低,浪费资源。

为了解决这个问题,vector推出了自适应并发的功能,它会重点观察两件事:请求的往返时间 (RTT) 和 HTTP 响应代码(失败与成功),从而决策出一个最佳的速率!
image.png
在写入Elasticsearch或Clickhouse时,默认已将其设为启用, 不需要进一步的配置。

vector解析日志实践

调试解析配置

假设线上应用原始日志格式如下,接下来我们通过vector解析日志内容。

2023-07-23 09:35:18.987 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/account/', 'request_length': 67, 'remote_address': '185.14.47.131', 'server_name': 'cu-36.cn', 'time_start': '2023-07-23T09:35:18.879+08:00', 'time_finish': '2023-07-23T09:35:19.638+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}
  1. 修改vector配置文件,添加sources配置项,从控制台读取数据。并新增sinks配置项,输出到控制台,vector配置文件如下所示:
sources:my_source:   type: "stdin"sinks:my_print:type: "console"inputs: ["my_source"]encoding:codec: "json" 

观察控制台输出内容,已经将控制台输出的日志数据添加到了message字段中

2023-07-23 09:35:18.987 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/account/', 'request_length': 67, 'remote_address': '185.14.47.131', 'server_name': 'cu-36.cn', 'time_start': '2023-07-23T09:35:18.879+08:00', 'time_finish': '2023-07-23T09:35:19.638+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}
{"host":"huanbao","message":"2023-07-23 09:35:18.987 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/account/', 'request_length': 67, 'remote_address': '185.14.47.131', 'server_name': 'cu-36.cn', 'time_start': '2023-07-23T09:35:18.879+08:00', 'time_finish': '2023-07-23T09:35:19.638+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}","source_type":"stdin","timestamp":"2023-11-19T08:07:07.270947582Z"}
  1. 接下来调试VRL解析配置,推荐使用在线调试工具https://playground.vrl.dev/image.png

调试无误后,将VRL处理语句添加到vector配置中。

sources:my_source:   type: "stdin"transforms:my_transform:type: remapinputs:    # 匹配输入源- my_sourcesource: |. = parse_regex!(.message, r'^(?<logtime>[^|]+) \| (?<level>[A-Z]*) *\| __main__:(?<class>\D*:\d*) - (?<content>.*)$') # 正则提取logtime、level、class、content.content = replace(.content, "'", "\"") # 将content单引号替换为双引号.content = parse_json!(.content) # json解析content内容.access_status = (.content.access_status) # 将content中的子字段提取到根级.http_user_agent = (.content.http_user_agent).remote_address = (.content.remote_address).request_length = (.content.request_length).request_method = (.content.request_method).request_uri = (.content.request_uri).server_name = (.content.server_name).time_finish = (.content.time_finish).access_status = (.content.access_status).time_start = (.content.time_start)del(.content) # 删除content字段.logtime = parse_timestamp!((.logtime), format:"%Y-%m-%d %H:%M:%S.%3f") # 格式化时间字段.time_start = parse_timestamp!((.time_start), format:"%Y-%m-%dT%H:%M:%S.%3f%:z") .time_finish = parse_timestamp!((.time_finish), format:"%Y-%m-%dT%H:%M:%S.%3f%:z").level = downcase(.level) # 将level字段值转小写sinks:my_print:type: "console"inputs: ["my_transform"] # 匹配转换配置encoding:codec: "json" 

添加VRL解析规则后,控制台输出内容如下,已成功对原始数据完成解析处理

2023-07-23 09:35:18.987 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/account/', 'request_length': 67, 'remote_address': '185.14.47.131', 'server_name': 'cu-36.cn', 'time_start': '2023-07-23T09:35:18.879+08:00', 'time_finish': '2023-07-23T09:35:19.638+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}
{"access_status":200,"class":"debug_log:49","http_user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36","level":"info","logtime":"2023-07-23T01:35:18.987Z","remote_address":"185.14.47.131","request_length":67,"request_method":"GET","request_uri":"/account/","server_name":"cu-36.cn","time_finish":"2023-07-23T01:35:19.638Z","time_start":"2023-07-23T01:35:18.879Z"}
  1. 我们已经成功解析到了remote_address字段,接下来从geoip数据库中查询ip的地理位置信息。
sources:my_source:   type: "stdin"transforms:my_transform:type: remapinputs:    # 匹配输入源- my_sourcesource: |. = parse_regex!(.message, r'^(?<logtime>[^|]+) \| (?<level>[A-Z]*) *\| __main__:(?<class>\D*:\d*) - (?<content>.*)$') # 正则提取logtime、level、class、content.content = replace(.content, "'", "\"") # 将content单引号替换为双引号.content = parse_json!(.content) # json解析content内容.access_status = (.content.access_status) # 将content中的子字段提取到根级.http_user_agent = (.content.http_user_agent).remote_address = (.content.remote_address).request_length = (.content.request_length).request_method = (.content.request_method).request_uri = (.content.request_uri).server_name = (.content.server_name).time_finish = (.content.time_finish).access_status = (.content.access_status).time_start = (.content.time_start)del(.content) # 删除content字段.logtime = parse_timestamp!((.logtime), format:"%Y-%m-%d %H:%M:%S.%3f") # 格式化时间字段.time_start = parse_timestamp!((.time_start), format:"%Y-%m-%dT%H:%M:%S.%3f%:z") .time_finish = parse_timestamp!((.time_finish), format:"%Y-%m-%dT%H:%M:%S.%3f%:z").level = downcase(.level) # 将level字段值转小写.geoip = get_enrichment_table_record!("geoip_table", # ip地理位置信息解析{"ip": .remote_address})sinks:my_print:type: "console"inputs: ["my_transform"] # 匹配转换配置encoding:codec: "json" enrichment_tables:geoip_table: # 指定geoip数据库文件path: "/root/GeoLite2-City.mmdb"type: geoip

观察控制台输出,已经成功通过remote_address字段的ip地址获取到了地理位置信息内容。

2023-07-23 09:35:18.987 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/account/', 'request_length': 67, 'remote_address': '185.14.47.131', 'server_name': 'cu-36.cn', 'time_start': '2023-07-23T09:35:18.879+08:00', 'time_finish': '2023-07-23T09:35:19.638+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}
{"access_status":200,"class":"debug_log:49","geoip":{"city_name":"Hong Kong","continent_code":"AS","country_code":"HK","country_name":"Hong Kong","latitude":22.2842,"longitude":114.1759,"metro_code":null,"postal_code":null,"region_code":"HCW","region_name":"Central and Western District","timezone":"Asia/Hong_Kong"},"http_user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36","level":"info","logtime":"2023-07-23T01:35:18.987Z","remote_address":"185.14.47.131","request_length":67,"request_method":"GET","request_uri":"/account/","server_name":"cu-36.cn","time_finish":"2023-07-23T01:35:19.638Z","time_start":"2023-07-23T01:35:18.879Z"}

构建vector镜像

由于vector镜像未包含geoip数据库文件,如果需要根据IP地址解析获取地理位置信息,则需要提前构建包含geoip文件的vector镜像,并上传至harbor仓库中。

[root@tiaoban evk]# ls
Dockerfile  filebeat  GeoLite2-City.mmdb  kafka  log-demo.yaml  strimzi-kafka-operator  vector
[root@tiaoban evk]# cat Dockerfile 
FROM timberio/vector:0.34.1-debian
ADD GeoLite2-City.mmdb /etc/vector/GeoLite2-City.mmdb
[root@tiaoban evk]# docker build -t harbor.local.com/elk/vector:v0.34.1 .
[root@tiaoban evk]# docker push harbor.local.com/elk/vector:v0.34.1

k8s资源清单

  • vector-config.yaml

此配置文件为vector的全局通用配置文件,主要配置了api和监控指标。

apiVersion: v1
kind: ConfigMap
metadata:name: vector-confignamespace: elk
data:vector.yaml: |data_dir: "/var/lib/vector"api:enabled: trueaddress: "0.0.0.0:8686"sources:metrics:type: internal_metricsnamespace: vectorscrape_interval_secs: 30sinks:prometheus:type: prometheus_exporterinputs:- metricsaddress: 0.0.0.0:9598default_namespace: service
  • pod-config.yaml

此配置文件主要是从kafka中读取数据,然后移除非必要的字段信息,最后写入es中

apiVersion: v1
kind: ConfigMap
metadata:name: pod-confignamespace: elk
data:pod.yaml: |sources:pod_kafka:type: "kafka"bootstrap_servers: "my-cluster-kafka-brokers.kafka.svc:9092"group_id: "pod"topics:- "pod_logs"decoding:codec: "json"auto_offset_reset: "latest"transforms:pod_transform:type: remapinputs:    # 匹配输入源- pod_kafkasource: |del(.agent)del(.event)del(.ecs)del(.host)del(.input)del(.kubernetes.labels)del(.log)del(.orchestrator)del(.stream)sinks:pod_es:type: elasticsearchinputs: ["pod_transform"] # 匹配转换配置api_version: "v8"        # ES版本,非必填mode: "data_stream" # 数据流方式写入auth: # es认证信息 strategy: "basic"user: "elastic"password: "2zg5q6AU7xW5jY649yuEpZ47"data_stream: # 数据流名称配置type: "logs"dataset: "pod"namespace: "elk"endpoints: ["https://elasticsearch-es-http.elk.svc:9200"] # es连接地址tls: # tls证书配置verify_certificate: false # 跳过证书验证
  • myapp-config.yaml

此配置文件从kafka中读取pod日志数据,然后通过filter过滤出log-demo的日志数据,做进一步解析处理后写入es中

apiVersion: v1
kind: ConfigMap
metadata:name: myapp-confignamespace: elk
data:myapp.yaml: |sources:myapp_kafka:type: "kafka"bootstrap_servers: "my-cluster-kafka-brokers.kafka.svc:9092"group_id: "myapp"topics:- "pod_logs"decoding:codec: "json"auto_offset_reset: "latest"transforms:myapp_filter:type: filterinputs:    # 匹配输入源- myapp_kafkacondition: |.kubernetes.deployment.name == "log-demo"myapp_transform:type: remapinputs:    # 匹配输入源- myapp_filtersource: |. = parse_regex!(.message, r'^(?<logtime>[^|]+) \| (?<level>[A-Z]*) *\| __main__:(?<class>\D*:\d*) - (?<content>.*)$') # 正则提取logtime、level、class、content.content = replace(.content, "'", "\"") # 将content单引号替换为双引号.content = parse_json!(.content) # json解析content内容.access_status = (.content.access_status) # 将content中的子字段提取到根级.http_user_agent = (.content.http_user_agent).remote_address = (.content.remote_address).request_length = (.content.request_length).request_method = (.content.request_method).request_uri = (.content.request_uri).server_name = (.content.server_name).time_finish = (.content.time_finish).access_status = (.content.access_status).time_start = (.content.time_start)del(.content) # 删除content字段.logtime = parse_timestamp!((.logtime), format:"%Y-%m-%d %H:%M:%S.%3f") # 格式化时间字段.time_start = parse_timestamp!((.time_start), format:"%Y-%m-%dT%H:%M:%S.%3f%:z") .time_finish = parse_timestamp!((.time_finish), format:"%Y-%m-%dT%H:%M:%S.%3f%:z").level = downcase(.level) # 将level字段值转小写.geoip = get_enrichment_table_record!("geoip_table",{"ip": .remote_address}) # 地理位置信息解析sinks:myapp_es:type: elasticsearchinputs: ["myapp_transform"] # 匹配转换配置api_version: "v8"        # ES版本,非必填mode: "data_stream" # 数据流方式写入auth: # es认证信息 strategy: "basic"user: "elastic"password: "2zg5q6AU7xW5jY649yuEpZ47"data_stream: # 数据流名称配置type: "logs"dataset: "myapp"namespace: "elk"endpoints: ["https://elasticsearch-es-http.elk.svc:9200"] # es连接地址tls: # tls证书配置verify_certificate: false # 跳过证书验证 enrichment_tables:geoip_table: # 指定geoip数据库文件path: "/etc/vector/GeoLite2-City.mmdb"type: geoip
  • deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:name: vectornamespace: elk
spec:replicas: 2selector:matchLabels:app: vectortemplate:metadata:labels:app: vectorspec:securityContext:runAsUser: 0containers:- image: harbor.local.com/elk/vector:v0.34.1name: vectorresources:limits:cpu: "1"memory: 1Giargs:- -c- /etc/vector/*.yamlports:- name: exportercontainerPort: 9598- name: apicontainerPort: 8686volumeMounts:- name: timezonemountPath: /etc/localtime- name: datamountPath: /var/lib/vector- name: vector-configmountPath: /etc/vector/vector.yamlsubPath: vector.yaml- name: pod-configmountPath: /etc/vector/pod.yamlsubPath: pod.yaml- name: myapp-configmountPath: /etc/vector/myapp.yamlsubPath: myapp.yamlreadinessProbe:httpGet:path: /healthport: 8686livenessProbe:httpGet:path: /healthport: 8686volumes:- name: timezonehostPath:path: /usr/share/zoneinfo/Asia/Shanghai- name: datahostPath:path: /data/vectortype: DirectoryOrCreate- name: vector-configconfigMap:name: vector-config- name: pod-configconfigMap:name: pod-config- name: myapp-configconfigMap:name: myapp-config
  • service.yaml
apiVersion: v1
kind: Service
metadata:name: vector-exporternamespace: elk
spec:selector:app: vectorports:- name: exporter port: 9598 targetPort: 9598

效果演示

查看kibana数据流信息,已成功创建myapp和pod日志的索引。
image.png
查看pod索引数据信息
image.png
查看myapp索引数据信息
image.png

注意事项

多配置文件启动

我们可以通过-c /etc/vector/*.yaml方式指定多个配置文件启动,此时vector会扫描指定路径下的所有yaml配置文件并加载启动。这样配置便于管理各个管道处理规则配置,使配置文件结构更加清晰,便于日后维护工作。

管道名称全局唯一

sources、transforms、sinks的自定义名称,全局必须唯一,尤其是多个vector配置文件时,唯一名称尤为重要。

完整资源清单

本实验案例所有yaml文件已上传至git仓库。访问地址如下:

github

https://github.com/cuiliang0302/blog-demo

gitee

https://gitee.com/cuiliang0302/blog_demo

参考文档

VRL常用函数:https://vector.dev/docs/reference/vrl/functions/
VRL常用案例:https://vector.dev/docs/reference/vrl/examples/
VRL时间格式化:https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers

查看更多

微信公众号

微信公众号同步更新,欢迎关注微信公众号《崔亮的博客》第一时间获取最近文章。

博客网站

崔亮的博客-专注devops自动化运维,传播优秀it运维技术文章。更多原创运维开发相关文章,欢迎访问https://www.cuiliangblog.cn

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/171755.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS新手入门笔记整理:CSS基本选择器

id属性 id属性具有唯一性&#xff0c;也就是说&#xff0c;在一个页面中相同的id只能出现一次。在不同的页面中&#xff0c;可以出现两个id相同的元素。 语法 <div id"text"> ...... </div> class属性 class&#xff0c;顾名思义&#xff0c;就是“类…

LeetCode.283移动零(双指针)

LeetCode.283移动零 1.问题描述2.解题思路3.代码 1.问题描述 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。 示例 1: 输入: nums [0,1…

(三) Windows 下 Sublime Text 3 配置Python环境和Anaconda代码提示

一&#xff1a;新建一个 Python3.7 编译环境。 1 Tools--Build System--New Build System... 修改前&#xff1a; 修改后&#xff1a; 内容&#xff1a; {"cmd":["C:\\Python\\Python37-32\\python.exe","-u","$file"],"file_r…

开通橱窗还能开抖店吗?怎么开通?一篇详解!

我是电商珠珠 开通商品橱窗之后还能开抖店吗&#xff1f;商品橱窗和抖音小店可以同时开吗&#xff1f; 一部分人最初的时候&#xff0c;都觉得直播带货很火&#xff0c;所以就自己去买粉丝或是发视频积攒粉丝&#xff0c;等粉丝够了发现&#xff0c;好像和当初想的不太一样&a…

关于反射、枚举以及Lambda表达式你了解多少呢?快来看看吧~

目录 1、反射 1.1、定义 1.2、用途 1.3、反射基本信息 1.4、反射相关的类【重点】 1.5、Class类&#xff08;反射机制的起源&#xff09; 1.6、Class类中相关的方法 1.7、获得Class对象的三种方式 1.8、反射的使用 1.9、反射的优点、缺点 2、枚举 2.1、背景及定义 …

ZGC 垃圾回收过程

ZGC&#xff08;Z Garbage Collector&#xff09;是Java平台上的一种垃圾收集器&#xff0c;它是由Oracle开发的&#xff0c;旨在解决大堆的低延迟垃圾收集问题。ZGC是一种并发的分代垃圾收集器&#xff0c;它主要针对具有大内存需求和低停顿时间要求的应用程序 ZGC的核心概念及…

基于STC12C5A60S2系列1T 8051单片按页写IIC总线器件24C02并显示在液晶显示器LCD1602上应用

基于STC12C5A60S2系列1T 8051单片机按页写IIC总线器件24C02并显示在液晶显示器LCD1602上应用 STC12C5A60S2系列1T 8051单片机管脚图STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式及配置STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式介绍液晶显示器LCD1602简单介绍…

【brpc学习实践十】streaming log实战

实战实例 通常我们在服务还没正式起来时&#xff0c;会用brpc流式log打印&#xff0c;支持对日志输出到ostream对象中&#xff08;默认std)。同时会在服务初始化时配置LogSink&#xff0c;实现自己的log&#xff0c;这样后续都可以将输出重定向至自己的log. int init(int arg…

Linux MMC子系统 - 6.eMMC 5.1工作模式-设备识别模式

By: Ailson Jack Date: 2023.11.26 个人博客&#xff1a;http://www.only2fire.com/ 本文在我博客的地址是&#xff1a;http://www.only2fire.com/archives/165.html&#xff0c;排版更好&#xff0c;便于学习&#xff0c;也可以去我博客逛逛&#xff0c;兴许有你想要的内容呢。…

RocketMQ 安装部署及应用场景记录

文章目录 前言一、RocketMQ简介1.1 整体架构 二、RocketMQ安装部署2.1 RocketMQ 下载2.2 修改 JVM 参数2.3 启动 NameServer 和 Broker2.4 验证发送和接受消息2.5 停止 NameServer 和 Broker2.6 配置全局环境 三、RocketMQ应用场景3.1 异步处理3.2 应用解耦3.3 流量削峰 前言 …

7.前端--CSS-字体属性【2023.11.26】

CSS字体属性 CSS Fonts (字体)属性用于定义字体样式、粗细、大小、和字形。 1.文字样式 CSS 使用 font-style 属性设置文本的风格。 语法&#xff1a; p { font-style: normal; }属性&#xff1a; 2字体粗细 CSS 使用 font-weight 属性设置文本字体的粗细。 语法&#xff1a…

已知两个链表L1和L2分别表示两个集合,其中元素递增排列。请设计一个算法,用于求出L1与L2的交集,并存放在L1链表中

已知两个链表L1和L2分别表示两个集合&#xff0c;其中元素递增排列。请设计一个算法&#xff0c;用于求出L1与L2的交集&#xff0c;并存放在L1链表中。 代码思路&#xff1a; 我们创建一个辅助链表L3&#xff0c;用于存储L1和L2链表的交集&#xff0c;用s遍历L3各个元素 用p和…

Java | The last packet sent successfully to the server was xxx milliseconds ago

最近在部署代码后&#xff0c;后端总是会遇到这个问题&#xff0c;设备通道在访问数据库时经常会报错&#xff0c;在搜集大量资料后我以为是配置问题&#xff0c;首先要保证&#xff1a; &#xff08;1&#xff09;首先确定jdbc.url地址是正确的 &#xff08;2&#xf…

2024年天津天狮学院专升本食品质量与安全专业《分析化学》考纲

2024年天津天狮学院食品质量与安全专业高职升本入学考试《分析化学》考试大纲 一、考试性质 《分析化学》专业课程考试是天津天狮学院食品质量与安全专业高职升本入学考试 的必考科目之一&#xff0c;其性质是考核学生是否达到了升入本科继续学习的要求而进行的选拔性考试。《…

2023年09月 Scratch(三级)真题解析#中国电子学会#全国青少年软件编程等级考试

Scratch等级考试(1~4级)全部真题・点这里 一、单选题(共25题,每题2分,共50分) 第1题 运行下面程序后,角色的x坐标值是?( ) A:100 B:90 C:110 D:120 答案:C 利用变量值作为条件,控制循环的次数。变量从0~10的过程中每次角色的x坐标都增加了10,当变量值为1…

Unity-链接MySql8.0

链接MySql8.0 1.准备dll 一、找到l18N相关的dll 这里给出一个参考地址 D:\Unity\2020.3.48f1c1\Editor\Data\MonoBleedingEdge\lib\mono\unityjit在里面找到如下图的四个dll 二、下载数据库链接dll https://downloads.mysql.com/archives/c-net/在这里搜索历史版本(Archiv…

flask依据现有的库表快速生成flask实体类

flask依据现有的库表快速生成flask实体类 在实际开发过程中&#xff0c;flask的sqlalchemy对应的model类写起来重复性较强&#xff0c;如果表比较多会比较繁琐&#xff0c;这个时候可以使用 flask-sqlacodegen 来快速的生成model程序或者py文件&#xff0c;以下是简单的示例&a…

Echart力引导依赖关系布局图

Echarts ECharts&#xff08;Enterprise Charts&#xff09;Apache ECharts是百度开发的一款开源的 JavaScript 数据可视化库。它提供了丰富的图表和图形&#xff0c;适用于在 Web 应用程序中创建各种交互式和动态的数据可视化图表。ECharts支持各种图表类型&#xff0c;包括折…

一. BEV感知算法介绍

目录 前言1. BEV感知算法的概念2. BEV感知算法数据形式3. BEV开源数据集介绍3.1 KITTI数据集3.2 nuScenes数据集 4. BEV感知方法分类4.1 纯点云方案4.2 纯视觉方案4.3 多模态方案 5. BEV感知算法的优劣6. BEV感知算法的应用介绍7. 课程框架介绍与配置总结下载链接参考 前言 自动…

Java中wait()方法在synchronized方法中调用的奥秘

作为一名Java程序员&#xff0c;我们深知synchronized关键字和wait()方法在多线程编程中的重要性。 在本文中&#xff0c;我们将探讨为什么wait()方法需要在synchronized方法中调用&#xff0c;以及它们是如何协同工作的。 首先&#xff0c;让我们了解一下synchronized关键字和…