Flink之迟到的数据

迟到数据的处理

  1. 推迟水位线推进: WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
  2. 设置窗口延迟关闭:.allowedLateness(Time.seconds(3))
  3. 使用侧流接收迟到的数据: .sideOutputLateData(lateData)
public class Flink12_LateDataCorrect {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888).map(line -> {String[] fields = line.split(",");return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 水位线延迟2秒.withTimestampAssigner((event, ts) -> event.getTs()));ds.print("input");OutputTag<WordCountWithTs> lateOutputTag = new OutputTag<>("late", Types.POJO(WordCountWithTs.class));//new OutputTag<WordCount>("late"){}SingleOutputStreamOperator<UrlViewCount> urlViewCountDs = ds.map(event -> new WordCountWithTs(event.getUrl(), 1 , event.getTs())).keyBy(WordCountWithTs::getWord).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(5))  // 窗口延迟5秒关闭.sideOutputLateData(lateOutputTag) // 捕获到侧输出流.aggregate(new AggregateFunction<WordCountWithTs, UrlViewCount, UrlViewCount>() {@Overridepublic UrlViewCount createAccumulator() {return new UrlViewCount();}@Overridepublic UrlViewCount add(WordCountWithTs value, UrlViewCount accumulator) {accumulator.setCount((accumulator.getCount() == null ? 0L : accumulator.getCount()) + value.getCount());return accumulator;}@Overridepublic UrlViewCount getResult(UrlViewCount accumulator) {return accumulator;}@Overridepublic UrlViewCount merge(UrlViewCount a, UrlViewCount b) {return null;}},new ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>.Context context, Iterable<UrlViewCount> elements, Collector<UrlViewCount> out) throws Exception {UrlViewCount urlViewCount = elements.iterator().next();//补充urlurlViewCount.setUrl(key);//补充窗口信息urlViewCount.setWindowStart(context.window().getStart());urlViewCount.setWindowEnd(context.window().getEnd());// 写出out.collect(urlViewCount);}});urlViewCountDs.print("window") ;//TODO 将窗口的计算结果写出到Mysql的表中, 有则更新,无则插入/*窗口触发计算输出的结果,该部分数据写出到mysql表中执行插入操作,后续迟到的数据,如果窗口进行了延迟, 窗口还能正常对数据进行计算, 该部分数据写出到mysql执行更新操作。建表语句:CREATE TABLE `url_view_count` (`url` VARCHAR(100) NOT NULL  ,`cnt` BIGINT NOT NULL,`window_start` BIGINT NOT NULL,`window_end` BIGINT NOT NULL,PRIMARY KEY (url, window_start, window_end )  -- 联合主键) ENGINE=INNODB DEFAULT CHARSET=utf8*/SinkFunction<UrlViewCount> jdbcSink = JdbcSink.<UrlViewCount>sink("replace into url_view_count(url, cnt ,window_start ,window_end) value (?,?,?,?)",new JdbcStatementBuilder<UrlViewCount>() {@Overridepublic void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {preparedStatement.setString(1, urlViewCount.getUrl());preparedStatement.setLong(2, urlViewCount.getCount());preparedStatement.setLong(3, urlViewCount.getWindowStart());preparedStatement.setLong(4, urlViewCount.getWindowEnd());}},JdbcExecutionOptions.builder().withBatchSize(2).withMaxRetries(3).withBatchIntervalMs(1000L).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.cj.jdbc.Driver").withUrl("jdbc:mysql://hadoop102:3306/test").withUsername("root").withPassword("000000").build());urlViewCountDs.addSink(jdbcSink) ;//捕获侧输出流SideOutputDataStream<WordCountWithTs> lateData = urlViewCountDs.getSideOutput(lateOutputTag);lateData.print("late");//TODO 将侧输出流中的数据,写出到mysql中的表中,需要对mysql中已经存在的数据进行修正//转换结构  WordCountWithTs => UrlViewCount//调用flink计算窗口的方式, 基于当前数据的时间计算对应的窗口SingleOutputStreamOperator<UrlViewCount> mapDs = lateData.map(wordCountWithTs -> {Long windowStart = TimeWindow.getWindowStartWithOffset(wordCountWithTs.getTs()/*数据时间*/, 0L/*偏移*/, 10000L/*窗口大小*/);Long windowEnd = windowStart + 10000L;return new UrlViewCount(wordCountWithTs.getWord(), 1L, windowStart, windowEnd);});// 写出到mysql中SinkFunction<UrlViewCount> lateJdbcSink = JdbcSink.<UrlViewCount>sink("insert into url_view_count (url ,cnt , window_start ,window_end) values(?,?,?,?) on duplicate key update cnt = VALUES(cnt) + cnt  ",new JdbcStatementBuilder<UrlViewCount>() {@Overridepublic void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {preparedStatement.setString(1, urlViewCount.getUrl());preparedStatement.setLong(2, urlViewCount.getCount());preparedStatement.setLong(3, urlViewCount.getWindowStart());preparedStatement.setLong(4, urlViewCount.getWindowEnd());}},JdbcExecutionOptions.builder().withBatchSize(2).withMaxRetries(3).withBatchIntervalMs(1000L).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.cj.jdbc.Driver").withUrl("jdbc:mysql://hadoop102:3306/test").withUsername("root").withPassword("000000").build());mapDs.addSink(lateJdbcSink) ;try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

withIdleness关键字

解决某条流长时间没有数据,不能推进水位线,导致下游窗口的窗口无法正常计算。

public class Flink12_WithIdleness {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);SingleOutputStreamOperator<Event> ds1 = env.socketTextStream("hadoop102", 8888).map(line -> {String[] words = line.split(" ");return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((event, ts) -> event.getTs())//如果超过10秒钟不发送数据,就不等待该数据源的水位线.withIdleness(Duration.ofSeconds(10)));ds1.print("input1");SingleOutputStreamOperator<Event> ds2 = env.socketTextStream("hadoop102", 9999).map(line -> {String[] words = line.split(" ");return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((event, ts) -> event.getTs())//如果超过10秒钟不发送数据,就不等待该数据源的水位线
//                                .withIdleness(Duration.ofSeconds(10)));ds2.print("input2");ds1.union(ds2).map(event->new WordCount(event.getUrl(),1)).keyBy(WordCount::getWord).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum("count").print("window");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

基于时间的合流

窗口联结Window Join

WindowJoin: 在同一个窗口内的相同key的数据才能join成功。

orderDs.join( detailDs ).where( OrderEvent::getOrderId )  // 第一条流用于join的key.equalTo( OrderDetailEvent::getOrderId) // 第二条流用于join的key.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<OrderEvent, OrderDetailEvent, String>() {@Overridepublic String join(OrderEvent first, OrderDetailEvent second) throws Exception {// 处理join成功的数据return  first + " -- " + second ;}}).print("windowJoin");

时间联结intervalJoin

在这里插入图片描述

IntervalJoin : 以一条流中数据的时间为基准, 设定上界和下界, 形成一个时间范围, 另外一条流中相同key的数据如果能落到对应的时间范围内, 即可join成功。

核心代码:

 orderDs.keyBy(OrderEvent::getOrderId).intervalJoin(detailDs.keyBy( OrderDetailEvent::getOrderId)).between(Time.seconds(-2) , Time.seconds(2))//.upperBoundExclusive()  排除上边界值//.lowerBoundExclusive()  排除下边界值.process(new ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>() {@Overridepublic void processElement(OrderEvent left, OrderDetailEvent right, ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>.Context ctx, Collector<String> out) throws Exception {//处理join成功的数据out.collect( left + " -- " + right );}}).print("IntervalJoin");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/214202.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

力扣编程题算法初阶之双指针算法+代码分析

目录 第一题&#xff1a;复写零 第二题&#xff1a;快乐数&#xff1a; 第三题&#xff1a;盛水最多的容器 第四题&#xff1a;有效三角形的个数 第一题&#xff1a;复写零 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 思路&#xff1a; 上期…

【SpringBoot教程】SpringBoot 统一异常处理(附核心工具类-ErrorInfoBuilder)

作者简介&#xff1a;大家好&#xff0c;我是撸代码的羊驼&#xff0c;前阿里巴巴架构师&#xff0c;现某互联网公司CTO 联系v&#xff1a;sulny_ann&#xff08;17362204968&#xff09;&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗…

曲线分板机主轴有何特点?如何选择合适的曲线分板机主轴?

在现代工业领域&#xff0c;分板机主轴作为重要的机械部件&#xff0c;其性能和质量对于生产效率和产品质量具有至关重要的影响。而在这其中&#xff0c;曲线分板机主轴则因为其独特的优势而被广泛应用于PCB电路板的切割和分板。面对市场上众多的曲线分板机主轴品牌&#xff0c…

前端知识(十三)——JavaScript监听按键,禁止F12,禁止右键,禁止保存网页【Ctrl+s】等操作

禁止右键 document.oncontextmenu new Function("event.returnValuefalse;") //禁用右键禁止按键 // 监听按键 document.onkeydown function () {// f12if (window.event && window.event.keyCode 123) {alert("F12被禁用");event.keyCode 0…

软件测试之缺陷管理

一、软件缺陷的基本概念 1、软件缺陷的基本概念主要分为&#xff1a;缺陷、故障、失效这三种。 &#xff08;1&#xff09;缺陷&#xff08;defect&#xff09;&#xff1a;存在于软件之中的偏差&#xff0c;可被激活&#xff0c;以静态的形式存在于软件内部&#xff0c;相当…

【隐马尔可夫模型】隐马尔可夫模型的观测序列概率计算算法及例题详解

【隐马尔可夫模型】用前向算法计算观测序列概率P&#xff08;O&#xff5c;λ&#xff09;​​​​​​​ 【隐马尔可夫模型】用后向算法计算观测序列概率P&#xff08;O&#xff5c;λ&#xff09; 隐马尔可夫模型是关于时序的概率模型&#xff0c;描述由一个隐藏的马尔可夫链…

线性规划-单纯形法推导

这里写目录标题 线性规划例子啤酒厂问题图解法 单纯形法数学推导将问题标准化并转为矩阵形式开始推导 实例图解法单纯形法 线性规划例子 啤酒厂问题 每日销售上限&#xff1a;100箱啤酒营业时间&#xff1a;14小时生产1箱生啤需1小时生产1箱黑啤需2小时生啤售价&#xff1a;2…

从零开发短视频电商 AWS OpenSearch Service开发环境申请以及Java客户端介绍

文章目录 创建域1.创建域2.输入配置部署选项数据节点网络精细访问控制访问策略 获取域端点数据如何插入到OpenSearch ServiceJava连接OpenSearch Servicespring-data-opensearchelasticsearch-rest-high-level-clientopensearch-rest-clientopensearch-java 因为是开发测试使用…

[Linux] nginx的location和rewrite

一、Nginx常用的正则表达式 符号作用^匹配输入字符串的起始位置$ 匹配输入字符串的结束位置*匹配前面的字符零次或多次。如“ol*”能匹配“o”及“ol”、“oll” 匹配前面的字符一次或多次。如“ol”能匹配“ol”及“oll”、“olll”&#xff0c;但不能匹配“o”?匹配前面的字…

图像清晰度 和像素、分辨率、镜头的关系

关于图像清晰度的几个知识点分享。 知识点 清晰度 清晰度指影像上各细部影纹及其边界的清晰程度。清晰度&#xff0c;一般是从录像机角度出发&#xff0c;通过看重放图像的清晰程度来比较图像质量&#xff0c;所以常用清晰度一词。 而摄像机一般使用分解力一词来衡量它“分解被…

linux通过命令切换用户

在Linux中&#xff0c;你可以使用su&#xff08;substitute user或switch user&#xff09;命令来切换用户。这个命令允许你临时或永久地以另一个用户的身份运行命令。以下是基本的用法&#xff1a; 基本切换到另一个用户&#xff08;需要密码&#xff09;&#xff1a;su [用户…

数据科学实践:探索数据驱动的决策

写在前面 你是否曾经困扰于如何从海量的数据中提取有价值的信息?你是否想过如何利用数据来指导你的决策,让你的决策更加科学和精确?如果你有这样的困扰和疑问,那么你来对了地方。这篇文章将引导你走进数据科学的世界,探索数据驱动的决策。 1.数据科学的基本原则 在我们…

第四届传智杯初赛(莲子的机械动力学)

题目描述 题目背景的问题可以转化为如下描述&#xff1a; 给定两个长度分别为 n,m 的整数 a,b&#xff0c;计算它们的和。 但是要注意的是&#xff0c;这里的 a,b 采用了某种特殊的进制表示法。最终的结果也会采用该种表示法。具体而言&#xff0c;从低位往高位数起&#xf…

【linux】yum安装时: Couldn‘t resolve host name for XXXXX

yum 安装 sysstat 报错了&#xff1a; Kylin Linux Advanced Server 10 - Os 0.0 B/s | 0 B 00:00 Errors during downloading metadata for repository ks10-adv-os:- Curl error (6): Couldnt resolve host nam…

微信小程序 长按录音+录制视频

<view class"bigCircle" bindtouchstart"start" bindtouchend"stop"><view class"smallCircle {{startVedio?onVedio:}}"><text>{{startVedio?正在录音:长按录音}}</text></view> </view> <…

排序算法:【选择排序]

一、选择排序——时间复杂度 定义&#xff1a;第一趟排序&#xff0c;从整个序列中找到最小的数&#xff0c;把它放到序列的第一个位置上&#xff0c;第二趟排序&#xff0c;再从无序区找到最小的数&#xff0c;把它放到序列的第二个位置上&#xff0c;以此类推。 也就是说&am…

微服务测试是什么?

微服务测试是一种特殊的测试类型&#xff0c;因为它涉及到多个独立的服务。以下是进行微服务测试的一般性步骤&#xff1a; 1. 确定系统架构 了解微服务架构对成功测试至关重要。确定每个微服务的职责、接口、依赖项和通信方式。了解这些信息可以帮助您更好地规划测试用例和测…

ip ssl证书怎么更换ip地址

ip ssl证书是一种数字证书&#xff0c;为只有公网ip地址的站点建立安全、加密的通信通道。它通常由权威的证书颁发机构&#xff08;CA&#xff09;颁发&#xff0c;并用于验证网站的身份和安全性。ip ssl证书的主要目的是保护敏感信息&#xff0c;如信用卡号、用户名和密码等&a…

IO部分笔记

IO 概述 IO: 存储和读取数据的解决方案 作用: 用于读写文件中的数据(可以读写文件, 或网络中的数据) IO流的分类 按流的方向: 输入流, 输出流 按操作文件类型: 字节流: 可以操作所有类型的文件 字符流: 只能操作纯文本文件 纯文本文件: windows自带的记事本打开能读懂…

react Hooks(useRef、useMemo、useCallback)实现原理

Fiber 上篇文章fiber简单理解记录了react fiber架构&#xff0c;Hooks是基于fiber链表来实现的。阅读以下内容时建议先了解react fiber。 jsx -> render function -> vdom -> fiber树 -> dom vdom 转 fiber 的过程称为 recocile。diff算法就是在recocile这个过程…