208.Flink(三):窗口的使用,处理函数的使用

目录

一、窗口

1.窗口的概念

2.窗口的分类

(1)按照驱动类型分

(2)按照窗口分配数据的规则分类

3.窗口api概览

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

*1)按键分区窗口(Keyed Windows)

*2)非按键分区(Non-Keyed Windows)

(2)代码中窗口API的调用

(3)窗口分配器

(4)窗口函数

*1)增量聚合函数

^1)归约函数(ReduceFunction)

^2)聚合函数(AggregateFunction)

*2)全窗口函数(full window functions)

*3)增量聚合和全窗口函数的结合使用

(5)触发器(Trigger)

(6)移除器(Evictor)

(7)窗口的简单原理

*1)一个数据来了,怎么认为他是哪个窗口内的数据?

*2)窗口特性

*3)窗口的生命周期

4.时间语义

(1)Flink中的时间语义

(2)Flink以事件时间为默认时间语义

5.水位线(Watermark)

(1)水位线的概念

*1)有序流中的水位线

*2)乱序流中的水位线

(2)水位线和窗口的工作原理

(3) 生成水位线

*1)总体原则

*2)有序流中内置水位线设置

*3)乱序流中内置水位线设置

*4)自定义水位线生成器(周期式、断点式)

*5)在数据源中发送水位线

(6)迟到数据的处理

*1)设置乱序容忍度

*2)设置窗口延迟关闭

*3)侧输出流

(7)基于时间的合流——双流联结(Join)

*1)窗口联结(Window Join)

*2)间隔联结(Interval Join)

二、处理函数

1.基本处理函数(ProcessFunction)

(1)处理函数的功能和使用

(2)ProcessFunction解析

(3)处理函数的分类

2.按键分区处理函数(KeyedProcessFunction)

(1)定时器(Timer)和定时服务(TimerService)

(2)KeyedProcessFunction注意点及实现

3.应用案例:Top N

(1)方法一:ProcessAllWindowFunction

(2)方法二:

4.侧输出流


一、窗口

在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。

1.窗口的概念

Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。

到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。

2.窗口的分类

(1)按照驱动类型分

*1)时间窗口

一定时间作为一个窗口

*2)计数窗口

达到多少数量作为一个窗口

(2)按照窗口分配数据的规则分类

*1)滚动窗口

以一个固定时间为窗口,第一个窗口结束的时间就是下一个窗口开始的时间。

*2)滑动窗口

窗口大小 + 步长。

如果步长 = 窗口大小,其实就是滚动窗口的情况。

步长 > 窗口大小,会有数据被漏掉。

步长 < 窗口大小,窗口会有重叠

*3)会话窗口

基于会话对数据分组

*4)全局窗口

全局有效,没有结束时间

3.窗口api概览

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

定义窗口前,需要确认数据流是基于keyBy还是没有keyBy的。

*1)按键分区窗口(Keyed Windows)

经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。

stream.keyBy(...).window(...)

*2)非按键分区(Non-Keyed Windows)

窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。

对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

stream.windowAll(...)

(2)代码中窗口API的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

.window()方法需要传入一个窗口分配器,它指明了窗口的类型。

.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。

(3)窗口分配器

窗口分配器指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。

(4)窗口函数

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数全窗口函数

package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO 1. 指定 窗口分配器: 指定 用 哪一种窗口 ---  时间 or 计数? 滚动、滑动、会话?// 1.1 没有keyby的窗口: 窗口内的 所有数据 进入同一个 子任务,并行度只能为1
//        sensorDS.windowAll()// 1.2 有keyby的窗口: 每个key上都定义了一组窗口,各自独立地进行统计计算// 基于时间的
//        sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口,窗口长度10s
//        sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))) // 滑动窗口,窗口长度10s,滑动步长2s
//        sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) // 会话窗口,超时间隔5s
//        sensorKS.window(GlobalWindows.create())  // 全局窗口,计数窗口的底层就是用的这个,需要自定义的时候才会用// 基于计数的
//        sensorKS.countWindow(5)  // 滚动窗口,窗口长度=5个元素
//        sensorKS.countWindow(5,2) // 滑动窗口,窗口长度=5个元素,滑动步长=2个元素// TODO 2. 指定 窗口函数 : 窗口内数据的 计算逻辑WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 增量聚合: 来一条数据,计算一条数据,窗口触发的时候输出计算结果
//        sensorWS
//                .reduce()
//        .aggregate(, )// 全窗口函数:数据来了不计算,存起来,窗口触发的时候,计算并输出结果
//        sensorWS.process()env.execute();}
}

*1)增量聚合函数
^1)归约函数(ReduceFunction)
package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/89898.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

泽众APM性能监控软件

泽众Application Performance Management&#xff08;简称APM&#xff09;是一款专业的性能监控工具&#xff0c;可以对全链路如Web服务器、应用服务器、数据库服务器等进行实时监控&#xff0c;并以图表化的形式直观地呈现监控数据&#xff0c;为系统性能优化和定位问题提供准…

gcc编译webrtc x64

gcc使用Ubuntu系统已经有的gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04) 1、下载离线版webrtc&#xff08;也可以翻墙下载webrtc&#xff09; 百度云链接: 链接: https://pan.baidu.com/s/1oHVz9bxXlW3Q6uO996c5XA 提取码: ojbs 2、下载gn https://github.com/timnieder…

【面试题精讲】Java超过long类型的数据如何表示

有的时候博客内容会有变动&#xff0c;首发博客是最新的&#xff0c;其他博客地址可能会未同步,认准https://blog.zysicyj.top 首发博客地址[1] 面试题手册[2] 系列文章地址[3] 在 Java 中&#xff0c;如果需要表示超过 long 类型范围的数据&#xff0c;可以使用 BigInteger 类…

Caddy Web服务器深度解析与对比:Caddy vs. Nginx vs. Apache

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

基于SpringBoot的大学生就业招聘系统的设计与实现

目录 前言 一、技术栈 二、系统功能介绍 求职信息管理 首页 招聘信息管理 岗位申请管理 岗位分类 企业管理 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言 随着信息互联网信息的飞速发展&#xff0c;大学生就业成为一个难题&#xff0c;好多公司都舍不…

最新AI写作系统ChatGPT源码/支持GPT4.0+GPT联网提问/支持ai绘画Midjourney+Prompt应用+MJ以图生图+思维导图生成

一、智能创作系统 SparkAi创作系统是基于国外很火的ChatGPT进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作ChatGPT&#xff1f;小编这里写一个详细图文教程吧&…

【IPC 通信】信号处理接口 Signal API(6)

收发信号思想是 Linux 程序设计特性之一&#xff0c;一个信号可以认为是一种软中断&#xff0c;通过用来向进程通知异步事件。 本文讲述的 信号处理内容源自 Linux man。本文主要对各 API 进行详细介绍&#xff0c;从而更好的理解信号编程。 kill(2) 遵循 POSIX.1 - 2008 1.库 …

git权限不够:Ask a project Owner or Maintainer to create a default branch

新仓库还未创建任何分支时&#xff0c;Developer角色时首次提交代码&#xff0c;抛如下异常 remote: GitLab: remote: A default branch (e.g. master) does not yet exist for galaxy/apache-jspf-project remote: Ask a project Owner or Maintainer to cre…

leetcode做题笔记154. 寻找旋转排序数组中的最小值 II

已知一个长度为 n 的数组&#xff0c;预先按照升序排列&#xff0c;经由 1 到 n 次 旋转 后&#xff0c;得到输入数组。例如&#xff0c;原数组 nums [0,1,4,4,5,6,7] 在变化后可能得到&#xff1a; 若旋转 4 次&#xff0c;则可以得到 [4,5,6,7,0,1,4]若旋转 7 次&#xff0…

使用光纤激光切割机等激光切割设备时的一些小诀窍

光纤激光切割机极大地提高了钣金加工行业切割效果和生产效率。然而在我们对客户的回访调查中&#xff0c;发现客户普遍存在着对光纤激光切割机设备的保养维护意识不足的问题&#xff0c;这严重影响了设备的正常使用和使用寿命。 虽然激光切割机有日常的保养&#xff0c;但是也需…

jvm垃圾收集算法

简介 由于《分代收集理论》和不同垃圾收集算法&#xff0c;Java堆应该被划分为不同区域&#xff0c;一般至少会把Java堆划分为新生代&#xff08;Young Generation&#xff09;和老年代&#xff08;Old Generation&#xff09;两个区域。 垃圾收集器可以只回收其中某一个或者…

力扣每日一题(+日常水几道题)

每日一题1333. 餐厅过滤器 - 力扣&#xff08;LeetCode&#xff09; 简单的按规则排序,去除几个不满足的条件然后排序返回即可 #include<algorithm> class Solution { public:vector<int> filterRestaurants(vector<vector<int>>& restaurants, …

侯捷 C++ STL标准库和泛型编程 —— 1 STL概述 + 2 OOPvsGP

现在开始更新侯捷的STL的部分了&#xff01;&#xff01;&#xff01; 完整版本会在全部更新完之后就整合发出 或者也可以直接去我的个人网站上查看 关于STL这部分&#xff0c;原课程将其分为了四部分&#xff0c;我做笔记时&#xff0c;会将其整合&#xff0c;使其更具有整体性…

OpenGLES:绘制一个颜色渐变的圆

一.概述 今天使用OpenGLES实现一个圆心是玫红色&#xff0c;向圆周渐变成蓝色的圆。 本篇博文的内容也是后续绘制3D图形的基础。 实现过程中&#xff0c;需要重点关注的点是&#xff1a;如何使用数学公式求得图形的顶点&#xff0c;以及加载颜色值。 废话不多说&#xff0c…

【ROS 2】-2 话题通信

所有内容请看&#xff1a; 博客学习目录_Howe_xixi的博客-CSDN博客https://blog.csdn.net/weixin_44362628/article/details/126020573?spm1001.2014.3001.5502飞书原文链接&#xff1a; Docs

LeetCode算法题---第3天

注:大佬解答来自LeetCode官方题解 121.买卖股票的最佳时期 1.题目 2.个人解答 function maxProfit(prices) {//更新最低价格和最大利润let minPrice prices[0];let maxProfit 0;for (let i 1; i < prices.length; i) {// 如果当前价格比最低价格还低&#xff0c;更新最…

情满中秋᛫欢度国庆 | 联诚发与你共度佳节!

转眼九月份又走到尽头 国庆和中秋正好撞了个满怀 随风飘扬的国旗与满街飘香的月饼 国泰民安与阖家团圆 这是大家与小家最美好的祈愿 当中秋遇上国庆&#xff0c;当团圆遇上国诞 双节来临之际 为庆祝传统佳节与祖国生日 也为感谢联诚发每位员工的辛勤付出 9月28日下午 …

如何礼貌委婉地拒绝上级领导的加班要求?

案例&#xff1a;领导发消息问我今天晚上能否加班完成一项工作&#xff0c;但我已经和一个重要的朋友约好了今晚一起吃饭&#xff0c;我该如何礼貌委婉地拒绝上级领导的加班要求&#xff0c;并且不让上级领导对我产生不好的印象呢? 回复&#xff1a;当面临类似情况时&#xf…

从零开始之了解电机及其控制(11)实现空间矢量调制

广泛地说&#xff0c;空间矢量调制只是将电压矢量以及磁场矢量在空间中调制到任意角度&#xff0c;通常同时最大限度地利用整个电压范围。 其他空间矢量调制模式确实存在&#xff0c;并且根据您最关心的内容&#xff0c;它们可能值得研究。 如何实际执行这种所谓的交替反向序列…

通过http发送post请求的三种Content-Type分析

通过okhttp向服务端发起post网络请求&#xff0c;可以通过Content-Type设置发送请求数据的格式。 常用到的三种&#xff1a; 1&#xff09;application/x-www-form-urlencoded; charsetutf-8 2&#xff09;application/json; charsetutf-8 3&#xff09;multipart/form-dat…