Flink SQL 实时读取 kafka 数据写入 Clickhouse —— 日志处理(三)

文章目录

  • 前言
  • Clickhouse 表设计
    • adlp_log_local 本地表
    • adlp_log 分布式表
  • Flink SQL 说明
    • 创建 Source Table (Kafka) 连接器表
    • 创建 Sink Table (Clickhouse) 连接器
    • 解析 Message 写入 Sink
  • 日志查询演示
  • 总结

前言

在之前的文章中,我们总结了如何在 Django 项目中进行日志配置,以及如何在 k8s 上部署 Filebeat 采集 PVC 中的日志发送至 Kafka:

  • Django 日志控制台输出、文件写入按天拆分文件,自定义 Filter 增加 trace_id 以及过滤——日志处理(一)
  • Filebeat k8s 部署(Deployment)采集 PVC 日志发送至 Kafka——日志处理(二)

本文将总结如何使用 Flink SQL 实时将 kafka 中的日志消息发送至 Clickhouse 表中。

说明
限于文章主题和篇幅,本文不会将如何部署和使用 Flink SQL, 关于这些内容过多而且网上资料也很多,就不再赘述。
本文的核心是说明如何设计 Clickhouse 表结构,以及对应的 Flink SQL 说明。

Clickhouse 表设计


上图中的JSON 内容是kafka 中的日志消息,我们需要读取该消息中的 message 字段(我们的日志信息),然后将该字段中的 time, level, func, trace_id, message 保存至 clickhouse 中。
这里我使用两张表保存日志:

  • adlp_log_local本地表
  • adlp_log分布式表,FlinkSQL 实时写入分布式表

adlp_log_local 本地表

create table if not exists cloud_data.adlp_log_local on cluster perftest_5shards_2replicas
(`dt`             DateTime64(3),`level`          LowCardinality(String),`trace_id`       String,`func`           String,`message`        String,-- 建立索引加速低命中率内容的查询INDEX idx_trace_id `trace_id` TYPE tokenbf_v1(4096, 2, 0) GRANULARITY 2,INDEX idx_message `message` TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/cloud_data/adlp_log_local', '{replica}')PARTITION BY toYYYYMMDD(dt)PRIMARY KEY (dt, trace_id)ORDER BY (dt, trace_id)TTL toDateTime(dt) + toIntervalDay(30);

字段说明

  • dt (DateTime64(3)): 存储日志时间戳,精确到毫秒。
  • level (LowCardinality(String)): 存储日志级别,如 INFOERROR 等,使用 LowCardinality 优化存储和查询。
  • trace_id (String): 存储追踪 ID,通常用于关联一系列相关的日志记录。
  • func (String): 存储函数或方法名称,表示日志产生的位置。
  • message(String): 存储日志消息的具体内容。

索引

  • idx_trace_id: 使用 tokenbf_v1 类型的布隆过滤器索引(tokenbf_v1(4096, 2, 0)),在 trace_id 字段上创建,粒度为 2。布隆过滤器索引适合低命中率的查询,能够快速过滤出大多数不匹配的记录。
  • idx_message: 使用 tokenbf_v1 类型的布隆过滤器索引(tokenbf_v1(30720, 2, 0)),在 message 字段上创建,粒度为 1。同样用于加速低命中率的查询。

存储引擎

  • ReplicatedMergeTree: 使用分布式和复制的存储引擎,路径模板为 /clickhouse/tables/{layer}-{shard}/cloud_data/adlp_log_local,副本名称为 {replica},保证数据的高可用性和一致性。

分区和排序

  • 分区 (PARTITION BY): 按 dt 字段的年月日(toYYYYMMDD(dt))进行分区,有助于管理和查询按天划分的数据。
  • 主键 (PRIMARY KEY): 主键由 dttrace_id 组成,有助于高效查询。
  • 排序 (ORDER BY): 按 dttrace_id 字段排序,优化基于时间和 trace ID 的查询。

数据生命周期 (TTL)

  • TTL (Time To Live): 配置数据的生存时间,数据在 dt 字段的时间加上 30 天后自动过期删除,保持数据表的清洁和高效。

adlp_log 分布式表

create table if not exists cloud_data.adlp_log on cluster perftest_5shards_2replicas
(`dt`             DateTime64(3),`level`          LowCardinality(String),`trace_id`       String,`func`           String,`message`        String
)
ENGINE = Distributed('perftest_5shards_2replicas', 'cloud_data', 'adlp_log_local', rand());

字段说明
与本地表 adlp_log_local 相同,包含以下字段:

  • dt (DateTime64(3))
  • level (LowCardinality(String))
  • trace_id (String)
  • func (String)
  • message (String)

存储引擎
Distributed: 分布式引擎,允许将数据分布到多个分片和副本中。参数解释如下:

  • 集群名称 (perftest_5shards_2replicas): 指定集群的名称。
  • 数据库 (cloud_data): 数据库名称。
  • 表 (adlp_log_local): 本地表的名称。
  • 分片键 (rand()): 使用随机函数进行数据分片,保证数据均匀分布。

Flink SQL 说明

创建 Source Table (Kafka) 连接器表

CREATE TEMPORARY TABLE source_table (message STRING
) WITH ('connector' = 'kafka','topic' = 'filebeat_logs','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'prod-logs-k2c','scan.startup.mode' = 'earliest-offset','format' = 'json','json.ignore-parse-errors' = 'false','json.fail-on-missing-field' = 'false','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN','properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";'
);

创建 Sink Table (Clickhouse) 连接器

CREATE TEMPORARY TABLE sink_table (`dt` TIMESTAMP(3),`level` STRING ,`trace_id` STRING ,`func` STRING ,`message` STRING
) WITH ('connector' = 'clickhouse','url' = 'clickhouse://127.0.0.1:8123','username' = 'admin','password' = 'admin','database-name' = 'cloud_data','table-name' = 'adlp_log','use-local' = 'true','sink.batch-size' = '1000','sink.flush-interval' = '1000','sink.max-retries' = '10','sink.update-strategy' = 'insert','sink.sharding.use-table-definition' = 'true','sink.parallelism' = '1'
);

解析 Message 写入 Sink

INSERT INTO sink_table
SELECT TO_TIMESTAMP(JSON_VALUE(message, '$.time'), 'yyyy-MM-dd HH:mm:ss') AS dt,JSON_VALUE(message, '$.level') AS level,JSON_VALUE(message, '$.trace_id') AS trace_id,JSON_VALUE(message, '$.func') AS func,JSON_VALUE(message, '$.message') AS message
FROM source_table;

注意:
这里在写入的时候默认我们的日志格式是 JSON 的,如果我们的日志发送到 kafka 不是 JSON 格式的,上边的 JSON_VALUE 可能会报错。当然,我们也可以在条件中加上是否为 JSON 判断,但是我觉得没必要。

日志查询演示

我们的日志导入成功后,可以通过第三方查询工具查询 clickhouse 数据源,我这里使用的是 superset 去查询 clickhouse 数据源。
通过 trace_id 查询整个执行链路的日志
image.png
查询错误日志信息
image.png

全文检索 message 日志信息
image.png

更多扩展

  • superset 是一个强大的 BI 工具,可以将我们的日志中的一些指标做成看板,比如说关键错误日志数量,然后设置告警,发送通知。
  • 通过 Flink SQL 实时将我们的日志从 kafka 中写入 clickhouse ,结合 clickhouse 强大的查询功能,以及 superset 强大的 BI 功能,可以充分挖掘业务日志中的潜在价值。

总结

本文总结了如何使用使用 Clickhouse 保存日志数据,以及如何通过 Flink SQL 将我们的日志实时从 kafka 同步至 clickhouse,然后在结合强大的第三方查询 BI 工具 superset,玩转业务日志,挖掘业务日志的潜在价值。
本文设计到的技能知识点比较多,需要熟悉 Clickhouse, Kafka, FlinkSQL, Superset 等,我之前的文章中总结了一些关于 Clickhouse 和 Kafka 相关的内容,感兴趣的读者可以看看:

clickhouse

  • Clickhouse字典关联外部 MySQL 表联合查询实践
  • ClickHouse架构概览 —— Clickhouse 架构篇(一)
  • ClickHouse 使用技巧总结
  • ClickHouse 实现用户画像(标签)系统实践

kafka

  • kafka 生产者 API 实践总结
  • kafka 消费者 API 使用总结
  • 保证 Kafka 数据可靠性最佳实践总结
  • kafka 实现精确一次性语义实践总结

superset

  • Pycharm 调试 superset 源码配置(远程调试)
  • superset 二开增加 flink 数据源连接通过flink sql 查询数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/49869.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

构建智慧水利系统,优化水资源管理:结合物联网、云计算等先进技术,打造全方位、高效的水利管理系统,实现水资源的最大化利用

本文关键词:智慧水利、智慧水利工程、智慧水利发展前景、智慧水利技术、智慧水利信息化系统、智慧水利解决方案、数字水利和智慧水利、数字水利工程、数字水利建设、数字水利概念、人水和协、智慧水库、智慧水库管理平台、智慧水库建设方案、智慧水库解决方案、智慧…

spring-boot3.x整合Swagger 3 (OpenAPI 3) +knife4j

1.简介 OpenAPI阶段的Swagger也被称为Swagger 3.0。在Swagger 2.0后,Swagger规范正式更名为OpenAPI规范,并且根据OpenAPI规范的版本号进行了更新。因此,Swagger 3.0对应的就是OpenAPI 3.0版本,它是Swagger在OpenAPI阶段推出的一个…

Unity判断鼠标是否在UI上

Unity判断鼠标是否在UI上 下值等于true表示在UI上 EventSystem.current.IsPointerOverGameObject()可用来判断滚轮滑动缩放视角功能,在UI上滑动滚轮视角不缩放,反之缩放。

Python开发日常总结

1、命令总结 1.1 conda创建、激活、退出虚拟环境 conda create --name myenv python3.8 # 创建 conda create --name myenv python3.9 # 激活 conda activate myenv # 退出 conda deactivate

产品系统的UI暗色系和浅色系模式切换是符合人体视觉工程学的设计

视觉革命:UI设计中的暗夜与黎明 UI设计如同夜空中最亮的星辰,引领着用户穿梭于信息的海洋。而今,一场视觉革命正在悄然上演,它关乎于我们的眼睛,关乎于我们的体验——那就是产品系统的UI暗色系和浅色系模式的切换。如…

手写一个JVM自定义类加载器

1. 自定义类加载器的意义 隔离加载类:在某些框架内进行中间件与应用的模块隔离,把类加载到不同的环境。比如:阿里内某容器框架通过自定义类加载器确保应用中依赖的jar包不会影响到中间件运行时使用的jar包。再比如:Tomcat这类Web…

Android lmkd机制详解

目录 一、lmkd介绍 二、lmkd实现原理 2.1 工作原理图 2.2 初始化 2.3 oom_adj获取 2.4 监听psi事件及处理 2.5 进程选取与查杀 2.5.1 进程选取 2.5.2 进程查杀 三、关键系统属性 四、核心数据结构 五、代码时序 一、lmkd介绍 Android lmkd采用epoll方式监听linux内…

SpringBoot整合阿里云短信业务

详细介绍SpringBoot整合阿里云短信服务的每一步过程,同时会将验证码存放到Redis中并设置过期时间,尽量保证实战的同时也让没做过的好兄弟也能实现发短信的功能~ 1. 注册阿里云账号和创建Access Key 首先,你需要注册一个阿里云账号&#xff0…

Flutter 使用 url_launcher的canLaunchUrl() 方法总是返回false错误

Flutter 使用 url_launcher的canLaunchUrl() 方法总是返回false错误 众所周知,我们一般使用url_launcher来打开各种应用,网页,手机应用等.... 但是最近发现Flutter的canLaunchUrl()方法总是返回false,这是为什么呢? …

Qt 实战(3)数据类型 | 3.2、QVariant

文章目录 一、QVariant1、存储数据1.1、存储Qt内置数据1.2、存储自定义数据 2、获取数据3、判断数据类型4、清空数据5、总结 前言: QVariant是Qt框架中一个非常强大且灵活的类,它提供了一种通用的方式来存储和转换几乎任何类型的数据。无论是基本数据类型…

【JavaEE初阶】Thread类及常见方法

目录 📕 Thread类的概念 📕 Thread 的常见构造方法 📕 Thread 的几个常见属性 📕 start()-启动一个线程 📕 中断一个线程 🚩 实例一 🚩 实例二 🚩 实例三 📕 jo…

Android中的usescleartexttraffic属性详解

Android中的usescleartexttraffic属性详解 usesCleartextTraffic 是 Android 应用程序开发中的一个重要配置选项,用于控制应用程序是否允许通过不加密的 HTTP 协议进行网络通信。在 Android 应用的开发过程中,正确地配置 usesCleartextTraffic 对于保护用…

昇思MindSpore学习入门-数据处理管道支持python对象

数据处理管道中的特定操作(如自定义数据集GeneratorDataset、自定义map增强操作、自定义batch(per_batch_map...))支持任意Python类型对象作为输入。为了支持此特性,数据管道使用了Python(dict)字典去管理不同类型的对象。与其他类型相比&…

康康近期的慢SQL(oracle vs 达梦)

近期执行的sql,哪些比较慢? 或者健康检查时搂一眼状态 oracle: --最近3天内的慢sql set lines 200 pages 100 col txt for a65 col sql_id for a13 select a.sql_id,a.cnt,a.pctload,b.sql_text txt from (select * from (select sql_id,co…

Nvm和Npm和Pm2的关系和使用说明

一、三者关系说明 nvm、npm 和 pm2 在 Node.js 生态系统中扮演着不同的角色,但它们之间存在一定的关联。下面是每个工具的作用以及它们之间的关系:1. nvm (Node Version Manager)• nvm 是一个用于管理多个 Node.js 版本的工具。它允许用户在不同的项目…

基于微信小程序的自习室选座系统/基于Java的自习室选座系统/自习室管理系统的设计与实现

获取源码联系方式请查看文章结尾🍅 摘要 自习室选座是学校针对用户必不可少的一个部分。在学校的整个过程中,学生担负着最重要的角色。为满足如今日益复杂的管理需求,各类微信小程序自习室选座也在不断改进。本课题所设计的小程序自习室选座系…

【C#】Visual Studio2022打包依赖第三方库的winForm程序为exe

0.简介 IDE:VS2022 平台:C# .NetFramework4.7.2 WinForm界面 有GDAL、EEplus第三方库的依赖,所以在其他未安装环境的电脑中功能无法使用。 1. 安装 1.1 运行文件输出 在VS扩展中选择管理扩展,安装:Microsoft Visua…

SpringBoot上传超大文件导致OOM,完美问题解决办法

问题描述 报错: Caused by: java.lang.OutOfMemoryError at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) ~[?:1.8.0_381] at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) ~[?:1.8.0_381] at java.…

Android 更换applicationId 后 微信没有回调

1、解决办法&#xff08;代码如下&#xff09;&#xff1a; 使用 <activity-alias>: 这是一个用于定义活动别名的元素。活动别名可以让您为已经定义的活动提供一个别名&#xff0c;从而可以通过别名启动原来的活动。 <activityandroid:name".wxapi.WXEntryActiv…

ES6 class 类

普通使用原型添加方法 function Animal(name) {this.name name; }Animal.prototype.speak function() {console.log(this.name makes a noise.); };function Dog(name) {Animal.call(this, name); }Dog.prototype Object.create(Animal.prototype); Dog.prototype.constr…