【核心重点】Flink四大基石

1. Time(时间机制)

时间概念

  • 处理时间:执行具体操作时的机器时间(例如 Java的 System.currentTimeMillis()) )
  • 事件时间:数据本身携带的时间,事件产生时的时间。
  • 摄入时间:数据进入 Flink 的时间;在系统内部,会把它当做事件时间来处理。

事件时间在实际应用中更为广泛,从Flink 1.12版本开始,Flink已经将事件时间作为默认的时间语义

Flink 可以根据不同的时间概念处理数据。

2. Window(窗口计算)

收集窗口时间内的数据,对窗口中收集数据进行聚合运算这就是窗口机制

窗口的生命周期

创建:属于该窗口的第一个元素到达时就会创建该窗口,窗口事先定义好就是固定的,但是窗口创建时间不固定【窗口开始时间以水印所携带的时间戳作为标准】

销毁:窗口结束时间之后,就会销毁当前窗口

Flink窗口分类以及窗口 API

Watermark处理乱序数据

3. State(状态机制)

什么是Flink的状态

状态其实是个变量,这个变量保存了数据流的历史数据, 如果有新的数据流进来,会读取状态变量,将新的数据和历史一起计算。

状态分类

托管状态(Managed State)和原始状态(Raw State)

托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现,直接使用API

原始状态则是自定义的,相当于就是开辟了一块内存,需要自己管理,实现状态的序列化和故障恢复。

通常采用 Flink 托管状态来实现需求

算子状态(Operator State)和按键分区状态(Keyed State)

可以将托管状态分为两类:算子状态和按键分区状态。

keyBy 将DataStream转换为KeyedStream,KeyedStream是特殊的DataStream。

KeyedState只能应用于KeyedStream,因此KeyedState的计算只能放在KeyBy之后

基于状态(KeyedState)计算实现词频统计

代码实现

事先定义一个实体类:

public class WordCount {private String word;private Integer count;// setter&getter&toString方法
}  

Flink程序基本流程:

/*** description: 基于状态(KeyedState)计算实现词频统计*/
public class WordCountWithStateful {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> lines =env.socketTextStream("127.0.0.1",8888,"\n");lines.flatMap((String input, Collector<WordCount> output)-> {String[] words = input.split(" ");for(String word:words) {output.collect(new WordCount(word,1));}}).returns(WordCount.class)// keyBy之后,每个key都有对应的状态,同一个key只能操作自己对应的状态.keyBy(WordCount::getWord)// 状态计算.flatMap(new WordCountStateFunc()).print();env.execute();}
}

计算函数:

public class WordCountStateFunc extends RichFlatMapFunction<WordCount, WordCount> {/*** 状态变量*/private ValueState<WordCount> keyedState;/*** description: open方法中状态变量的初始化*/@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<WordCount> valueStateDescriptor =// valueState描述器new ValueStateDescriptor<>(// 描述器的名称"wordcountState",/* * 描述器的数据类型:** Flink有自己的一套数据类型,包含了JAVA和Scala的所有数据类型* 这些数据类型都是TypeInformation对象的子类。* TypeInformation对象统一了所有数据类型的序列化实现*/TypeInformation.of(WordCount.class));keyedState = getRuntimeContext().getState(valueStateDescriptor);}/*** description: keyedState计算逻辑*/@Overridepublic void flatMap(WordCount input, Collector<WordCount> output) throws Exception {// 读取状态WordCount lastKeyedState = keyedState.value();// 更新状态if (lastKeyedState == null) {// 状态还未赋值的情况 更新状态keyedState.update(input);// 返回原数据output.collect(input);} else {// 状态存在旧的状态数据的情况Integer count = lastKeyedState.getCount() + input.getCount();WordCount newWordCount = new WordCount(input.getWord(), count);// 更新状态keyedState.update(newWordCount);// 返回新的数据output.collect(newWordCount);}}}

keyedState状态计算步骤

  1. 继承Rich函数
  2. 重写Open方法,对状态变量进行初始化
  3. 状态计算逻辑

为什么要进行有状态的计算 ?

如果Flink发生了异常退出,checkpoint机制可以读取保存的状态,进行恢复。

广播流、广播状态

有时希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。【可以动态修改配置】

编码步骤

  1. 构建事件流
  2. 构建广播流
  3. 将事件流和广播流连接
  4. 对连接后的流进行处理

状态后端

Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责管理本地状态的存储方式和位置。

  1. Memory State Backend 【java内存HashMap】
  2. FS State Backend 【HDFS】
  3. RocksDB State Backend 【可持久化的key value存储引擎】

选择正确的状态后端

HashMapStateBackend 是内存计算读写速度非常快;但是,状态的大小到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。

RocksDB 是硬盘存储,可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级

空间和时间的抉择

4. Checkpoint(容错机制)

什么是Checkpoint(检查点)

Checkpoint能生成快照(Snapshot)
若Flink程序崩溃,重新运行程序时可以有选择地从这些快照进行恢复
Checkpoint是Flink可靠性的基石

Checkpoint和State的区别

State指某个算子的数据状态(中间状态),Checkpoint指所有算子的数据状态(全局快照)
State保存在堆内存,Checkpoint持久化保存

Checkpoint分布式快照流程(重点)

水用挡板挡停让水静止,进行快照存储;Checkpoint机制也是如此,Checkpoint Barrier类似挡板

步骤一

Source子任务收到了Checkpoint请求,该算子会对自己的数据状态保存快照向自己的下一个算子发送Checkpoint Barrier
下一个算子只有收到上一个算子广播过来的Checkpoint Barrier,才进行快照保存

步骤二

Sink算子已经收到了所有上游的Checkpoint Barrier时,进行以下2步操作:
1.保存自己的数据状态,2.并直接通知检查点协调器
检查点协调器在收集所有的task通知后,就认为这次的Checkpoint全局完成了,从而进行持久化操作

Checkpoint如何保证数据的一致性(重点)

至少一次(at-least-once)

发生故障,可能会有重复数据

精确一次(exactly-once)

发生故障,能保证不丢失数据,也没有重复数据

读取最近一次存放的快照,数据重放重新计算,Checkpoint机制保证exactly-once

Checkpoint Barrier对齐机制

Barrie对齐机制保证了Checkpoint数据状态的精确一致

下游算子上面对应多个上游算子,下游算子必须要等到上游算子所有的Checkpoint Barrier到齐之后,下游算子才会进行快照的输入。(会把先到的Checkpoint Barrier数据先缓存起来,直到所有的Checkpoint Barrier全部到达,该算子才会进行快照操作

什么是savepoint(保存点)

基于checkpoint机制的快照

Checkpoint和Savepoint区别
Checkpoint是自动容错恢复机制,Savepoint某个时间点的全局状态镜像
Checkpoint是Flink系统行为,Savepoint是用户触发
Checkpoint默认程序删除,Savepoint会一直保存

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/210149.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux vim 基础设置-自动填充文件头

前言 当前为vimrc脚本设置&#xff0c;脚本位置在 ~/.vimrc or /etc/vimrc 当前为首次打开 C C Shell 文件&#xff0c;自动填充对应文件头信息&#xff0c;再次打开时会修改对应时间信息 :set nu "显示行号 :set hlsearch "搜索时 高亮"新建 .c .cpp .sh文件&a…

理解Go语言中的defer

引言 Go有许多在其他编程语言中可以找到的常见控制流关键字,例如if、switch、for等。defer是其他大多数编程语言中没有的关键字,尽管它不太常见,但你很快就会看到它在你的程序中有多么有用。 defer语句的主要用途之一是清理资源,例如打开的文件,网络连接和数据库句柄。在…

在AWS Lambda上部署EC2编译的FFmpeg工具——自定义层的方案

大纲 1 确定Lambda运行时环境1.1 Lambda系统、镜像、内核版本1.2 运行时1.2.1 Python1.2.2 Java 2 环境准备2.1 创建EC2实例 3 编译FFmpeg3.1 连接EC2 4 编译5 上传S3存储桶5.1 创建S3桶5.2 创建IAM策略5.3 创建IAM角色5.4 EC2关联角色5.5 修改桶策略5.6 打包并上传 6 创建Lamb…

智能优化算法应用:基于海鸥算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于海鸥算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于海鸥算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.海鸥算法4.实验参数设定5.算法结果6.参考文献7.MA…

【nuxt3】cannot read preperties of null (reading ‘$nuxt‘)

问题描述 vue3 中&#xff0c;通过 createVNode 创建子组件实例时&#xff0c;发现子组件无法获取到父组件中的 router、store 信息&#xff0c;一旦子组件使用就会报错。 问题原因 通过控制台断点调试&#xff0c;发现时 appContext 值为空导致的。怀疑是创建子组件的时候&a…

海外地区开启IPV6无法访问服务器问题

前言 最近有海外地区的用户反馈无法访问公司的网络&#xff0c;无法下载应用和系统进行升级。了解到浏览器可以正常访问公司域名&#xff0c;谷歌&#xff0c;油管等都能正常使用。日志分析GET请求服务器数据时没有得到应答&#xff0c;最终查询网络相关修改确认与网络IPV6有关…

掌握游戏开发的全方位知识:这些内容你一定要知道

游戏开发是一项涉及多学科的综合性工作&#xff0c;从游戏设计到编程、美术、音频、测试等多个方面都需要开发者具备广泛的知识。以下是进行游戏开发时需要掌握的主要知识领域。 首先&#xff0c;游戏设计是整个过程的基石。这包括游戏机制和玩法设计、关卡设计、用户界面&…

表示你的shell未被正确配置以使用conda activate--换成清华源anaconda

1 CommandNotFoundError: Your shell has not been properly configured to use conda activate. If using conda activate from a batch script, change your invocation to CALL conda.bat activate.To initialize your shell, run$ conda init <SHELL_NAME>这个错误提…

uniapp-获取手机型号

要获取当前设备的手机型号&#xff0c;您可以使用uni-app提供的uni.getSystemInfo() API来实现此目的。 代码示例&#xff1a; uni.getSystemInfo({success: function(res) {console.log("手机型号&#xff1a;" res.platform)} })该方法会返回一个包含设备信息的…

JFrog推出面向Hugging Face的原生集成,为 ML 模型提供强大支持,实现DevOps、安全和AI的协调统一

2023年12月5日 —— 流式软件公司、企业软件供应链平台提供商JFrog推出ML模型管理功能&#xff0c;这是业界首套旨在简化机器学习&#xff08;ML&#xff09;模型管理和安全性的功能。JFrog 平台中的全新ML模型管理功能使AI交付与企业现有的 DevOps 和 DevSecOps 实践保持一致&…

计算机评价的主要性能指标

对计算机评价的主要性能指标如下&#xff1a; 1&#xff0e;时钟频率&#xff08;主频&#xff09; 主频是计算机的主要性能指标之一&#xff0c;在很大程度上决定了计算机的运算速度。CPU 的工作节拍是由主时钟来控制的&#xff0c;主时钟不断产生固定频率的时钟脉冲&#xff…

一个简单的可视化的A星自动寻路

一个简单的应用场景&#xff0c;流程图连线 源码&#xff1a; addExample("A星路径查找", function () {return {template: <div><div ref"main"></div></div>,data() { return {}; },computed: {},methods: {},mounted() {var c…

Python中的比较两个字符串

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 在Python编程中&#xff0c;字符串比较是一项常见且关键的操作&#xff0c;涵盖了诸多方法和技巧。比较两个字符串是否相等、大小写是否一致&#xff0c;或者在一个字符串中寻找特定的子字符串&#xff0c;都是日…

征途漫漫:汽车MCU的国产替代往事

01.西雁东飞&#xff0c;南下创业 1985年&#xff0c;山东大学物理系毕业的周生明加入878厂&#xff08;“北霸天”&#xff09;参与MOS电路研发&#xff0c;随后几年&#xff0c;大洋彼岸的英特尔相继推出CPU 386\486、奔腾系列等产品。在摩尔定律的凸显、进口和走私的剧烈冲…

基于Java房屋租赁管理系统

基于Java房屋租赁管理系统 功能需求 1、房源信息管理&#xff1a;系统需要能够记录和管理所有房源的详细信息&#xff0c;包括房屋地址、房屋面积、租金、付款方式、房屋类型等。管理员应该可以添加、编辑和删除房源信息。 2、租户信息管理&#xff1a;系统需要能够记录和管…

class067 二维动态规划【算法】

class067 二维动态规划 code1 64. 最小路径和 // 最小路径和 // 给定一个包含非负整数的 m x n 网格 grid // 请找出一条从左上角到右下角的路径&#xff0c;使得路径上的数字总和为最小。 // 说明&#xff1a;每次只能向下或者向右移动一步。 // 测试链接 : https://leetcode…

<JavaEE> 经典设计模式之 -- 线程池

目录 一、线程池的概念 二、Java 标准库中的线程池类 2.1 ThreadPoolExecutor 类 2.1.1 corePoolSize 和 maximumPoolSize 2.1.2 keepAliveTime 和 unit 2.1.3 workQueue 2.1.4 threadFactory 2.1.5 handler 2.1.6 创建一个参数自定义的线程池 2.2 Executors 类 2.3…

go学习笔记(17)Blob and ArrayBuffer

最近在学习go websocket的时候&#xff0c;在学习实验过程遇到一个比较奇怪问题。为什么我的数据返回是blob&#xff0c;而不是arrayBuffer&#xff1f;百思不得其解。 直到同事打包的时候微信小游戏遇到了一个报错。FileReader不支持。 经过在社区查询&#xff0c;官方答复是…

Qt之QCache和QContiguousCache

一.QCache QCache在构造的时候指定了缓存中允许的最大成本,也就是如下构造函数中的参数maxCost。默认情况下,QCaches maxCost() 是100。 QCache(int maxCost = 100) ~QCache() void clear() bool contains(const Key &key) const int count() const bool insert(const …

[原创] 电源芯片输出端的纹波测试

网上有很多文章讲解&#xff0c;电源芯片的纹波测试&#xff0c;原理图各种讲解&#xff0c;理论有余&#xff0c;实质性测试细节不够细致&#xff0c;想写一些测试步骤&#xff0c;作为分享和记录。 1、设置示波器参数 1.1 校准示波器 1.2 探头按钮推到X1&#xff08;代表波…