大数据-玩转数据-Flink 水印

一、Flink 中的水印

在Flink的流式操作中, 会涉及不同的时间概念:

1.1 处理时间

是指的执行操作的各个设备的时间,对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟。比如, 一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据。 假设应用程序在 9:15am分启动, 第1个小时窗口将会包含9:15am到10:00am所有的数据,然后下个窗口是10:00am-11:00am, 等等。处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调。他提供了最好的性能和最低的延迟。 但是, 在分布式和异步的环境下,处理时间没有办法保证确定性,容易受到数据传递速度的影响: 事件的延迟和乱序。在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器。

1.2 事件时间

是指的这个事件发生的时间。在event进入Flink之前, 通常被嵌入到了event中, 一般作为这个event的时间戳存在。在事件时间体系中, 时间的进度依赖于数据本身,和任何设备的时间无关。事件时间程序必须制定如何产生Event Time Watermarks(水印) 。假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,这些记录落入该小时。在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器。从1.12开始, Flink内部已经把默认的语义改成了事件时间。

1.3 Flink中的WaterMark

支持event time的流式处理框架需要一种能够测量event time 进度的方式。 比如, 一个窗口算子创建了一个长度为1小时的窗口,那么这个算子需要知道事件时间已经到达了这个窗口的关闭时间,从而在程序中去关闭这个窗口。事件时间可以不依赖处理时间来表示时间的进度。例如,在程序中, 即使处理时间和事件时间有相同的速度, 事件时间可能会轻微的落后处理时间。另外一方面使用事件时间可以在几秒内处理已经缓存在Kafka中多周的数据,这些数据可以照样被正确处理, 就像实时发生的一样能够进入正确的窗口。这种在Flink中去测量事件时间的进度的机制就是watermark(水印)。

1.4 Flink中如何产生水印

在这里插入图片描述

二、代码集成

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import javax.naming.Context;
import java.time.Duration;public class WatorMark_01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String value) throws Exception {String[] datas = value.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}});WatermarkStrategy<WaterSensor> wms = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}});stream.assignTimestampsAndWatermarks(wms) // 指定水印和时间戳.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {String msg = "当前key: " + key+ "窗口: [" + ctx.window().getStart() / 1000 + "," + ctx.window().getEnd()/1000 + ") 一共有 "+ elements.spliterator().estimateSize() + "条数据 ";out.collect(msg);}}).print();env.execute();}
}

三、测试结果

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/61046.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

传承精神 缅怀伟人——湖南多链优品科技有限公司赴韶山开展红色主题活动

8月27日上午&#xff0c; 湖南多链优品科技有限公司全体员工怀着崇敬之情&#xff0c;以红色文化为引领&#xff0c;参加了毛泽东同志诞辰130周年的纪念活动。以董事长程小明为核心的公司班子成员以及全国优秀代表近70人一行专赴韶山&#xff0c;缅怀伟人毛泽东同志的丰功伟绩。…

软件测试/测试开发丨Selenium 高级定位 CSS

点此获取更多相关资料 本文为霍格沃兹测试开发学社学员学习笔记分享 原文链接&#xff1a;https://ceshiren.com/t/topic/27022 一、CSS选择器概念 CSS拥有自己的语法规则和表达式 CSS通常分为相对定位和绝对定位 CSS常和XPATH一起用于UI自动化测试 二、CSS相对定位使用场景 支…

Django基础4——模板系统

文章目录 一、基本了解1.1 引用变量1.2 全局变量 二、if判断2.1 语法2.2 案例 三、for循环3.1 语法3.2 案例3.3 forloop变量3.4 容错语句 四、过滤器4.1 内置过滤器4.2 自定义过滤器 五、模板继承六、模板导入七、引用静态文件 一、基本了解 概念&#xff1a; Django模板系统&a…

11. 网络模型保存与读取

11.1 网络模型保存(方式一) import torchvision import torch vgg16 torchvision.models.vgg16(pretrainedFalse) torch.save(vgg16,"./model/vgg16_method1.pth") # 保存方式一&#xff1a;模型结构 模型参数 print(vgg16) 结果&#xff1a; VGG((feature…

Property ‘sqlSessionFactory‘ or ‘sqlSessionTemplate‘ are required问题解决

运行程序的时候出现了如下的报错&#xff1a; 解决方法&#xff1a;去除EnableAutoConfiguration中的(exclude{DataSourceAutoConfiguration.class})

Compose - 交互组合项

按钮 Button OutLinedButton带外边框、TextButton只是文字、IconButton只是图标形状。 Button(onClick { }, //点击回调modifier Modifier,enabled true, //启用或禁用interactionSource MutableInteractionSource(),elevation ButtonDefaults.elevatedButtonElevation( /…

【24考研】:四川大学计算机学院23届874考研考情分析

四川大学计算机学院23届CS考研考情分析 作者&#xff1a;老李 往年都是大佬们做的&#xff0c; 今年正好自己在做公众号这一块&#xff0c; 因此不自量力的承担这个工作&#xff0c;顺便锻炼一点pandas包和plt包的应用能力。 所以形式上我也会仿照一下往年的大佬。 21考情&a…

vue使用qrcodejs2生成二维码

目录 概要 构建展示的vue组件qrcode.vue 组件的使用 概要 项目中用到需要展示二维码的样式&#xff0c;想到了qrcode 例如&#xff1a; 前提&#xff1a;安装包 npm install qrcodejs2 --save 构建展示的vue组件qrcode.vue <template><div style"width: …

如何用Python爬虫持续监控商品价格

目录 持续监控商品价格步骤 1. 选择合适的爬虫库&#xff1a; 2. 选择目标网站&#xff1a; 3. 编写爬虫代码&#xff1a; 4. 设定监控频率&#xff1a; 5. 存储和展示数据&#xff1a; 6. 设置报警机制&#xff1a; 7. 异常处理和稳定性考虑&#xff1a; 可能会遇到的…

2781. 最长合法子字符串的长度

2781. 最长合法子字符串的长度 C代码&#xff1a;滑动窗口、哈希表 typedef struct{char* str;UT_hash_handle hh; } HashTable;HashTable* head;void AddToHash(char* str) {HashTable* out (HashTable*)malloc(sizeof(HashTable));out->str str;HASH_ADD_STR(head, str…

ThinkPHP 文件上传 fileSystem 扩展的使用

ThinkPHP 文件上传 ThinkPHP 文件上传 扩展 filesystem一、安装 FileSystem 扩展二、认识 filesystem 配置文件 config/filesystem.php三、上传验证&#xff08;涉及到验证器的知识点&#xff09;四、文件上传demo ThinkPHP 文件上传 扩展 filesystem ThinkPHP 为我们 提供了 …

【前端】JQ实时显示当前日期、时间、星期

效果图 html <span id"time"></span> JS // 实时显示当前时间 $(document).ready(function () {function showTime() {var today new Date;var y today.getFullYear();var M today.getMonth() 1;var d today.getDate();var w today.getDay();va…

Android 音频框架 基于android 12

文章目录 前言音频服务audioserver音频数据链路hal 提供什么样的作用 前言 Android 的音频是一个相当复杂的部分。从应用到框架、hal、kernel、最后到硬件&#xff0c;每个部分的知识点都相当的多。而android 这部分代码在版本之间改动很大、其中充斥着各种workaround的处理&a…

《论文阅读18》JoKDNet

一、论文 研究领域&#xff1a;用于大尺度室外TLS点云配准的联合关键点检测和特征表达网络论文&#xff1a;JoKDNet: A joint keypoint detection and description network for large-scale outdoor TLS point clouds registration International Journal of Applied Earth Ob…

Delphi 11.3 FMX 多设备平台中使用 TGrid 实现类似 TDBGrid 的效果

Delphi Firemonkey 中 TDBGrid 这个控件已经没有了。如何实现类似这个效果呢。其实可以用TGrid 来实现。以下用 11.3 来讲解。 查询里面用到的 connection 和 query 等控件那些一般的数据库用法&#xff0c;就不做过多描述了。请参考其他资料。 方法一.通过界面配置来实现 在…

Codeforces Round 888 (Div. 3)

Codeforces Round 888 (Div. 3) A. Escalator Conversations 思路&#xff1a;暴力枚举 我们可以发现要让他们能相同高度首先你们之间的差值必须是k的倍数并且这个倍数必须小于m并且不能存在相同高度 #include<bits/stdc.h> using namespace std; #define int long lo…

unity 物体至视图中心以及新对象创建位置

如果游戏对象不在视野中心或在视野之外&#xff0c; 一种方法是双击Hierarchy中的对象名称 另一种是选中后按F 新建物体时对象的位置不是在坐标原点&#xff0c;而是在当前屏幕的中心

将 Llama2 中文模型接入 FastGPT,再将 FastGPT 接入任意 GPT 套壳应用,真刺激!

FastGPT 是一个基于 LLM 大语言模型的知识库问答系统&#xff0c;提供开箱即用的数据处理、模型调用等能力。同时可以通过 Flow 可视化进行工作流编排&#xff0c;从而实现复杂的问答场景&#xff01; Llama2 是Facebook 母公司 Meta 发布的开源可商用大模型&#xff0c;国内的…

深度学习(前馈神经网络)知识点总结

用于个人知识点回顾&#xff0c;非详细教程 1.梯度下降 前向传播 特征输入—>线性函数—>激活函数—>输出 反向传播 根据损失函数反向传播&#xff0c;计算梯度更新参数 2.激活函数(activate function) 什么是激活函数&#xff1f; 在神经网络前向传播中&#x…

3D风速仪 Gill Instruments Limited_R3-50 R3-100 and R3A -100 Manual

R3测量超声波脉冲从上部换能器到相反的下部换能器所花费的时间&#xff0c;并将其与脉冲从下部换能器到上部换能器的时间进行比较。 同样&#xff0c;在其他上下换能器之间比较时间。 如图1所示&#xff0c;每对换能器之间沿轴的空气速度可以从每条轴上的飞行次数计算出来。 …