AggregateFunction结合自定义触发器实现点击率计算

背景:

接上一篇文章,ProcessWindowFunction 结合自定义触发器会有状态过大的问题,本文就使用AggregateFunction结合自定义触发器来实现,这样就不会导致状态过大的问题了

AggregateFunction结合自定义触发器实现

在这里插入图片描述
flink对于每个窗口只需要维护一个状态:不像ProcessWindowFunction那样需要把窗口内收到的所有消息都作为状态存储起来

在这里插入图片描述
完整代码参见:

package wikiedits.func;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;public class AggregateFunctionAndTiggerDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);env.setStateBackend(new FsStateBackend("file:///D:/tmp/flink/checkpoint/aggregatetrigger"));// 并行度为1env.setParallelism(1);// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {int xxxNum = 0;int yyyNum = 0;for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX和YYY两种nameString name = (0 == i % 2) ? "XXX" : "YYY";// 更新aaa和bbb元素的总数if (0 == i % 2) {xxxNum++;} else {yyyNum++;}// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 将数据和时间戳打印出来,用来验证数据if (xxxNum % 2000 == 0) {System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n", name,time(timeStamp), xxxNum, yyyNum));}// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1);}}@Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunctionSingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = dataStream// 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种.keyBy(value -> value.f0)// 5秒一次的滚动窗口.timeWindow(Time.minutes(5))// 10s触发一次计算,更新统计结果.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))// 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子.aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {// 1、初始值// 定义累加器初始值@Overridepublic Tuple2<String, Integer> createAccumulator() {return new Tuple2<String, Integer>("", 0);}// 2、累加// 定义累加器如何基于输入数据进行累加@Overridepublic Tuple2<String, Integer> add(Tuple2<String, Integer> value,Tuple2<String, Integer> accumulator) {accumulator.f0 = value.f0;accumulator.f1 += value.f1;return accumulator;}// 3、合并// 定义累加器如何和State中的累加器进行合并@Overridepublic Tuple2<String, Integer> merge(Tuple2<String, Integer> acc1,Tuple2<String, Integer> acc2) {acc1.f1 += acc2.f1;return acc1;}// 4、输出// 定义如何输出数据@Overridepublic Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {return accumulator;}});// 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute("processfunction demo : processwindowfunction");}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}

通过这种方式我们就可以做到统计某个页面一天内至今为止的点击率,每10s输出一次点击率的结果,并且不会引起状态膨胀的问题

参考文献:
https://www.cnblogs.com/Springmoon-venn/p/13667023.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/69960.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小白开始学习C++

​​​​第一节&#xff1a;控制台输出hello word&#xff01; #include<iostream> //引入库文件 int main() { //控制台输出 hello word! 之后回车 std::cout << "hello word!\n"; #include<iostream> //引入库文件int main() {//控制…

Python3 循环语句

Python3 循环语句 本章节将为大家介绍 Python 循环语句的使用。 Python 中的循环语句有 for 和 while。 Python 循环语句的控制结构图如下所示&#xff1a; while 循环 Python 中 while 语句的一般形式&#xff1a; while 判断条件(condition)&#xff1a;执行语句(statem…

【LeetCode算法系列题解】第61~65题

CONTENTS LeetCode 61. 旋转链表&#xff08;中等&#xff09;LeetCode 62. 不同路径&#xff08;中等&#xff09;LeetCode 63. 不同路径 II&#xff08;中等&#xff09;LeetCode 64. 最小路径和&#xff08;中等&#xff09;LeetCode 65. 有效数字&#xff08;困难&#xff…

py脚本解决ArcGIS Server服务内存过大的问题

在一台服务器上&#xff0c;使用ArcGIS Server发布地图服务&#xff0c;但是地图服务较多&#xff0c;在发布之后&#xff0c;服务器的内存持续处在95%上下的高位状态&#xff0c;导致服务器运行状态不稳定&#xff0c;经常需要重新启动。重新启动后重新进入这种内存高位的陷阱…

回复:c#的Winform如何让ComboBox不显示下拉框?https://bbs.csdn.net/topics/392565412

组合框.Parent this;组合框.Items.AddRange(new object[] { "111", "222", "333", "444" });组合框.DropDownHeight 1;组合框.SelectedIndex 0;//组合框.DropDownStyle ComboBoxStyle.Simple; ComboBox 组合框 new ComboBox();Li…

51单片机电子钟六位数码管显示整点提醒仿真设计( proteus仿真+程序+原理图+报告+讲解视频)

51单片机电子钟六位数码管显示整点提醒仿真设计( proteus仿真程序原理图报告讲解视频&#xff09; 1.主要功能&#xff1a;2.仿真3. 程序代码4. 原理图参考元器件清单 5. 设计报告6. 设计资料内容清单 51单片机电子钟六位数码管显示整点提醒仿真设计( proteus仿真程序原理图报告…

AOP进阶-连接点

连接点 在Spring中用JoinPoint抽象了连接点&#xff0c;用它可以获取方法执行时的相关信息&#xff0c;如目标类名、方法名、方法参数等 对于Around通知&#xff0c;获取连接点信息只能使用 ProceedingJoinPoint对于其它四种通知&#xff0c;获取连接点信息只能使用JoinPoint&…

Go语言高级编程:深度挖掘

Go语言高级编程&#xff1a;深度挖掘 欢迎继续深入Go语言的高级编程领域。在这篇博客中&#xff0c;我们将更深入地探讨Go语言的一些高级主题和技术&#xff0c;包括性能优化、错误处理、反射和自定义数据结构。 性能优化 Go语言因其出色的性能而广受欢迎&#xff0c;但要达…

c++中继承多态virtual和override

目录 virtual&#xff1a; 易错点&#xff1a; 未声明虚函数&#xff1a; 忘记使用 override 关键字&#xff1a; 内存泄漏&#xff1a; 基类指针不指向任何对象&#xff1a; 访问权限问题&#xff1a; 不正确的类设计&#xff1a; 不正确的对象切片&#xff1a; 混淆…

C高级-Linux终端基础指令

在线下载软件 检测网络 ping baidu.com在下载软件前&#xff0c;需将Linux系统中的软件源更新成国内的软件源&#xff1a;清华源、阿里源、163源、中科大源… 更新软件列表 将系统中的软件源更新为国内的软件源后&#xff0c;使用命令sudo apt-get update 使Ubuntu连接到国…

[HDCTF 2023]YamiYami

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言涉及知识点解题详细过程session伪造反弹shell 前言 从暑假末尾一直搁置&#xff0c;当时卡在反弹shell搞得离flag就差一步。不过最近一两天学习完反弹shell的知…

8.(Python数模)(预测模型一)马尔科夫链预测

Python实现马尔科夫链预测 马尔科夫链原理 马尔科夫链是一种进行预测的方法&#xff0c;常用于系统未来时刻情况只和现在有关&#xff0c;而与过去无关。 用下面这个例子来讲述马尔科夫链。 如何预测下一时刻计算机发生故障的概率&#xff1f; 当前状态只存在0&#xff08;故…

肖sir__设计测试用例方法之_(白盒测试)

白盒测试技术 一、定义&#xff1a; 白盒测试也叫透明盒测试&#xff0c;检查程序内部结构及路径一是否符合规格说明&#xff0c;二是否符合其代码规范。 因此&#xff0c;也叫结构测试或者逻辑驱动测试。 二、白盒测试常见方法&#xff1a; a、语句覆盖&#xff1b; b、判断覆…

虚拟机扩容

系统环境centos8&#xff0c;分两步&#xff0c;第一步先在vmware扩容&#xff0c;第二部在虚拟机内部扩容 1.vmware分配磁盘空间 2.虚拟机内部扩容 查看当前磁盘信息&#xff0c;这个是扩容之前的&#xff0c;扩容完成才会显示新的 df -h查看系统分区信息 fdisk -l查看目录…

C语言基础知识理论版(很详细)

文章目录 前述一、数据1.1 数据类型1.2 数据第一种数据&#xff1a;常量第二种数据&#xff1a;变量第三种数据&#xff1a;表达式1、算术运算符及算术表达式2、赋值运算符及赋值表达式3、自增、自减运算符4、逗号运算符及其表达式&#xff08;‘顺序求值’表达式&#xff09;5…

Spring Boot日志基础使用 设置日志级别

然后 我们来说日志 日志在实际开发中还是非常重要的 即可记录项目状态和一些特殊情况发生 因为 我们这里不是将项目 所以 讲的也不会特别深 基本还是将Spring Boot的日志设置或控制这一类的东西 相对业务的领域我们就不涉及了 日志 log 初期最明显的作用在于 开发中 你可以用…

深入浅出了解BeanFactory 和 ApplicationContext

一.区别 BeanFactory和ApplicationContext是Spring的两大核心接口&#xff0c;都可以当做Spring的容器。其中ApplicationContext是BeanFactory的子接口。 1.依赖关系 BeanFactory&#xff1a;是Spring里面最底层的接口&#xff0c;包含了各种Bean的定义&#xff0c;读取bean…

Mac 手动安装 sshpass

1. 下载安装包 https://sourceforge.net/projects/sshpass/ 解压并进入到安装包目录 tar -zxvf sshpass-xx.xx.tar.gz cd sshpass-xx.xx2. 检验环境&#xff0c;编译源码安装 ./configuremake&&make install3. 检测安装是否成功 ▶ sshpass Usage: sshpass [-f|-…

uniapp 使用mqtt 报错 socketTask onOpen is not a function

1. 报错的解决方法 在man.js文件添加这个 // #ifndef MP // 处理 wx.connectSocket promisify 兼容问题&#xff0c;强制返回 SocketTask uni.connectSocket (function(connectSocket) {return function(options) {console.log(options)options.success options.success ||…

Golang专题精进

Golang专题精进 Golang单元测试Golang错误处理Golang正则表达式Golang反射Golang验证码Golang日期时间处理库CarbonGolang发送邮件库emailGolang log日志Golang log日志框架logrusGolang加密和解密应用Golang访问权限控制框架casbinGolang使用swagger生成api接口文档Golang jwt…