Flink之窗口聚合算子

1.窗口聚合算子

在Flink中窗口聚合算子主要分类两类

  • 滚动聚合算子(增量聚合)
  • 全窗口聚合算子(全量聚合)
1.1 滚动聚合算子

滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下:

  • aggregate
  • max
  • maxBy
  • min
  • minBy
  • reduce
  • sum

这里以aggregate算子作为示例

// ... 
// 每10s统计一次每个用户最近30s的行为条数
SingleOutputStreamOperator<Tuple2<String, Integer>> result = watermarked.keyBy(userEvent -> userEvent.getUId()).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))) // 参数1:窗口长度 参数2:滑动步长即计算频率.aggregate(new AggregateFunction<UserEvent2, Tuple2<String, Integer>, Tuple2<String, Integer>>() {// 这里给一个初始值@Overridepublic Tuple2<String, Integer> createAccumulator() {return Tuple2.of("", 0);}// 在累加器中统计每个用户行为条数(来一条更新一次)@Overridepublic Tuple2<String, Integer> add(UserEvent2 value, Tuple2<String, Integer> accumulator) {Tuple2<String, Integer> result = Tuple2.of(value.getUId() + "-" + value.getName(), accumulator.f1 + 1);return result;}// 将累加器中的更新结果给到getResult方法,输出@Overridepublic Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {return accumulator;}// 这个方法在流式计算中可以不用实现,在上下游数据进行合并时需要用到,以spark为例,上有map和下游reduce的计算结果需要合并时需要实现这个方法@Overridepublic Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {Tuple2<String, Integer> merged = Tuple2.of(a.f0, a.f1 + b.f1);return merged;}});
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

1.2 全窗口聚合算子

全窗口聚合算子会将数据记录在状态容器中,当窗口触发时会将整个窗口中的数据交给聚合函数,根据具体逻辑将这些数据进行计算,常用算子如下:

  • apply
  • process

这里以apply算子为例

// ... 
// 每10s统计一次最近30s每个用户行为发生事件最大两条数据
SingleOutputStreamOperator<UserEvent2> userEventTimeTop2 = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))// 泛型1: 数据数据类型 泛型2: 输出数据类型 泛型3: key类型 泛型4: 窗口类型.apply(new WindowFunction<UserEvent2, UserEvent2, String, TimeWindow>() {/***@Param s 本次传入的key*@Param window 本次传入窗口的各种元信息*@Param input 本次输入的所有数据*@Param out 输出数据**/@Overridepublic void apply(String s, TimeWindow window, Iterable<UserEvent2> input, Collector<UserEvent2> out) throws Exception {// 创建集合接收迭代器中的数据ArrayList<UserEvent2> userEvent2List = new ArrayList<>();// 遍历迭代器,也就是输入数据for (UserEvent2 userEvent2 : input) {// 将数据添加到集合中userEvent2List.add(userEvent2);}// 将集合中的数据根据用户行为发生事件进行排序Collections.sort(userEvent2List, new Comparator<UserEvent2>() {@Overridepublic int compare(UserEvent2 o1, UserEvent2 o2) {// 倒序排序return Integer.parseInt(o2.getTime()) - Integer.parseInt(o1.getTime());}});// 将每个用户行为发生时间最大的两条数据输出for (int i = 0; i < Math.min(userEvent2List.size(), 2); i++) {out.collect(userEvent2List.get(i));}}});
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/108106.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端数据可视化之【Echarts介绍】

目录 &#x1f31f;前言&#x1f31f;丰富的可视化类型&#x1f31f;多种数据格式无需转换直接使用&#x1f31f;移动端优化&#x1f31f;多渲染方案&#xff0c;跨平台使用&#xff01;&#x1f31f;写在最后 &#x1f31f;前言 ECharts开源来自百度商业前端数据可视化团队&a…

力扣每日一题43:字符串相乘

题目描述&#xff1a; 给定两个以字符串形式表示的非负整数 num1 和 num2&#xff0c;返回 num1 和 num2 的乘积&#xff0c;它们的乘积也表示为字符串形式。 注意&#xff1a;不能使用任何内置的 BigInteger 库或直接将输入转换为整数。 示例 1: 输入: num1 "2"…

Python数据分析教程(非常详细)从零基础入门到精通,看完这一篇就够了

1、为什么选择Python进行数据分析? Python是一门动态的、面向对象的脚本语言&#xff0c;同时也是一门简约&#xff0c;通俗易懂的编程语言。Python入门简单&#xff0c;代码可读性强&#xff0c;一段好的Python代码&#xff0c;阅读起来像是在读一篇外语文章。Python这种特性…

ROS键盘遥控机器人,通过参数服务器指定速度

1、引言 在上节的驱动机器人&#xff0c;我们知道是cmd_vel话题发布一串Twist类型消息来控制&#xff0c;我们可以输入如下命令查看这个Twist的详细信息&#xff1a;rosmsg show geometry_msgs/Twist geometry_msgs/Vector3 linear float64 x float64 y float64 z geome…

macos 中ios系统升级,但是macos还是老系统,在手机上无法安装ios软件

https://github.com/filsv/iOSDeviceSupport 构建项目出现 解决的方法&#xff1a; 就可以了&#xff0c;

JDBC-day05(DAO及相关实现类)

七&#xff1a;DAO及相关实现类 1. DAO介绍 DAO&#xff1a;Data Access Object访问数据信息的类和接口&#xff0c;包括了对数据的CRUD&#xff08;Create、Retrival、Update、Delete&#xff09;&#xff0c;而不包含任何业务相关的信息。有时也称作&#xff1a;BaseDAO作用…

关于SparkRdd和SparkSql的几个指标统计,scala语言,打包上传到spark集群,yarn模式运行

需求&#xff1a; ❖ 要求:分别用SparkRDD, SparkSQL两种编程方式完成下列数据分析,结合webUI监控比较性能优劣并给出结果的合理化解释. 1、分别统计用户&#xff0c;性别&#xff0c;职业的个数&#xff1a; 2、查看统计年龄分布情况&#xff08;按照年龄分段为7段&#xff0…

Adobe发布Firefly 2,提升图像质量和用户体验

&#x1f989; AI新闻 &#x1f680; Adobe发布Firefly 2&#xff0c;提升图像质量和用户体验 摘要&#xff1a;Adobe升级了其AIGC生图平台Firefly为Firefly 2&#xff0c;该版本通过引入矢量图生成功能、提升图像质量和增加多项新功能&#xff0c;大幅改善了用户体验。Firef…

太强了!三种方案优化 2000w 数据大表!

目录 评估表数据体量 表容量&#xff1a; 磁盘空间 实例容量 出现问题的原因 如何解决单表数据量太大&#xff0c;查询变慢的问题 方案一&#xff1a;数据表分区 方案二&#xff1a;数据库分表 水平分表 垂直分表 1.取模方案&#xff1a; 2.range 范围方案 3.hash…

C51--基本认知

单片机基本认知&#xff1a; 1、什么是单片机 单片机是一种集成电路芯片。 把具有数据处理能力的中央处理器 CPU、随机存储器RAM、只读存储器ROM。 多种 I / O 口和中断系统、定时器/计数器等功能&#xff08;可能还包括显示驱动电路、脉宽调制电路、模拟多路转换器、A/D转换器…

宁夏企业过等保选哪家测评机构好?选哪家堡垒机好?

最近不少宁夏小伙伴在问&#xff0c;宁夏企业过等保选哪家测评机构好&#xff1f;选哪家堡垒机好&#xff1f;今天我们小编就给大家来简单说说哈&#xff01; 宁夏企业过等保选哪家测评机构好&#xff1f; 目前宁夏正规具有资质的等保测评机构只有3家&#xff0c;分别为中电信…

精美的早安问候语,暖心祝福,开心每一天

1、 美好的祝福&#xff0c;成了清晨的主题。相互问候&#xff0c;是一天的开始。让我们伴着不老的岁月&#xff0c;永远开心快乐。早晨好&#xff01; 2、 心宽似海&#xff0c;百福皆来&#xff0c;世事看淡&#xff0c;内心安然。随缘即福&#xff0c;随遇而安&#xff0…

银河麒麟你服务x86访问ftp服务器上的文件

打开我的电脑 地址栏输入 ftp地址 可以选择需要的文件复制出来了

简单聊聊低代码

在数字经济迅速发展的背景下&#xff0c;越来越多的企业开始建立健全业务系统、应用、借助数字化工具提升管理效率&#xff0c;驱动业务发展&#xff0c;促进业绩增长。在这一过程中&#xff0c;和许多新技术一样&#xff0c;低代码&#xff08;Low-code&#xff09;开发被推上…

ssh 报错:Permission denied, please try again.

报错问题&#xff1a;执行一条远程scp远程拷贝&#xff0c;在此之前已配置好ssh无密登录&#xff0c; sudo scp -r hadoop-3.2.0 slave2:/usr/local/src/ 确保 /etc/ssh/sshd_config文件下 PasswordAuthentication no 改为 PasswordAuthentication yes 和 PermitRootLogin no …

【Overload游戏引擎细节分析】视图投影矩阵计算与摄像机

本文只罗列公式&#xff0c;不做具体的推导。 OpenGL本身没有摄像机(Camera)的概念&#xff0c;但我们为了产品上的需求与编程上的方便&#xff0c;一般会抽象一个摄像机组件。摄像机类似于人眼&#xff0c;可以建立一个本地坐标系。相机的位置是坐标原点&#xff0c;摄像机的朝…

探秘网页打开的完整过程:DNS解析、CDN加速和Nginx负载均衡的协同驱动

浅谈一个网页打开的全过程&#xff08;涉及DNS、CDN、Nginx负载均衡等&#xff09; 1、概要 从用户在浏览器输入域名开始&#xff0c;到web页面加载完毕&#xff0c;这是一个说复杂不复杂&#xff0c;说简单不简单的过程&#xff0c;下文暂且把这个过程称作网页加载过程。下面…

STM32CUBEMX_DMA串口空闲中断接收+接收发送缓冲区

STM32CUBEMX_DMA串口空闲中断接收接收发送缓冲区 前言&#xff1a; 我了解的串口接收指令的方式有&#xff1a;在这里插入图片描述 1、接收数据中断特定帧尾 2、接收数据中断空闲中断 3、DMA接收空闲中断 我最推荐第三种&#xff0c;尤其是数据量比较大且频繁的时候 串口配置 …

智慧党建小程序源码系统+在线答题考试二合一 带完整的搭建教程

大家好&#xff0c;今天来给大家分享一个智慧党建小程序源码系统。以下是部分核心代码图&#xff1a; 系统特色功能一览&#xff1a; 积分体系&#xff1a;党员可以通过完成各种党建活动&#xff0c;如学习党的理论知识、参加组织生活、开展志愿服务等获得积分&#xff0c;积分…

全局事件总线

全局事件总线 功能&#xff1a;可以解决所有组件之间通信传数据的问题原理&#xff1a;通过一个共享对象&#xff0c;将所有组件全部绑定到对象上&#xff0c;即可通过这个对象实现组件与组件之间的传递数据&#xff0c;而这个共享对象叫做全局事件总线。 如何分清楚谁是发送方…