Flink SQL 实时读取 kafka 数据写入 Clickhouse —— 日志处理(三)

文章目录

  • 前言
  • Clickhouse 表设计
    • adlp_log_local 本地表
    • adlp_log 分布式表
  • Flink SQL 说明
    • 创建 Source Table (Kafka) 连接器表
    • 创建 Sink Table (Clickhouse) 连接器
    • 解析 Message 写入 Sink
  • 日志查询演示
  • 总结

前言

在之前的文章中,我们总结了如何在 Django 项目中进行日志配置,以及如何在 k8s 上部署 Filebeat 采集 PVC 中的日志发送至 Kafka:

  • Django 日志控制台输出、文件写入按天拆分文件,自定义 Filter 增加 trace_id 以及过滤——日志处理(一)
  • Filebeat k8s 部署(Deployment)采集 PVC 日志发送至 Kafka——日志处理(二)

本文将总结如何使用 Flink SQL 实时将 kafka 中的日志消息发送至 Clickhouse 表中。

说明
限于文章主题和篇幅,本文不会将如何部署和使用 Flink SQL, 关于这些内容过多而且网上资料也很多,就不再赘述。
本文的核心是说明如何设计 Clickhouse 表结构,以及对应的 Flink SQL 说明。

Clickhouse 表设计


上图中的JSON 内容是kafka 中的日志消息,我们需要读取该消息中的 message 字段(我们的日志信息),然后将该字段中的 time, level, func, trace_id, message 保存至 clickhouse 中。
这里我使用两张表保存日志:

  • adlp_log_local本地表
  • adlp_log分布式表,FlinkSQL 实时写入分布式表

adlp_log_local 本地表

create table if not exists cloud_data.adlp_log_local on cluster perftest_5shards_2replicas
(`dt`             DateTime64(3),`level`          LowCardinality(String),`trace_id`       String,`func`           String,`message`        String,-- 建立索引加速低命中率内容的查询INDEX idx_trace_id `trace_id` TYPE tokenbf_v1(4096, 2, 0) GRANULARITY 2,INDEX idx_message `message` TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/cloud_data/adlp_log_local', '{replica}')PARTITION BY toYYYYMMDD(dt)PRIMARY KEY (dt, trace_id)ORDER BY (dt, trace_id)TTL toDateTime(dt) + toIntervalDay(30);

字段说明

  • dt (DateTime64(3)): 存储日志时间戳,精确到毫秒。
  • level (LowCardinality(String)): 存储日志级别,如 INFOERROR 等,使用 LowCardinality 优化存储和查询。
  • trace_id (String): 存储追踪 ID,通常用于关联一系列相关的日志记录。
  • func (String): 存储函数或方法名称,表示日志产生的位置。
  • message(String): 存储日志消息的具体内容。

索引

  • idx_trace_id: 使用 tokenbf_v1 类型的布隆过滤器索引(tokenbf_v1(4096, 2, 0)),在 trace_id 字段上创建,粒度为 2。布隆过滤器索引适合低命中率的查询,能够快速过滤出大多数不匹配的记录。
  • idx_message: 使用 tokenbf_v1 类型的布隆过滤器索引(tokenbf_v1(30720, 2, 0)),在 message 字段上创建,粒度为 1。同样用于加速低命中率的查询。

存储引擎

  • ReplicatedMergeTree: 使用分布式和复制的存储引擎,路径模板为 /clickhouse/tables/{layer}-{shard}/cloud_data/adlp_log_local,副本名称为 {replica},保证数据的高可用性和一致性。

分区和排序

  • 分区 (PARTITION BY): 按 dt 字段的年月日(toYYYYMMDD(dt))进行分区,有助于管理和查询按天划分的数据。
  • 主键 (PRIMARY KEY): 主键由 dttrace_id 组成,有助于高效查询。
  • 排序 (ORDER BY): 按 dttrace_id 字段排序,优化基于时间和 trace ID 的查询。

数据生命周期 (TTL)

  • TTL (Time To Live): 配置数据的生存时间,数据在 dt 字段的时间加上 30 天后自动过期删除,保持数据表的清洁和高效。

adlp_log 分布式表

create table if not exists cloud_data.adlp_log on cluster perftest_5shards_2replicas
(`dt`             DateTime64(3),`level`          LowCardinality(String),`trace_id`       String,`func`           String,`message`        String
)
ENGINE = Distributed('perftest_5shards_2replicas', 'cloud_data', 'adlp_log_local', rand());

字段说明
与本地表 adlp_log_local 相同,包含以下字段:

  • dt (DateTime64(3))
  • level (LowCardinality(String))
  • trace_id (String)
  • func (String)
  • message (String)

存储引擎
Distributed: 分布式引擎,允许将数据分布到多个分片和副本中。参数解释如下:

  • 集群名称 (perftest_5shards_2replicas): 指定集群的名称。
  • 数据库 (cloud_data): 数据库名称。
  • 表 (adlp_log_local): 本地表的名称。
  • 分片键 (rand()): 使用随机函数进行数据分片,保证数据均匀分布。

Flink SQL 说明

创建 Source Table (Kafka) 连接器表

CREATE TEMPORARY TABLE source_table (message STRING
) WITH ('connector' = 'kafka','topic' = 'filebeat_logs','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'prod-logs-k2c','scan.startup.mode' = 'earliest-offset','format' = 'json','json.ignore-parse-errors' = 'false','json.fail-on-missing-field' = 'false','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN','properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";'
);

创建 Sink Table (Clickhouse) 连接器

CREATE TEMPORARY TABLE sink_table (`dt` TIMESTAMP(3),`level` STRING ,`trace_id` STRING ,`func` STRING ,`message` STRING
) WITH ('connector' = 'clickhouse','url' = 'clickhouse://127.0.0.1:8123','username' = 'admin','password' = 'admin','database-name' = 'cloud_data','table-name' = 'adlp_log','use-local' = 'true','sink.batch-size' = '1000','sink.flush-interval' = '1000','sink.max-retries' = '10','sink.update-strategy' = 'insert','sink.sharding.use-table-definition' = 'true','sink.parallelism' = '1'
);

解析 Message 写入 Sink

INSERT INTO sink_table
SELECT TO_TIMESTAMP(JSON_VALUE(message, '$.time'), 'yyyy-MM-dd HH:mm:ss') AS dt,JSON_VALUE(message, '$.level') AS level,JSON_VALUE(message, '$.trace_id') AS trace_id,JSON_VALUE(message, '$.func') AS func,JSON_VALUE(message, '$.message') AS message
FROM source_table;

注意:
这里在写入的时候默认我们的日志格式是 JSON 的,如果我们的日志发送到 kafka 不是 JSON 格式的,上边的 JSON_VALUE 可能会报错。当然,我们也可以在条件中加上是否为 JSON 判断,但是我觉得没必要。

日志查询演示

我们的日志导入成功后,可以通过第三方查询工具查询 clickhouse 数据源,我这里使用的是 superset 去查询 clickhouse 数据源。
通过 trace_id 查询整个执行链路的日志
image.png
查询错误日志信息
image.png

全文检索 message 日志信息
image.png

更多扩展

  • superset 是一个强大的 BI 工具,可以将我们的日志中的一些指标做成看板,比如说关键错误日志数量,然后设置告警,发送通知。
  • 通过 Flink SQL 实时将我们的日志从 kafka 中写入 clickhouse ,结合 clickhouse 强大的查询功能,以及 superset 强大的 BI 功能,可以充分挖掘业务日志中的潜在价值。

总结

本文总结了如何使用使用 Clickhouse 保存日志数据,以及如何通过 Flink SQL 将我们的日志实时从 kafka 同步至 clickhouse,然后在结合强大的第三方查询 BI 工具 superset,玩转业务日志,挖掘业务日志的潜在价值。
本文设计到的技能知识点比较多,需要熟悉 Clickhouse, Kafka, FlinkSQL, Superset 等,我之前的文章中总结了一些关于 Clickhouse 和 Kafka 相关的内容,感兴趣的读者可以看看:

clickhouse

  • Clickhouse字典关联外部 MySQL 表联合查询实践
  • ClickHouse架构概览 —— Clickhouse 架构篇(一)
  • ClickHouse 使用技巧总结
  • ClickHouse 实现用户画像(标签)系统实践

kafka

  • kafka 生产者 API 实践总结
  • kafka 消费者 API 使用总结
  • 保证 Kafka 数据可靠性最佳实践总结
  • kafka 实现精确一次性语义实践总结

superset

  • Pycharm 调试 superset 源码配置(远程调试)
  • superset 二开增加 flink 数据源连接通过flink sql 查询数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/49869.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

构建智慧水利系统,优化水资源管理:结合物联网、云计算等先进技术,打造全方位、高效的水利管理系统,实现水资源的最大化利用

本文关键词:智慧水利、智慧水利工程、智慧水利发展前景、智慧水利技术、智慧水利信息化系统、智慧水利解决方案、数字水利和智慧水利、数字水利工程、数字水利建设、数字水利概念、人水和协、智慧水库、智慧水库管理平台、智慧水库建设方案、智慧水库解决方案、智慧…

spring-boot3.x整合Swagger 3 (OpenAPI 3) +knife4j

1.简介 OpenAPI阶段的Swagger也被称为Swagger 3.0。在Swagger 2.0后,Swagger规范正式更名为OpenAPI规范,并且根据OpenAPI规范的版本号进行了更新。因此,Swagger 3.0对应的就是OpenAPI 3.0版本,它是Swagger在OpenAPI阶段推出的一个…

产品系统的UI暗色系和浅色系模式切换是符合人体视觉工程学的设计

视觉革命:UI设计中的暗夜与黎明 UI设计如同夜空中最亮的星辰,引领着用户穿梭于信息的海洋。而今,一场视觉革命正在悄然上演,它关乎于我们的眼睛,关乎于我们的体验——那就是产品系统的UI暗色系和浅色系模式的切换。如…

Android lmkd机制详解

目录 一、lmkd介绍 二、lmkd实现原理 2.1 工作原理图 2.2 初始化 2.3 oom_adj获取 2.4 监听psi事件及处理 2.5 进程选取与查杀 2.5.1 进程选取 2.5.2 进程查杀 三、关键系统属性 四、核心数据结构 五、代码时序 一、lmkd介绍 Android lmkd采用epoll方式监听linux内…

SpringBoot整合阿里云短信业务

详细介绍SpringBoot整合阿里云短信服务的每一步过程,同时会将验证码存放到Redis中并设置过期时间,尽量保证实战的同时也让没做过的好兄弟也能实现发短信的功能~ 1. 注册阿里云账号和创建Access Key 首先,你需要注册一个阿里云账号&#xff0…

【JavaEE初阶】Thread类及常见方法

目录 📕 Thread类的概念 📕 Thread 的常见构造方法 📕 Thread 的几个常见属性 📕 start()-启动一个线程 📕 中断一个线程 🚩 实例一 🚩 实例二 🚩 实例三 📕 jo…

昇思MindSpore学习入门-数据处理管道支持python对象

数据处理管道中的特定操作(如自定义数据集GeneratorDataset、自定义map增强操作、自定义batch(per_batch_map...))支持任意Python类型对象作为输入。为了支持此特性,数据管道使用了Python(dict)字典去管理不同类型的对象。与其他类型相比&…

康康近期的慢SQL(oracle vs 达梦)

近期执行的sql,哪些比较慢? 或者健康检查时搂一眼状态 oracle: --最近3天内的慢sql set lines 200 pages 100 col txt for a65 col sql_id for a13 select a.sql_id,a.cnt,a.pctload,b.sql_text txt from (select * from (select sql_id,co…

基于微信小程序的自习室选座系统/基于Java的自习室选座系统/自习室管理系统的设计与实现

获取源码联系方式请查看文章结尾🍅 摘要 自习室选座是学校针对用户必不可少的一个部分。在学校的整个过程中,学生担负着最重要的角色。为满足如今日益复杂的管理需求,各类微信小程序自习室选座也在不断改进。本课题所设计的小程序自习室选座系…

【C#】Visual Studio2022打包依赖第三方库的winForm程序为exe

0.简介 IDE:VS2022 平台:C# .NetFramework4.7.2 WinForm界面 有GDAL、EEplus第三方库的依赖,所以在其他未安装环境的电脑中功能无法使用。 1. 安装 1.1 运行文件输出 在VS扩展中选择管理扩展,安装:Microsoft Visua…

SpringBoot上传超大文件导致OOM,完美问题解决办法

问题描述 报错: Caused by: java.lang.OutOfMemoryError at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) ~[?:1.8.0_381] at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) ~[?:1.8.0_381] at java.…

MQTTX连接华为云IoTDA

目录 华为IoTDA平台 MQTTX连接参数的设置 物模型的构建 属性上报 基本数据格式 时戳 我以前上课都是用巴法云服务器来演示MQTT的,前几天因为测试工业互联网关使用了华为的IoTDA,觉得也不算太复杂,今天尝试用MQTTX连接华为云&#xff0c…

【ARM】MDK-STM32g0xx.h文件与Define规则记录

【更多软件使用问题请点击亿道电子官方网站】 1、 文档目标 记录问题STM32g0xx.h等有关ST的可读文件,尽量勿修改文件格式及对其代码进行添加和删减,记录查找问题的过程中的疑惑,并如何给予客户正确的回复,帮助销售完成验收&…

CefSharp音视频编译与免费下载

注:Cefharp 音频和视频播放编译,生成相应的dll文件,从而支持项目开发。 建议编译至少 16G 的 RAM和至少 250G 的 SSD。该脚本以 E 盘为例,您需要在 E 盘上手动创建 cef 文件夹。禁止在转载后通过发布其他平台向用户收取下载费用。…

新形势下职业教育大数据人才培养策略

一、引言 随着信息技术的飞速发展,大数据已成为驱动经济社会变革的关键力量。在新形势下,职业教育作为技术技能人才培养的重要阵地,面临着如何适应大数据时代要求、提升人才培养质量的紧迫任务。当前,职业教育在大数据人才培养方…

云HIS系统源码,业务云协同和数据云协同的数字化医院信息系统

云HIS是利用云计算、大数据、物联网、移动互联网等技术,打造的一个安全、便捷、共享、协同的医疗互联网云生态平台。核心功能是业务云协同和数据云协同。云HIS具有可扩展、易共享、易协同、低成本、体验号、更便捷、易维护的优势,重新定义了数字化医院信…

leetcode日记(49)旋转链表

其实不难,就是根据kk%len判断需要旋转的位置,再将后半段接在前半段前面就行。 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : …

uniapp的h5,读取本地txt带标签的文件

效果图 使用的回显的标签是u-parse,下面的网址讲了这个标签的相关 https://www.cnblogs.com/huihuihero/p/12978903.html 导入此插件 https://ext.dcloud.net.cn/plugin?id364 使用 uni.request({// 本地文件url: "/static/互联网医院医师端用户协议.txt…

C# datetimePicker

1. 直接把控件拉到设计器中,此时不要调整控件的values属性,这样就可以 打开后每次默认显示当天日期。 2. 属性Format long长日期格式默认值short短日期格式Time时间格式custom自定义时间格式在customFormat这个属性设置,比如yyyy-MM-dd HH…

Stage模型应用程序包结构

目录 官网地址 官网结构图 开发态包结构 工程目录结构 配置文件 module.json5配置文件 app.json5配置文件 官网地址 官网地址 包结构 官网结构图 开发态包结构 在DevEco Studio上创建一个项目工程,并尝试创建多个不同类型的Module(类似一个一个的页…