Kafka 业务日志采集最佳实践

简介

Apache Kafka 是一个分布式流处理平台,主要用于构建实时数据流管道和应用程序。在收集业务日志的场景中,Kafka 可以作为一个消息中间件,用于接收、存储和转发大量的日志数据。将 Kafka 与其他系统(如 Elasticsearch、Flume、Spark Streaming 等)集成,以提供更丰富的日志处理和分析功能。本文提到的是和观测云集成,即通过观测云的采集器 Datakit 采集 Kafka 中的业务日志,下面通过一些例子了解下观测云的快速集成效果。

实践环境

前置条件

  • 注册观测云

软件和中间件

  • Kafka3.2.0
  • Datakit采集器
  • JDK 8

硬件

  • 云服务器 CentOS7.9 64位 4vCPU,8GB 内存,100GB 云盘一台。

接入方案

准备 Kafka 环境

安装 Kafka

下载 3.2.0 版本,解压即可使用。

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz

注:目前 Datakit 支持的 Kafka 版本有[version:0.8.2 ~ 3.2.0]

启动 Zookeeper 服务
$ bin/zookeeper-server-start.sh config/zookeeper.properties
启动 KafkaServer
$ bin/kafka-server-start.sh config/server.properties
创建 Topic

创建名为 testlog 的 Topic 。

$ bin/kafka-topics.sh --create --topic testlog --bootstrap-server localhost:9092
启动 Producer
$ bin/kafka-console-producer.sh --topic testlog --bootstrap-server localhost:9092

安装 DataKit

参考官网文档安装 DataKit 采集器

TOKEN 依据你的观测云工作空间来填写
DK_DATAWAY=https://openway.guance.com?token=<TOKEN> bash -c "$(curl -L https://static.guance.com/datakit/install.sh)"

开启 Kafka 采集器

进入 DataKit 安装目录下 (默认是 /usr/local/datakit/conf.d/ ) 的 conf.d/kafkamq 目录,复制 kafkamq.conf.sample 并命名为 kafkamq.conf 。

类似如下:

-rwxr-xr-x 1 root root 2574 Apr 30 23:52 kafkamq.conf
-rwxr-xr-x 1 root root 2579 May  1 00:40 kafkamq.conf.sample

调制 kafka 采集器配置如下:

  • addrs = ["localhost:9092"],该文采集器 DataKit 和 Kafka 安装到同一台操作系统中,localhost 即可。
  • kafka_version = "3.2.0",该文使用 Kafka 的版本。
  • [inputs.kafkamq.custom],删除注释符号“#”。
  • [inputs.kafkamq.custom.log_topic_map],删除注释符号“#”。
  • "testlog"="log.p",testlog 为 Topic 的名字,log.p 为观测云 Pipeline 可编程数据处理器的日志字段提取规则配置。涉及的业务日志和 log.p 的内容详细见下面的《使用 Pipeline》。
# {"version": "1.28.1", "desc": "do NOT edit this line"}[[inputs.kafkamq]]addrs = ["localhost:9092"]# your kafka version:0.8.2 ~ 3.2.0kafka_version = "3.2.0"group_id = "datakit-group"# consumer group partition assignment strategy (range, roundrobin, sticky)assignor = "roundrobin"## rate limit.#limit_sec = 100## sample# sampling_rate = 1.0## kafka tls config# tls_enable = true# tls_security_protocol = "SASL_PLAINTEXT"# tls_sasl_mechanism = "PLAIN"# tls_sasl_plain_username = "user"# tls_sasl_plain_password = "pw"## -1:Offset Newest, -2:Offset Oldestoffsets=-1## skywalking custom#[inputs.kafkamq.skywalking]## Required!send to datakit skywalking input.#dk_endpoint="http://localhost:9529"#thread = 8 #topics = [#  "skywalking-metrics",#  "skywalking-profilings",#  "skywalking-segments",#  "skywalking-managements",#  "skywalking-meters",#  "skywalking-logging",#]#namespace = ""## Jaeger from kafka. Please make sure your Datakit Jaeger collector is open !!!#[inputs.kafkamq.jaeger]## Required! ipv6 is "[::1]:9529"#dk_endpoint="http://localhost:9529"#thread = 8 #source: agent,otel,others...#source = "agent"## Required! topics#topics=["jaeger-spans","jaeger-my-spans"]## user custom message with PL script.[inputs.kafkamq.custom]#spilt_json_body = true#thread = 8 ## spilt_topic_map determines whether to enable log splitting for specific topic based on the values in the spilt_topic_map[topic].#[inputs.kafkamq.custom.spilt_topic_map]#  "log_topic"=true#  "log01"=false[inputs.kafkamq.custom.log_topic_map]"testlog"="log.p"#  "log01"="log_01.p"#[inputs.kafkamq.custom.metric_topic_map]#  "metric_topic"="metric.p"#  "metric01"="rum_apm.p"#[inputs.kafkamq.custom.rum_topic_map]#  "rum_topic"="rum_01.p"#  "rum_02"="rum_02.p"#[inputs.kafkamq.remote_handle]## Required!#endpoint="http://localhost:8080"## Required! topics#topics=["spans","my-spans"]# send_message_count = 100# debug = false# is_response_point = true# header_check = false## Receive and consume OTEL data from kafka.#[inputs.kafkamq.otel]#dk_endpoint="http://localhost:9529"#trace_api="/otel/v1/trace"#metric_api="/otel/v1/metric"#trace_topics=["trace1","trace2"]#metric_topics=["otel-metric","otel-metric1"]#thread = 8 ## todo: add other input-mq

注意:开启或调整 DataKit 的配置,需重启采集器(shell 下执行 datakit service -R)。

使用 Pipeline

log.p 规则内容

data = load_json(message)
protocol = data["protocol"]
response_code = data["response_code"]
set_tag(protocol,protocol)
set_tag(response_code,response_code)
group_between(response_code,[200,300],"info","status")
group_between(response_code,[400,499],"warning","status")
group_between(response_code,[500,599],"error","status")time = data["start_time"]
set_tag(time,time)
default_time(time)

效果展示

发送业务日志样例

业务日志样例文件如下:

#info
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:37:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#error
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:39:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#warn
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:38:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}

日志发送命令

在 Producer 启动后,分别发送如下三条日志内容,三条日志一条为 info 级别("response_code":204),另一条为 error 级别("response_code":504),最后一条为 warn 级别日志("response_code":404)。

>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>
>
>
>
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>
>
>
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}

通过 DataKit 采集到 Kafka 的三条业务日志

使用 Pipeline 对业务日志进行字段提取

下图 protocol、response_code 以及 time 都是使用 Pipeline 提取后的效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/8716.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nginx启动后不能正常访问

背景介绍 新增NGINX配置文件后&#xff0c;重启nginx无法访问服务。NGINX启动不报错&#xff0c;测试nginx配置文件nginx -t也没问题。 定位问题思路与步骤 查看NGINX配置文件 发现NGINX配置文件中的user是www user www www;查看NGINX相关文件夹的权限 发现有些文件的所…

docker-compose安装 人大金仓数据库

下载官网安装包 将安装包重命名为: kingbase.tar 再导入镜像仓库 docker load -i kingbase.tar目录创建data文件夹创建docker-compose文件 version: 3 services: kingbase: image: kingbase:v1 container_name: kingbaseports: - "54321:54321" volumes: -…

解决微信小程序电脑能正常使用,手机端无法正常访问的SSL证书问题

目录 前言1 问题描述与调试2 探索问题根源2.1 用户反馈收集2.2 尝试手机端访问2.3 PC端调试 3 确认问题与解决方案3.1 检查SSL证书3.2 重新部署SSL证书3.3 测试修复效果 4 SSL&#xff08;Secure Sockets Layer&#xff09;证书中间证书4.1 SSL证书链的构成4.2 中间证书的作用 …

【管理咨询宝藏97】智慧物流园区顶层设计方案

本报告首发于公号“管理咨询宝藏”&#xff0c;如需阅读完整版报告内容&#xff0c;请查阅公号“管理咨询宝藏”。 【管理咨询宝藏97】智慧物流园区顶层设计方案 【格式】PDF版本 【关键词】智慧园区、制造型企业转型、数字化转型 【核心观点】 - 中国物流业整体呈现集中度低…

springboot项目 字典/枚举翻译 终极解决方案 AOP+自定义注解+递归实体字段+实体动态三级缓存+责任链+多种转换方式

目录 前言实现思路技术确定 食用方式效果使用样例项目中使用第一步 复制包第二步 实现LoadDictDatabase并将其注入容器第三步 标识需要翻译的字段第四步 标识需要翻译的方法第五步 调用需要翻译的方法 实现细节TODO 前言 字典,即在存储介质中进行存储时,为了避免业务上对其名称…

数据结构复习指导之二叉树的概念

文章目录 二叉树 考纲内容 复习提示 1.二叉树的概念 1.1二叉树的定义及其主要特性 1.1.1二叉树的定义 1.1.2几种特殊的二叉树 1.1.3二叉树的性质 1.2二叉树的存储结构 1.2.1顺序存储结构 1.2.2链式存储结构 知识回顾 二叉树 考纲内容 &#xff08;一&#xff09;树…

LeetCode 每日一题 2024/4/29-2024/5/5

记录了初步解题思路 以及本地实现代码&#xff1b;并不一定为最优 也希望大家能一起探讨 一起进步 目录 4/291146. 快照数组4/30 2798. 满足目标工作时长的员工数目5/1 2462. 雇佣 K 位工人的总代价5/2 857. 雇佣 K 名工人的最低成本5/3 1491. 去掉最低工资和最高工资后的工资平…

苹果Mac用户下载VS Code(Universal、Intel Chip、Apple Silicon)哪个版本?

苹果macOS用户既可以下载通用版&#xff08;Universal&#xff09;&#xff0c;软件将自动检测用户的处理器并进行适配。 也可以根据型号下载对应CPU的版本&#xff1a; 使用Intel CPU的Mac电脑可下载Intel Chip版本&#xff1b; 使用苹果自研M系列CPU的Mac电脑下载Apple Si…

Oracle Patch清理

场景&#xff1a; 在对Oracle安装补丁后&#xff0c;会发现OS上被占用了大量的空间&#xff0c;本文档清理Opatch过程中的一些文件&#xff0c;释放空间 参考文档&#xff1a; Can You Delete $ORACLE_HOME/.patch_storage Directory ? (Doc ID 403218.1) How To Avoid Disk …

【Redis7】10大数据类型之Hash类型

文章目录 1.Hash类型2.常用命令3.示例hset和hgethgetallhlenhkeys和hvalshexistshdelhincrby和hincrbyfloathsetnx 1.Hash类型 Redis中的Hash类型是一种高效的数据结构&#xff0c;用于存储键值对的集合。这种类型特别适用于表示对象&#xff0c;因为它允许你将对象的多个属性…

Context capture/Pix4Dmapper/AutoCAD/CASS/EPS软件的安装流程与使用方法;土方量计算;无人机摄影测量数据处理

目录 专题一 无人机摄影测量技术应用现状及其发展 专题二 基本原理和关键技术讲解 专题三 无人机影像外业数据获取 专题四 数据处理环境建立与软件熟悉 专题五 GNSS数据土方量计算 专题六 基于无人机影像数据的正射影像制作 专题七 基于无人机影像数据的三维模型制作 专…

gocator导出图片

想用3D扫描后的图片&#xff0c;但是系统自带的导出方法很麻烦&#xff0c;所以考虑通过sdk导出 首先需要设置点云亮度 这里是导出图片的关键代码 case GoDataMessageType.SurfaceIntensity: { Debug.WriteLine("SurfaceIntensity "); GoSu…

线段树专题

落谷1607 #include<bits/stdc.h> using namespace std;#define ls u<<1 #define rs u<<1|1const int N 1e5;int n, k, c, ans; struct line {int l, r, m; // m为候车的牛的数量bool operator<(line b) {return r < b.r;} }s[N];struct tree {int l, …

antV X6的简要使用教程

&#x1f9d1;‍&#x1f393; 个人主页&#xff1a;《爱蹦跶的大A阿》 &#x1f525;当前正在更新专栏&#xff1a;《VUE》 、《JavaScript保姆级教程》、《krpano》、《krpano中文文档》 ​ ​ ✨ 前言 在我们的日常开发工作中&#xff0c;我们经常需要构建复杂的交互式图…

【MM32F3270火龙果】点亮led

文章目录 前言GPIO的工作模式一、有哪些工作模式&#xff1f;1.1 GPIO的详细介绍1.2 GPIO的内部框图输入模式输出部分 二、操作GPIO点亮led2.1 初始化gpio2.2 写gpio 三、示例代码总结 前言 本文将介绍如何在MM32F3270火龙果微控制器上使用Keil开发环境点亮LED。MM32F3270火龙…

Java中使用FlatBuffers实现序列化

Java 中的 FlatBuffers有助于高速数据序列化/反序列化&#xff0c;消除解析开销。它由 Google 开发&#xff0c;为跨平台数据交换提供无模式、内存高效的解决方案。 Java 开发人员可以利用其直接内存访问来实现最佳性能和最小内存占用&#xff0c;从而提高应用程序速度、可扩展…

Kamailio openssl 3.0.x 需要注意的事项

我们留意到 Debian Bookworm 安装的 openssl 版本是 3.0.x 这里有几个地方要注意&#xff1a; modparam("tls", "init_mode", 1)核心参数 tls_threads_mode 配置为 1 或者 配置为 2版本建议用 v 5.8.1 参考链接&#xff1a; #3832#3765#3791 目前官方…

北交所佣金费率标准是多少?北交所相关信息科普

北交所的佣金费率并非固定不变&#xff0c;而是可以根据投资者的需求和证券公司的政策进行调整。目前北交所的佣金费率最低是万分之二。 一般来说&#xff0c;北交所的佣金费率默认在万分之三左右&#xff0c;但这不是固定的费率。根据证券公司的不同&#xff0c;佣金费率可以…

绝地求生:PCL第五轮数据出炉,XDD与林树入选最佳阵容,韦神真在做事了

距离PCL季后赛开赛还有两天时间&#xff0c;小梦还沉浸在常规赛最后一场WCG和TSG争名额的关键之战&#xff0c;现在看来WCG硬贴PeRo房区那波真是没道理&#xff0c;只能解释为太想拿分导致上头了&#xff0c;导致决赛圈没有余力限制TSG&#xff0c;但这何尝不是TSG自己的剧本&a…

算法训练Day52 | ● 84.柱状图中最大的矩形

84.柱状图中最大的矩形 class Solution { public:int largestRectangleArea(vector<int>& heights) {stack<int> s;s.push(-1);s.push(0);int space 0;for(int i1; i<heights.size(); i){while(s.top()! -1 && (iheights.size() || heights[i]<…