Flink(java版)

watermark

时间语义和 watermark

注意:数据进入flink的时间:如果用这个作为时间语义就不存在问题,但是开发中往往会用处理时间
作为时间语义这里就需要考虑延时的问题。
如上图,数据从kafka中获取出来,从多个分区中获取,这时候时间肯定有乱序,这时候就需要使用事
件时间。

场景:游戏连续过五关,给予奖励
地铁里面玩游戏,连过三关断网了,二分钟过了八关。这时候是用处理时间还是事件时间呢?
处理时间的优势:牺牲一定的数据准确性,没有延迟

package com.atguigu.apitest.window;/**import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认为当前机器的cpu的最大核数//env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.getConfig().setAutoWatermarkInterval(100);// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型,分配时间戳和watermarkDataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));})// 乱序数据设置时间戳和watermark.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};// 基于事件时间的开窗聚合,统计15秒内温度的最小值SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id").timeWindow(Time.seconds(15)).allowedLateness(Time.minutes(1)).sideOutputLateData(outputTag).minBy("temperature");minTempStream.print("minTemp");minTempStream.getSideOutput(outputTag).print("late");env.execute();}
}
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718211,32.8
sensor_1,1547718212,37.1注意:第一个窗口是[1547718195,1547718210);

sensor_1,1547718213,33
sensor_1,1547718224,32.1
sensor_1,1547718225,31.6
sensor_1,1547718226,21.2
sensor_1,1547718227,33.6第二个窗口大小:第一个窗口是[1547718210,1547718225);

1.理想状态:来一条数据处理一条,每条数据代表对时间推进;如图到5之后就将【0,5)的窗口关闭并输出;2.乱序状态:原因:网络延迟、分布式、分区导致乱序数据产生;网络延迟和分布式处理造成的乱序都是几十毫秒和几百毫秒的范围的差距;这将回造成大多数延迟数据集中在几十毫秒和几百毫秒的范围内;3.解决方案:将时间事件放慢

flink的三重保证:1.设置watermaker将几百毫秒的数据全部输出;2.先输出一个近似的结果,但是不要关闭窗口后面延迟的时间还需要更新;3.当延时时间到了,窗口就关闭了;兜底方案使用侧输出流保证数据不丢失;注意:数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据都已经到
达了,因此,window 的执行也是由 Watermark 触发的。
6 3 2 5 4 1 
比如设置3秒的watermaker:
到达5:说明2秒之前的数据都到齐了,后面2,3都可以输出
到达6:说明3秒之前的数据都到齐了,大于等于3秒的数据才能输出意义:watermark 用来让程序自己平衡延迟和结果正确性:如果设置太大延迟太高,设置太
小数据就不准确,需要通过具体的业务场景去平衡这个值;

watermark 用来让程序自己平衡延迟和结果正确性:如果设置太大延迟太高,设置太小,乱序数据
没有搞定,数据就不准确,需要通过具体的业务场景去平衡这个值;如何找到watermaker:首先要了解乱序程度;
解决方案:通过机器学习构建一个模型,构建当前业务模型中的延迟状态的分布情况;

如图:大部分的延时数据都20ms和80ms之间的范围中,这时候设置80ms就搞定大部分乱序数据;
这时候还有很少的数据,如果对数据准确性要求比较高,这时候就需要设置窗口迟到机制去保证
数据的准备性;最后还有网络延迟的数据还是没有输出这时候就需要添加侧输出流作为兜底方案。

 watermark 生成问题

默认:来一条生产一条watermaker,如果短时间数据量比较大,会造成watermaker都一样造成资
源浪费;周期性添加watermaker:每隔一段时间更新一下watermaker 
周期性时间缺点:实时性不好;数据过于分散会造成资源浪费;如何选择:看数据的分布,过于集中使用周期性生成模式,数据稀疏,使用默认的模型;

状态编程 

需求:我们可以利用 Keyed state,实现这样一个需求: 检测传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警

package com.atguigu.apitest.state;/*** Copyright (c) 2018-2028 尚硅谷 All Rights Reserved* <p>* Project: FlinkTutorial* Package: com.atguigu.apitest.state* Version: 1.0* <p>* Created by wushengran on 2020/11/10 16:33*/import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @ClassName: StateTest3_KeyedStateApplicationCase* @Description:* @Author: wushengran on 2020/11/10 16:33* @Version: 1.0*/
public class StateTest3_KeyedStateApplicationCase {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 定义一个flatmap操作,检测温度跳变,输出报警SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));resultStream.print();env.execute();}// 实现自定义函数类public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>>{// 私有属性,温度跳变阈值private Double threshold;public TempChangeWarning(Double threshold) {this.threshold = threshold;}// 定义状态,保存上一次的温度值private ValueState<Double> lastTempState;@Overridepublic void open(Configuration parameters) throws Exception {lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));}@Overridepublic void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {// 获取状态Double lastTemp = lastTempState.value();// 如果状态不为null,那么就判断两次温度差值if( lastTemp != null ){Double diff = Math.abs( value.getTemperature() - lastTemp );if( diff >= threshold )out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));}// 更新状态lastTempState.update(value.getTemperature());}@Overridepublic void close() throws Exception {lastTempState.clear();}}
}
sensor_1,1547718206,36.3
sensor_1,1547718206,37.9
sensor_1,1547718206,48
sensor_6,1547718201,15.4
sensor_6,1547718201,35
sensor_1,1547718226,36

 状态后端

状态后端: 1.本地的状态管理(如何存,上下文配置,怎么存,怎么写)  2.做快照容错,如何恢复数据

1. 测试环境:MemoryStateBackend
2. 生产环境:FsStateBackend 
3. 数据非常大时候:RocksDBStateBackend
state.backend: filesystem //默认使用FsStateBackend 
tate.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints 
//配置一个checkpoint的hdfs的存储路径jobmanager.execution.failover-strategy: region //区域化重启state.backend.incremental: false  //增量添加checkpoint

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/67804.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu 22.04安装cuda、cudnn、conda、pytorch

1、cuda 视频连接 https://www.bilibili.com/video/BV1bW4y197Mo/?spm_id_from333.999.0.0&vd_source3b42b36e44d271f58e90f86679d77db7cuda 11.8 https://developer.nvidia.com/cuda-toolkit-archive点击进入 https://developer.nvidia.com/cuda-11-8-0-download-arc…

gRPC-GateWay Swagger 实战

上一次我们分享了关于 gRPC-Gateway 快速实战 &#xff0c;可以查看地址来进行回顾 : 也可以查看关于 gRPC 的历史文章&#xff1a; gRPC介绍 gRPC 客户端调用服务端需要连接池吗&#xff1f; gRPC的拦截器 gRPC的认证 分享一下 gRPC- HTTP网关 I 今天主要是分享关于 gRPC-G…

Linux相关指令(下)

cat指令 查看目标文件的内容 常用选项&#xff1a; -b 对非空输出行编号 -n 对输出的所有行编号 -s 不输出多行空行 一个重要思想&#xff1a;linux下一切皆文件&#xff0c;如显示器文件&#xff0c;键盘文件 cat默认从键盘中读取数据再打印 退出可以ctrlc 输入重定向<…

kubernetes/cluster/addons/fluentd-elasticsearch

#发文福利# 一、前言 kubernetes 1.23搭建EFK所用到的yaml文件&#xff0c;本帖均来自kubernetes官方&#xff0c;且没做修改。 https://github.com/kubernetes/kubernetes/tree/release-1.23/cluster/addons/fluentd-elasticsearch 二、EFK 原版yaml 1、create-logging-na…

机器学习笔记之最优化理论与方法(六)无约束优化问题——最优性条件

机器学习笔记之最优化理论与方法——无约束优化问题[最优性条件] 引言无约束优化问题无约束优化问题最优解的定义 无约束优化问题的最优性条件无约束优化问题的充要条件无约束优化问题的必要条件无约束优化问题的充分条件 引言 本节将介绍无约束优化问题&#xff0c;主要介绍无…

css让元素保持等比例宽高

使用新属性 aspect-ratio: 16/9; 代码示例 <style>div {width: 60%;/* 等比例宽高 */aspect-ratio: 16/9;background-color: red;margin: auto;}</style> </head><body><div></div> </body>示例 aspect-ratio兼容性

python-wordcloud词云

导入模块 from wordcloud import WordCloud import jieba import imageio import matplotlib.pyplot as plt from PIL import ImageGrab import numpy as npwordcloud以空格为分隔符号&#xff0c;来将文本分隔成单词 PIL pillow模块 img imageio.imread(image.png)这行代码…

软件测试—测试用例的设计

软件测试—测试用例的设计 测试用例是什么&#xff1f; 首先&#xff0c;测试用例&#xff08;Test Case&#xff09;是为了实施测试而向被测试系统提供的一组集合。这组集合包括&#xff1a;测试环境、操作步骤、测试数据、预期结果等要素。 好的测试用例的特征 一个好的测试…

MySQL表的内连和外连

文章目录 MySQL表的内连和外连1. 内连接(1) 显示SMITH的名字和部门名称 2. 外连接2.1 左外连接(1) 查询所有学生的成绩&#xff0c;如果这个学生没有成绩&#xff0c;也要将学生的个人信息显示出来 2.2 右外连接(1) 对stu表和exam表联合查询&#xff0c;把所有的成绩都显示出来…

软件设计师学习笔记8-操作系统+进程

目录 1.操作系统 1.1操作系统层次图 1.2操作系统的作用 1.3操作系统的任务 2.特殊的操作系统 3.进程 3.1进程的概念 3.2进程与程序 3.3进程与线程 3.4进程的状态 3.4.1三态模型 3.4.2基于三态模型的五态模型 1.操作系统 1.1操作系统层次图 该图片来自希赛软考 1.…

zookeeper教程

zookeeper教程 zookeeper简介zookeeper的特点及数据模型zookeeper下载安装zookeeper客户端命令zookeeper配置文件zookeeper服务器常用命令zookeeper可视化管理工具zkuizookeeper集群环境搭建zookeeper选举机制使用Java原生api操作zookeeper使用java zkclient库操作zookeeper使用…

【Apollo学习笔记】——规划模块TASK之PIECEWISE_JERK_SPEED_OPTIMIZER

文章目录 前言PIECEWISE_JERK_SPEED_OPTIMIZER功能简介PIECEWISE_JERK_SPEED_OPTIMIZER相关配置PIECEWISE_JERK_SPEED_OPTIMIZER流程QP问题的标准类型定义&#xff1a;优化变量设计目标函数约束条件相关矩阵二次项系数矩阵 H H H一次项系数向量 q q q设定OSQP求解参数 Process设…

MybatisPlus 核心功能 条件构造器 自定义SQL Service接口 静态工具

MybatisPlus 快速入门 常见注解 配置_软工菜鸡的博客-CSDN博客 2.核心功能 刚才的案例中都是以id为条件的简单CRUD&#xff0c;一些复杂条件的SQL语句就要用到一些更高级的功能了。 2.1.条件构造器 除了新增以外&#xff0c;修改、删除、查询的SQL语句都需要指定where条件。因此…

深入理解联邦学习——联邦学习的定义

分类目录&#xff1a;《深入理解联邦学习》总目录 假设有两个不同的企业 A A A和 B B B&#xff0c;它们拥有不同的数据。比如&#xff0c;企业 A A A有用户特征数据&#xff0c;而企业 B B B有产品特征数据和标注数据。这两个企业按照GDPR准则是不能粗暴地把双方数据加以合并的…

孙哥Spring源码第17集

第17集 refresh()-invokeBeanFactoryPostProcessor -一-invokeBeanFactoryPostProcessor的分析过程 【视频来源于&#xff1a;B站up主孙帅suns Spring源码视频】 1、什么是解析顶级注解&#xff1f; PropertySource CompeontScan Configuration Component ImportResour…

垃圾回收 - 复制算法

GC复制算法是Marvin L.Minsky在1963年研究出来的算法。说简单点&#xff0c;就是只把某个空间的活动对象复制到其它空间&#xff0c;把原空间里的所有对象都回收掉。这是一个大胆的想法。在此&#xff0c;我们将复制活动对象的原空间称为From空间&#xff0c;将粘贴活动对象的新…

VSCode 配置 C 语言编程环境

目录 一、下载 mingw64 二、配置环境变量 三、三个配置文件 四、格式化代码 1、安装插件 2、保存时自动格式化 3、左 { 不换行 上了两年大学&#xff0c;都还没花心思去搭建 C 语言编程环境&#xff0c;惭愧&#xff0c;惭愧。 一、下载 mingw64 mingw64 是著名的 C/C…

SpringBoot整合WebSocket

流程分析 Websocket客户端与Websocket服务器端 前端浏览器和后端服务器的连接通信 HTTP与Websocket对比 服务器端编码 1.引入pom依赖 <!--webSocket--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-sta…

【内网穿透】使用Nodejs搭建简单的HTTP服务器 ,并实现公网远程访问

目录 前言 1.安装Node.js环境 2.创建node.js服务 3. 访问node.js 服务 4.内网穿透 4.1 安装配置cpolar内网穿透 4.2 创建隧道映射本地端口 5.固定公网地址 前言 Node.js 是能够在服务器端运行 JavaScript 的开放源代码、跨平台运行环境。Node.js 由 OpenJS Foundation…

mysql(九)mysql主从复制

目录 前言概述提出问题主从复制的用途工作流程 主从复制的配置创建复制账号配置主库和从库启动主从复制从另一个服务器开始主从复制主从复制时推荐的配置sync_binloginnodb_flush_logs_at_trx_commitinnodb_support_xa1innodb_safe_binlog 主从复制的原理基于语句复制优点&…