RisingWave最佳实践-利用Dynamic filters 和 Temporal filters 实现监控告警

心得的体会

刚过了年刚开工,闲暇之余调研了分布式SQL流处理数据库–RisingWave,本人是Flink(包括FlinkSQL和Flink DataStream API)的资深用户,但接触到RisingWave令我眼前一亮,并且拿我们生产上的监控告警场景在RisingWave上做了验证,以下是自己的心得体会:

RisingWave架构简单,运维成本底,基于云原生(可以分别基于计算和存储动态伸缩),同时在开发上屏蔽了Flink等实时处理框架底层需要处理的一些技术细节(状态存储,数据一致性,分布式集群扩展等)。提供了与PostgreSQL兼容的标准SQL接口,用户可以像使用 PostgreSQL 一样处理数据流。并且RisingWave不单单可以处理流式数据,还提供了数据其他流式处理框架(如:Flink、storm)所不具备的数据存储能力,基本可以完全取代FlinkSQL。相对于其他OLAP系统(如:apache doris,starrocks),RisingWave采用同步实时,可以保证实时的新鲜度;强一致性,而不是最终一致性。用户需要做的仅仅是通过开发SQL就可以处理流数据,当然首先需要具备流式数据处理思维(相对于离线)。

RisingWave当然也有自身的不足,相对于Flink可以通过DataStream API自定义灵活的处理流式数据,RisingWave只能解决一些特定的流式场景,无法做太多定制开发;相对于Apache Doris等OLAP实时分析性数据库,RisingWave不适合做分析型随机查询。另外RisingWave是个新事物,正在发展阶段,周边生态和相关文档还不健全,作为尝鲜者可能会踩很多坑。然而令人欣慰的是RisingWave的社区回复还是很及时的,RisingWave官方投入了很多精力在做RisingWave的布道和答疑。

至于争论比较厉害的RisingWav VS FLink的性能和吞吐量上孰优孰劣,针对不同应用场景可能有不同表现,因此没有亲自调研就没有发言权。但我认为在不同的场景下他们应该有各自的优势。无论如何RisingWave部署简单,上手容易,试错成本低是一个不争的事实。RisingWave可以应用在一些数据看版,监控,实时指标等场景。

利用动态和时间过滤器实现监控告警

FlinkSQL解决不了定时触发的问题,FlinkSQL的流处理逻辑只是按event触发,不能按时间条件触发,也就是没有触发器机制。FlinkSQL窗口的定时触发,归根结底也是基于event触发,event驱动的机制。因此需要触发器的场景就需要用到Flink DataStream API的KeyedProcessFunction等算子。但RisingWave利用Dynamic filters 和 Temporal filters 可以间接实现类似场景的触发器机制。

场景描述

现有如下群消息实时指标监控场景:
数据有初始化(init)、查询(query)、回调(callback:succ+fail)三种先后顺序状态。
数据是按预设时间批次分组的,例如:2024-01-01 08:00:00、2024-01-01 08:30:00,实时统计每一个批次内三种不同状态的数据count。

监控指标一:在某一个批次延迟指定的时间(query_timeout)之内(例如:2024-01-01 08:00:00延迟1小时触发时间为系统时间2024-01-01 09:00:00),该批次的query状态数据count没有达到init状态的数量count阀值(即query_count<init_count*query_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count等

监控指标二:如果指标一告警没有被触发,该批次在满足query状态数据count达到init状态数量count的阀值(即query_count>=init_countquery_threshold)以后,在指定的延迟时间内(callback_timeout),该批次的callback状态数据count没有达到query状态的数量count阀值(即callback_count<query_countcallback_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count、callback_count等

群消息实时指标监控流程图如下:
群消息实时指标监控流程图

实例demo

RisingWave部署可以参考:RisingWave分布式SQL流处理数据库调研

假设:
query_threshold=1, callback_count=1
query_timeout= ‘5 minute’, callback_timeout= ‘1 minute’
0->init,1->query,2->callback

RisingWave SQL:

 
DROP TABLE t_msg;
CREATE TABLE t_msg(msg_id int,status smallint ,public_time timestamp,process_time timestamp as proctime()
) APPEND ONLY;set timezone = 'PRC';--PRC(People’s Republic of China)
show timezone;select * from t_msg;--统计不同状态的count
DROP MATERIALIZED VIEW mv_t_msg_groupby; 
CREATE MATERIALIZED VIEW mv_t_msg_groupby AS
SELECT public_time
,sum(case when status = 0  then 1 else 0 end) AS init_count
,sum(case when status = 1  then 1 else 0 end) AS query_count
,sum(case when status = 2  then 1 else 0 end) AS callback_count
,max(process_time) as process_time
FROM t_msg 
group by public_time;select * from mv_t_msg_groupby;--sink_query_alarm 
DROP SINK sink_query_alarm;
CREATE SINK sink_query_alarm AS 
SELECT public_time
,init_count
,query_count
,process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   init_count*1 > query_count --query_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--由于RisingWave不支持在【MATERIALIZED VIEW】和【SINK】等【可伸缩流】中指定处理时间字段,因此需要借助外部存储kafka周转
--RisngWave官方给的解释:support a proctime on an append only stream might be easier but on retractable stream could take extra cost. We must think it carefully to introduce such a feature.--sink_query_succ 
DROP SINK sink_query_succ;
CREATE SINK sink_query_succ AS 
SELECT public_time
,init_count
,query_count
,callback_count
,process_time as query_succ_process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' >= now()   --query_timeout=1 minute, 在指定的时间内,【Delete and clean expired data】
and   init_count*1 <= query_count --query_threshold=1,query_count达到了指定值
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_succ'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--source连接器
DROP SOURCE source_query_succ;
CREATE SOURCE IF NOT EXISTS source_query_succ (init_count int,query_count int ,callback_count int ,public_time timestamp,query_succ_process_time timestamp
)
WITH (connector='kafka',topic='t_sink_query_succ',properties.bootstrap.server='192.168.1.100:8092',scan.startup.mode='earliest', -- earliest ,latest,default:earliest 
) FORMAT PLAIN ENCODE JSON;select * from source_query_succ;--sink_callback_alarm,用到动态过滤器和时间过滤器
DROP SINK sink_callback_alarm;
CREATE SINK sink_callback_alarm AS 
WITH tmp AS ( 
select public_time, min(query_succ_process_time) as query_succ_process_time  -- 动态过滤器
FROM source_query_succ 
group by public_time
)
SELECT b.public_time
,b.init_count
,b.query_count
,b.callback_count
,b.process_time
,a.query_succ_process_time
FROM  tmp a
JOIN  mv_t_msg_groupby b ON a.public_time=b.public_time
where a.query_succ_process_time + INTERVAL '1 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   b.query_count*1 > b.callback_count --callback_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_callback_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);-- 模拟数据
--init
INSERT INTO t_msg values(1,0,'2024-02-23 15:55:00'::TIMESTAMP); --比当前系统时间早
INSERT INTO t_msg values(2,0,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,0,'2024-02-23 15:55:00'::TIMESTAMP);
--query                                
INSERT INTO t_msg values(1,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,1,'2024-02-23 15:55:00'::TIMESTAMP);
--callback                             
INSERT INTO t_msg values(1,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,2,'2024-02-23 15:55:00'::TIMESTAMP);

查看监控结果:

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_query_alarm -C 
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_callback_alarm -C 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/698436.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sql server想要小数点后向下取整怎么搞

select FORMAT(3.169, N2) as 四舍五入1, CAST(3.169 AS decimal(9,2)) as 四舍五入2, ROUND(3.169, 2) as 四舍五入3, CAST(FLOOR(3.169 * 100) / 100 AS decimal(9,2)) as 向下取整1, FLOOR(3.169 * 100) / 100 as 向下取整2, ceiling(3.169 * 100) / 100 as 向上取整—…

AI 视频 | Stable Video 开放公测了,免部署,免费使用!谁说 4 秒的 AI 视频不香?!

谁说 4 秒的视频不香&#xff1f;2.21 日&#xff0c;Stable Video 开放公测了&#xff0c;不需要自己部署了&#xff0c;直接在网页上就可以生成视频了。 下面这些视频&#xff0c;都是通过 Stable Video Diffusion 生成的&#xff0c;可以先来感受一下&#xff1a; Stable V…

IPsec、安全关联、网络层安全协议

网络层安全协议 IP 几乎不具备任何安全性&#xff0c;不能保证&#xff1a; 1.数据机密性 2.数据完整性 3.数据来源认证 由于其在设计和实现上存在安全漏洞&#xff0c;使各种攻击有机可乘。例如&#xff1a;攻击者很容易构造一个包含虚假地址的 IP 数据报。 IPsec 提供了标…

nginx服务基础用法(概念、安装、热升级)

目录 一、I/O模型概述 1、I/O概念 1.1 计算机的I/O 1.2 Linux的I/O 2、零拷贝技术 3、同步/异步&#xff08;消息反馈机制&#xff09; 4、阻塞/非阻塞 5、网络I/O模型 5.1 阻塞型 I/O 模型&#xff08;blocking IO&#xff09; 5.2 非阻塞型 I/O 模型 (nonblocking …

外包干了两个月,技术退步明显。。。。。

先说一下自己的情况&#xff0c;本科生&#xff0c;19年通过校招进入广州某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…

SD-WAN云专线:优越性能助力企业云上腾飞

随着企业数字化转型的推进&#xff0c;SD-WAN云专线作为一项安全、高速、低延迟、稳定可靠的专属连接通道&#xff0c;正迅速崭露头角。该技术通过连接用户办公点、数据中心以及各类云环境&#xff0c;构建强大的网络基础设施&#xff0c;助力企业轻松实现跨域云网数据互通&…

uniapp微信小程序解决上方刘海屏遮挡

问题 在有刘海屏的手机上&#xff0c;我们的文字和按钮等可能会被遮挡 应该避免这种情况 解决 const SYSTEM_INFO uni.getSystemInfoSync();export const getStatusBarHeight ()> SYSTEM_INFO.statusBarHeight || 15;export const getTitleBarHeight ()>{if(uni.get…

AI副业项目分享

在上一篇文章《这才是大学生该做的副业&#xff0c;别再痴迷于游戏了&#xff01;》中&#xff0c;我分享了一些副业的想法&#xff0c;接着有不少同学问我&#xff1a;具体如何做&#xff1f;这是真把我给整蒙了&#xff0c;这里分享下我可以提供的产品和服务吧&#xff0c;并…

Vant轮播多个div结合二维数组的运用

需求说明 在开发H5的时候&#xff0c;结合Vant组件的轮播组件Swipe实现如下功能。我们查阅vant组件库官方文档可以得知&#xff0c;每个SwipeItem组件代表一个卡片&#xff0c;实现的是每屏展示单张图片或者单个div轮播方式&#xff0c;具体可以查阅&#xff1a;Vant 2 - 轻量、…

Mysql中like %xxx% 模糊查询该如何优化

背景&#xff1a; 实际项目中&#xff0c;like %xxx%的情况其实挺多的&#xff0c;比如某个表单如果支持根据公司名进行搜索&#xff0c;用户一般都是输入湖南xxx有限公司中的xxx进行搜索&#xff0c;所以对于接口而言&#xff0c;就必须使用like %xxx%来支持&#xff0c;从而…

C#,入门教程(05)——Visual Studio 2022源程序(源代码)自动排版的功能动画图示

上一篇&#xff1a; C#&#xff0c;入门教程(04)——Visual Studio 2022 数据编程实例&#xff1a;随机数与组合https://blog.csdn.net/beijinghorn/article/details/123533838 新来的徒弟们交上来的C#代码&#xff0c;可读性往往很差。 今天一问才知道&#xff0c;他们居然不…

MIT6.S081学习——二、相关命令行整理

MIT6.S081学习——二、相关命令行整理 1 添加user代码到xv6中并编译2 git版本管理 1 添加user代码到xv6中并编译 问题&#xff1a;如何让在xv6中运行copy.c 答&#xff1a;在xv6中运行copy.c文件&#xff0c;你需要先将该文件添加到xv6源代码目录中&#xff0c;然后修改Makefil…

笔试题讲解(C语言进阶)

目录 前言 1、题目 2、答案 3、解析 结语 前言 “纸上得来终觉浅&#xff0c;绝知此事要躬行”。本篇通过对指针实际案例的分析&#xff0c;由浅入深&#xff0c;来加强我们对指针的理解。 1、题目 这是一道难题&#xff0c;小心哦。 #include <stdio.h> int main(…

值类型:左值、纯右值、将亡值

值类型是一个古老的概念&#xff0c;早在C98就存在了&#xff0c;但在C11之前这些都无关紧要&#xff0c;随着C11右值引用的产生值类型也被赋予了新的含义。 但问题是C11并未给出清晰的定义&#xff0c;比如在C11的标准文档中&#xff0c;左值的概念只有一句话&#xff1a;“指…

使用向量数据库pinecone构建应用02:检索增强生成RAG

Building Applications with Vector Databases 下面是这门课的学习笔记&#xff1a;https://www.deeplearning.ai/short-courses/building-applications-vector-databases/ Learn to create six exciting applications of vector databases and implement them using Pinecon…

Vue单文件学习项目综合案例Demo,黑马vue教程

文章目录 前言一、小黑记事本二、购物车三、小黑记账清单 前言 bilibili视频地址 一、小黑记事本 效果图 主代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"/><meta http-equiv"X-UA-Compatible&…

Open CASCADE学习|绘制砂轮

今天绘制一个砂轮&#xff0c;其轮廓由两条直线段和两段圆弧构成&#xff0c;圆弧分别与直线相切&#xff0c;两条圆弧之间相交而非相切。建模思路是&#xff1a;先给定两条直线段的起始点及长度&#xff0c;画出直线段&#xff0c;然后给定其中一圆弧的半径及圆心角&#xff0…

4核8G服务器能承受多少并发?

腾讯云4核8G服务器能承受多少并发&#xff1f;阿腾云的4核8G服务器可以支持20个访客同时访问&#xff0c;关于4核8G服务器承载量并发数qps计算测评&#xff0c;云服务器上运行程序效率不同支持人数在线人数不同&#xff0c;公网带宽也是影响4核8G服务器并发数的一大因素&#x…

Nginx网络服务

一、Nginx概述 1.1Nginx介绍 Nginx&#xff1a; 一款高新能、轻量级Web服务软件稳定性高系统资源消耗低对HTTP并发连接的处理能力高单台物理服务器可支持30 000&#xff5e;50 000个并发请求。 Nginx 是开源、高性能、高可靠的 Web 和反向代理服务器&#xff0c;而且支持热部…

【LeetCode: 889. 根据前序和后序遍历构造二叉树 + DFS】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…