一文弄明白KeyedProcessFunction函数

引言

KeyedProcessFunction是Flink用于处理KeyedStream的数据集合,它比ProcessFunction拥有更多特性,例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧

正文

了解一个函数怎么用最权威的地方就是 官方文档 以及注解,KeyedProcessFunction的注解如下

/*** A keyed function that processes elements of a stream.** <p>For every element in the input stream {@link #processElement(Object, Context, Collector)} is* invoked. This can produce zero or more elements as output. Implementations can also query the* time and set timers through the provided {@link Context}. For firing timers {@link #onTimer(long,* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as* output and register further timers.** <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only* available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.** <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a {@link* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown* methods can be implemented. See {@link* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}* and {@link org.apache.flink.api.common.functions.RichFunction#close()}.*/

上面简单来说就是以下四点

  1. Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
  2. 通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
  3. 由于KeyedProcessFunction实现了RichFunction接口,因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
  4. 需要注意的是,只有在KeyedStream里才能够访问state和定时器,通俗点来说就是这个函数要用在keyBy这个函数的后面

processElement方法解析

  1. Flink会调用processElement方法处理输入流中的每一条数据
  2. KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state
  3. 这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据

onTimer方法解析:在启用TimerService服务时会定时触发此方法,一般会在processElement方法中开启TimerService服务

以上就是这个函数的基本知识,接下来就通过实战来熟悉下它的使用

实战简介

本次实战的目标是学习KeyedProcessFunction,内容如下:

  1. 监听本机7777端口读取字符串
  2. 将每个字符串用空格分隔,转成Tuple2实例,f0是分隔后的单词,f1等于1
  3. 将Tuple2实例集合通过f0字段分区,得到KeyedStream
  4. KeyedSteam通过自定义KeyedProcessFunction处理
  5. 自定义KeyedProcessFunction的作用,是记录每个单词最新一次出现的时间,然后建一个十秒的定时器进行触发

使用代码例子

首先定义pojo类

public class CountWithTimestampNew {private String key;private long count;private long lastQuestTimestamp;public long getAndIncrementCount() {return ++count;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public long getCount() {return count;}public void setCount(long count) {this.count = count;}public long getLastQuestTimestamp() {return lastQuestTimestamp;}public void setLastQuestTimestamp(long lastQuestTimestamp) {this.lastQuestTimestamp = lastQuestTimestamp;}
}

接着实现KeyedProcessFunction类

public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {private ValueState<CountWithTimestampNew> state;@Overridepublic void open(Configuration parameters) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<CountWithTimestampNew>("sherlock-state", CountWithTimestampNew.class));}// 实现数据处理逻辑的地方@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();if (countWithTimestampNew == null) {countWithTimestampNew = new CountWithTimestampNew();countWithTimestampNew.setKey(value.f0);}countWithTimestampNew.getAndIncrementCount();//更新这个单词最后一次出现的时间countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());//单词之间不会互相覆盖吗?推测state对象是跟key绑定,针对每一个不同的key KeyedProcessFunction会创建其对应的state对象state.update(countWithTimestampNew);//给当前单词创建定时器,十秒后触发long timer = countWithTimestampNew.getLastQuestTimestamp()+10000;//尝试注释掉看看是否还会触发onTimer方法ctx.timerService().registerProcessingTimeTimer(timer);//打印所有信息,用于确保数据准确性System.out.println(String.format(" 触发processElement方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timer)));}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();//标记当前元素是否已经连续10s未出现boolean isTimeout = false;if (timestamp >= countWithTimestampNew.getLastQuestTimestamp()+10000 ) {//out.collect(new Tuple2<>(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));isTimeout = true;}//打印所有信息,用于确保数据准确性System.out.println(String.format(" 触发onTimer方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timestamp),String.valueOf(isTimeout)));}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}
}

最后是启动类

public class KeyedProcessFunctionDemo2 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 监听本地9999端口,读取字符串DataStream<String> socketDataStream = env.socketTextStream("localhost", 7777);// 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream// 对收到的字符串用空格做分割,得到多个单词.flatMap(new SplitterFlatMapFunction())// 设置时间戳分配器,用当前时间作为时间戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {@Overridepublic long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {// 使用当前系统时间作为时间戳return System.currentTimeMillis();}@Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark,返回nullreturn null;}})// 将单词作为key分区.keyBy(0)// 按单词分区后的数据,交给自定义KeyedProcessFunction处理.process(new CountWithTimeoutKeyProcessFunctionNew());// 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来timeOutWord.print();env.execute("ProcessFunction demo : KeyedProcessFunction");}
}

演示

在启动服务前,先通过linux指令监听端口 nc -lk 7777

  1. 启动Flink服务后,往7777端口里面发送数据
    在这里插入图片描述

  2. 通过IDEA的终端可以看到有日志输出,可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志
    在这里插入图片描述

  3. 那么咱们尝试连续发送两条Hello呢,可以看到累加器会持续累加,并且会触发两次onTimer方法,也就是每一条消息都会触发一次。由于连续发送两条,因此可以看得到第三行日志的末尾是false,说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来
    在这里插入图片描述

  4. 再输入点其他的试试
    在这里插入图片描述

  5. 通过输出可以看到这些单词的计数器又从0开始,说明每一个Key都对应一个状态
    在这里插入图片描述

思考题

  1. open方法会在哪里进行调用,KeyedProcessFunction整个类的完整调用逻辑是怎么样的
  2. registerProcessingTimeTimer和registerEventTimeTimer的差异是什么

参考资料

  1. https://blog.csdn.net/boling_cavalry/article/details/106299167
  2. https://blog.csdn.net/lujisen/article/details/105510532
  3. https://blog.csdn.net/qq_31866793/article/details/102831731

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/696739.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++实现栈和队列类

c实现栈和队列类 栈(Stack)Stack示意图Stack.cpp 队列(queue)queue 示意图queue.cpp 栈(Stack) Stack示意图 Stack.cpp #pragma once #include "ListStu.cpp"template<typename T> class Stack { public: /* * void push(T& tDate)* 参数一 &#xff1a;…

记录解决uniapp使用uview-plus在vue3+vite+ts项目中打包后样式不能显示问题

一、背景 从 vue2uview1 升级到 vue3vitetsuview-plus ,uview组件样式打包后不显示&#xff0c;升级前uview 组件是可以正常显示&#xff0c;升级后本地运行是可以正常显示&#xff0c;但是打包发布成H5后uview的组件无法正常显示&#xff0c;其他uniapp自己的组件可以正常显示…

Vue 中 onclick和@click区别

文章目录 一、直接上结论二、验证代码&#xff0c;可直接运行三、点击结果 一、直接上结论 onclick 只能触发 js的原生方法&#xff0c;不能触发vue的封装方法click 只能触发vue的封装方法&#xff0c;不能触发js的原生方法 二、验证代码&#xff0c;可直接运行 <!DOCTYP…

Vue3 + Ts (使用lodash)

安装 npm i --save lodash使用 import _ from lodash⚠️报警告&#xff1a;&#xff01;&#xff01;&#xff01; 此时还需要安装ts声明文件库 npm install types/lodash -D安装之后重启Vscode还是会提示上面的警告&#xff0c;此时还需在tsconfig.ts里面配置 {"c…

快速将excel/word表格转换为web页面(html)的方法

前言 在进行开发企业信息化建设的过程&#xff0c;应该有很多这样的场景&#xff0c;就是将现有的电子表格记录的方式转换为在数据系统中进行网页上报。也就是需要根据当前一直使用的表格制作一个上传这个表格信息的网页&#xff0c;如果要减少系统的使用学习成本&#xff0c;…

leetcode:78.子集

1.树形结构&#xff1a;往后依次取该数字往后的数字&#xff08;前面的不要取&#xff0c;否则子集会重复&#xff09;&#xff1b;每一层递归的结果都要放入结果集&#xff0c;而并非只放叶子节点。 代码实现&#xff1a; #达到了叶子节点&#xff08;终止条件&#xff09; …

抖音百科词条创建在哪里?

抖音百科就是头条百科&#xff0c;头条百科是一个在线百科全书平台&#xff0c;用户可以在上面创建、编辑和浏览各种百科词条。头条百科词条可以被抖音抓取到&#xff0c;从而获得更多流量和曝光&#xff0c;所以当你创建一个抖音百科词条的时候&#xff0c;就能更加提高自身的…

人工智能_CPU安装运行ChatGLM大模型_ChatGlm-6B_启动命令行对话_安装API调用接口_005---人工智能工作笔记0100

然后我们再进入 /data/module/ChatGLM-6B-main文件夹 然后我们去启动,命令行工具 python3 cli_demo.py 可以看到也是可以用了. 正常可以用了. 然后主要来看,如何使用api来调用呢,这样才可以,做自己的界面 可以看到就非常简单了只需要: 走到 /data/module/

即时设计和sketch对比

在设计领域&#xff0c;有很多易于使用的设计软件&#xff0c;每个软件都有自己的特点&#xff0c;但在使用中也会有一些限制。例如&#xff0c;传统的Sketch。Sketch是一款古老的UI设计软件。自2010年推出以来&#xff0c;已有10多年的历史&#xff0c;但它始终仅限于MAC。到目…

【JS逆向学习】同花顺(q.10jqka)补环境

逆向目标 目标网址&#xff1a;https://q.10jqka.com.cn/ 目标接口&#xff1a; https://q.10jqka.com.cn/index/index/board/all/field/zdf/order/desc/page/3/ajax/1/ 目标参数&#xff1a;cookie 逆向过程 老规矩&#xff0c;先分析网络请求&#xff0c;发现是 cookie 加…

matlab代码--基于matlabLDPC-和积译码系统

LDPC编码 一个码长为n、信息位个数为k的线性分组码&#xff08;n,k&#xff09;可以由一个生成矩阵 来定义&#xff0c;信息序列 通过G被映射到码字XS.G。线性分组码也可以由一个校验矩阵 来描述。所以码字均满足 。校验矩阵的每一行表示一个校验约束 &#xff0c;其中所有的非…

一文吃透计算机网络面试八股文

面试网站&#xff1a;topjavaer.cn 目录&#xff1a; 网络分层结构三次握手两次握手可以吗&#xff1f;四次挥手第四次挥手为什么要等待2MSL&#xff1f;为什么是四次挥手&#xff1f;TCP有哪些特点&#xff1f;说说TCP报文首部有哪些字段&#xff0c;其作用又分别是什么&…

电阻知识详解

基本介绍 电阻阻碍电流流动&#xff1a;只要有电流流过电阻&#xff0c;就会产生功率损耗 基本单位&#xff1a;欧姆&#xff0c;Ω 换算单位&#xff1a;微欧&#xff08;uΩ&#xff09;、毫欧&#xff08;mΩ&#xff09;、千欧&#xff08;kΩ&#xff09;、兆欧&#x…

字典树相关例题题解

一.P2580 于是他错误的点名开始了 这道题也类似于模版题&#xff0c;只要我们熟悉插入和查找的过程&#xff0c;一样可以解决&#xff0c;这里只要注意一下第一次出现和其它次出现所输出是不一样的&#xff0c;这里我们只要在查找函数中返回不同的值&#xff0c;这样就可以解决…

MySQL优化之SQL优化详解

(/≧▽≦)/~┴┴ 嗨~我叫小奥 ✨✨✨ &#x1f440;&#x1f440;&#x1f440; 个人博客&#xff1a;小奥的博客 &#x1f44d;&#x1f44d;&#x1f44d;&#xff1a;个人CSDN ⭐️⭐️⭐️&#xff1a;传送门 &#x1f379; 本人24应届生一枚&#xff0c;技术和水平有限&am…

Laravel01 课程介绍以及Laravel环境搭建

Laravel01 课程介绍 1. Laravel2. mac开发环境搭建(通过Homebrew)3. 创建一个项目 1. Laravel 公司中面临着PHP项目与Java项目并行&#xff0c;所以需要我写PHP的项目&#xff0c;公司用的框架就是Laravel&#xff0c;所以在B站上找了一门课学习。 Laravel中文文档地址 https…

leetcode hot 100最后一块石头重量Ⅱ

在本题中&#xff0c;我们可以知道&#xff0c;是要求最后石头返还的重量&#xff0c;也就是&#xff0c;将整个数组分割成两个子集&#xff0c;求让两个子集的差值最小。这和上一道分割整数集类似&#xff0c;只是需要我们返回差值。所以我们采用动态规划01背包来做&#xff0…

StarRocks加速查询——低基数全局字典

前言 StarRocks-2.0引入了低基数全局字典&#xff0c;可以通过全局字典将字符串的相关操作转换成整型相关操作&#xff0c;极大提升了查询性能。StarRocks 2.0后的版本默认会开启低基数字典优化。 一、低基数字典 对于利用整型替代字符串进行处理&#xff0c;通常使用字典编码…

穿越Redis单线程迷雾:从面试场景到技术内核的解读

目录 ​编辑 前言 Redis中的多线程 I/O多线程 Redis中的多进程 结论 延伸阅读 前言 很多人都遇到过这么一道面试题&#xff1a;Redis是单线程还是多线程&#xff1f;这个问题既简单又复杂。说他简单是因为大多数人都知道Redis是单线程&#xff0c;说复杂是因为这个答案…

Leetcode - 周赛385

目录 一&#xff0c;3042. 统计前后缀下标对 I 二&#xff0c;3043. 最长公共前缀的长度 三&#xff0c;3044. 出现频率最高的质数 四&#xff0c;3045. 统计前后缀下标对 II 一&#xff0c;3042. 统计前后缀下标对 I 该题数据范围小&#xff0c;可直接暴力求解&#xff0c;…