flink重温笔记(九):Flink 高级 API 开发——flink 四大基石之WaterMark(Time为核心)

Flink学习笔记

前言:今天是学习 flink 的第 9 天啦!学习了 flink 四大基石之 Time的应用—> Watermark(水印,也称水位线),主要是解决数据由于网络延迟问题,出现数据乱序或者迟到数据现象,重点学习了水位线策略机制原理和应用,以及企业级的应用场景,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

Tips:转码之路,溯洄从之,道阻且长!希望自己继续努力,学有所成,让华丽的分割线,成为闪耀明天的起跑线!


文章目录

  • Flink学习笔记
    • 三、Flink 高级 API 开发
      • 2. WaterMark
        • 2.1 为什么需要 WaterMark
        • 2.2 多并行度与 WaterMark
        • 2.3 KeyBy 分流与 WaterMark
        • 2.4 水印的生成策略
          • 2.4.1 内置水印生成策略
            • (1) 固定延迟生成水印
            • (2) 单调递增生成水印
          • 2.4.2 自定义水印生成策略
            • (1) ==周期性 watermark 策略==
            • (2) ==间歇性 watermark 策略==
        • 2.5 在非数据源之后使用水印 [重点]
          • 2.5.1 WaterMark 的四种使用情况
            • (1) 本来有序的 Stream中的 Watermark
            • (2) 乱序事件中的Watermark
            • (3) 并行数据流中的Watermark
            • (4) 自定义 Watermark
        • 2.6 在数据源之后使用水印 (Kafka) [重点]
          • 2.6.1 kafka 向指定分区写入数据
          • 2.6.2 水印机制消费 kafak 数据
        • 2.7 Flink 对严重迟到数据的处理

三、Flink 高级 API 开发

2. WaterMark

2.1 为什么需要 WaterMark

当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。
在这里插入图片描述

结论:

  • 只要使用 event time,就必须使用 watermark,在上游指定,比如:source、map算子后。

  • Watermark 的核心本质可以理解成一个延迟触发机制。

  • 因为前面提到,数据时间 >= 窗口结束时间,触发计算,这里想要延迟触发计算,所以水印时间一般比数据事件时间少几秒

  • 水印时间 = 事件时间 - 设置的水印长度

  • 水印的功能:在不影响按照事件时间判断数据属于哪个窗口的前提下,延迟某个窗口的关闭时间,让其等待一会儿延迟数据

举例:

窗口5秒,延迟(水印)3秒,按照事件时间计算来一条数据事件时间3, 落入窗口0-5.水印时间0
来一条数据事件时间7, 落入窗口5-10,水印时间4
来一条数据事件时间4,落入窗口0-5,水印时间1
来一条数据事件时间8,落入窗口5-10,水印时间5

2.2 多并行度与 WaterMark
  • 如果并行度是 n,那么watermark 就有 n 个
  • 触发条件以线程中最小的 watermark 为准

在这里插入图片描述


2.3 KeyBy 分流与 WaterMark
  • 一个程序中有多少个水印和并行度有关,和 keyby 无关

举例:

比如有单词hadoop spark
按照keyby,会分成hadoop组 和spark组
但是这两个组是共用1个水印的
hadoop来的数据满足了触发条件,会将spark组的数据也触发

2.4 水印的生成策略
2.4.1 内置水印生成策略
(1) 固定延迟生成水印

简介:设置最大延迟时间

例子:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
(2) 单调递增生成水印

简介:当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小(网络延迟)。

例子:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
2.4.2 自定义水印生成策略
(1) 周期性 watermark 策略
  • 升序watermark:单调递增生成水印
  • 乱序watermark:固定延迟生成水印

都是基于周期性生成,默认的周期是 200ms,一般不去改,保持在 ms 级别 onPeriodicEmit()

(2) 间歇性 watermark 策略
  • 每一个事件时间都会产生一个watermark

2.5 在非数据源之后使用水印 [重点]
2.5.1 WaterMark 的四种使用情况
(1) 本来有序的 Stream中的 Watermark

例子:以 java bean 的数据输入作为有序事件时间

package cn.itcast.day09.WaterMark;/*** @author lql* @time 2024-03-01 21:11:00* @description TODO*/import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;/*** 使用单调递增水印,解决数据有序的场景(大多数情况都是乱序的数据,因此该场景比较少见)* 需求:从socket接受数据,进行转换操作,然后应用窗口每隔5秒生成一个窗口,使用水印时间触发窗口计算** 使用水印的前提:* 1:数据必须要携带事件时间* 2:指定事件时间作为数据处理的时间* 3:指定并行度为1* 4:使用之前版本的api的时候,需要增加时间类型的代码** 测试数据:* sensor_1,1547718199,35       -》2019-01-17 17:43:19* sensor_6,1547718201,15       -》2019-01-17 17:43:21* sensor_6,1547718205,15       -》2019-01-17 17:43:25* sensor_6,1547718210,15       -》2019-01-17 17:43:30** todo 如果窗口销毁以后,有延迟数据的到达会被丢弃,无法再次触发窗口的计算了*/
public class MonotonousWaterMark {public static void main(String[] args) throws Exception {//todo 1)创建flink流处理的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//设置 Flink 程序中流数据时间语义为 EventTime。// 在处理数据时 Flink 程序会按照数据事件发生的时间进行处理,而不是按照数据到达 Flink 程序的时间进行处理。env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//todo 2) 接入数据源SingleOutputStreamOperator<WaterSensor> lines = env.socketTextStream("node1", 9999).map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String value) throws Exception {String[] data = value.split(",");return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.parseInt(data[2]));}});//todo 3)添加水印处理//flink1.12之前版本的api编写(单调递增水印本质上还是周期性水印)SingleOutputStreamOperator<WaterSensor> waterMarkStream = lines.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<WaterSensor>() {@Overridepublic long extractAscendingTimestamp(WaterSensor element) {// 因为我们在转换时间戳,需要毫秒级别!return element.getTs()*1000L;}});waterMarkStream.print("数据>>>");//todo 4)应用窗口操作,设置窗口长度为5秒WindowedStream<WaterSensor, String, TimeWindow> sensorWS = waterMarkStream.keyBy(sensor -> sensor.getId()).window(TumblingEventTimeWindows.of(Time.seconds(5)));//todo 5)定义窗口函数SingleOutputStreamOperator<String> result = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {out.collect("key" + s + "\n" +"数据为" + elements + "\n" +"数据条数为" + elements.spliterator().estimateSize() + "\n" +"窗口时间为" + context.window().getStart() + "->" + context.window().getEnd());}});//todo 6)输出测试result.print();//todo 启动运行env.execute();}/*** 水位传感器,用来接受水位数据*/@Data@AllArgsConstructor@NoArgsConstructorprivate static class WaterSensor {private String id;  //传感器idprivate long ts;    //时间private Integer vc; //水位}
}

注意:flink 1.12 版本之后的有序流添加周期水印

//注意:下面的代码使用的是Flink1.12中新的API
SingleOutputStreamOperator<WaterSensor> sensorDS = lines//TODO 有序流中的watermark.assignTimestampsAndWatermarks(//指定watermark生成(单调递增)WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {//指定如何从数据提取时间戳return element.getTs() * 1000L;
}));

结果:

情况一:一种类别输入
sensor_6,1547718201,15       -2019-01-17 17:43:21
sensor_6,1547718205,15       -2019-01-17 17:43:25
sensor_6,1547718210,15       -2019-01-17 17:43:30输出:
数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)
数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718205, vc=15)
keysensor_6
数据为[MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)]
数据条数为12019-01-17 17:43:20 - > 2019-01-17 17:43:25)数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718210, vc=15)
keysensor_6
数据为[MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718205, vc=15)]
数据条数为1
窗口时间为1547718205000->15477182100002019-01-17 17:43:25 - > 2019-01-17 17:43:30
情况二:两种类别输入
sensor_1,1547718199,35       -2019-01-17 17:43:19
sensor_6,1547718201,15       -2019-01-17 17:43:21
sensor_6,1547718205,15       -2019-01-17 17:43:25
sensor_6,1547718210,15       -2019-01-17 17:43:30输出:
数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_1, ts=1547718199, vc=35)
数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)
数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718205, vc=15)
keysensor_1
数据为[MonotonousWaterMark.WaterSensor(id=sensor_1, ts=1547718199, vc=35)]
数据条数为1
窗口时间为1547718195000->15477182000002019-01-17 17:43:15 - > 2019-01-17 17:43:20)keysensor_6
数据为[MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)]
数据条数为1
窗口时间为1547718200000->15477182050002019-01-17 17:43:20 - > 2019-01-17 17:43:25)数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718210, vc=15)
keysensor_6
数据为[MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718205, vc=15)]
数据条数为1
窗口时间为1547718205000->15477182100002019-01-17 17:43:25 - > 2019-01-17 17:43:30

总结:

  • 1- 体现窗口左闭右开思想(即右端时间重合的数据不参与计算)
  • 2- 有序数据的水印窗口标准开始时间 :时间戳(秒级)// 窗口长度 * 窗口长度 * 1000 (这里的整除可以去掉余数
// 如果是秒级,而不是时间戳:
1)start = timestamp - (timestamp - offset + windowSize) % windowSize; ​事件时间 - (事件时间 - 0 + 窗口大小)%窗口大小 ​​​​​​​​​时间戳按照窗口长度 取整数倍(以1970110点为起点 => 伦敦时间) ​2)end = start + size ​​​​​​​​ 开始时间 + 窗口长度3)左闭右开: 属于本窗口的最大时间戳 = end -1ms , 所以时间为 end这条数据,不属于本窗口,所以是开区间
  • 3- 有序数据的水印窗口标准结束时间 :标准开始时间 + 窗口长度

  • 4- 此时水位线的变化和事件时间保持一致(因为是有序时间,就不需设置延迟,那么 t 就是 0。

    ​ watermark = maxtime - 0 = maxtime)

  • 5- 环境并行度设置为 1,方便观察现象

  • 6- flink 1.12 之前版本,需要指定事件时间,env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  • 7- 转换时间戳时需要毫秒级别

  • 8- window().getStart() 获取窗口标准开始时间,window().getEnd()获取窗口标准结束时间

  • 9- spliterator().estimateSize() 获取窗口内数据条数

  • 10- api版本区别:

    • flink1.12之前:调用 assignTimestampAndwatermarks,new 一个 AscendingTimestampExtractor,重写方法获取时间戳
    • flink1.12之后:调用 assignTimestampAndWatermarks,有序流回调本质周期水印策略
      • WatermarkStrategy.forMonotonousTimestamps.withTimestampAssigner
      • new 一个序列化 SerializableTimestampAssigner,重写方法获取时间戳
  • 11- 应用场景:周期水印解决数据有序场景


(2) 乱序事件中的Watermark

例子:以 java bean 的数据输入作为乱序事件时间

package cn.itcast.day09.WaterMark;/*** @author lql* @time 2024-03-02 15:20:38* @description TODO:*/import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;/*** 使用固定延迟水印,解决数据乱序的场景(大多数情况都是乱序的数据,使用比较多)* 需求:从socket接受数据,进行转换操作,然后应用窗口每隔5秒生成一个窗口,使用水印时间触发窗口计算** 使用水印的前提:* 1:数据必须要携带事件时间* 2:指定事件时间作为数据处理的时间* 3:指定并行度为1* 4:使用之前版本的api的时候,需要增加时间类型的代码** 测试数据:* sensor_1,1547718199,35       -》2019-01-17 17:43:19* sensor_6,1547718201,15       -》2019-01-17 17:43:21* sensor_6,1547718205,15       -》2019-01-17 17:43:25* sensor_6,1547718210,15       -》2019-01-17 17:43:30** todo 固定延迟触发,根据事件时间-最大乱序时间-1得到水印,根据水印时间作为触发窗口的条件* 触发窗口计算的两个条件:* 1:时间达到窗口的endtime* 2:窗口中存在数据*/
public class OutOfOrdernessWaterMark {public static void main(String[] args) throws Exception {// todo 1) 设置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// todo 2) 数据源SingleOutputStreamOperator<WaterSensor> lines = env.socketTextStream("node1", 9999).map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String value) throws Exception {String[] data = value.split(",");return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.parseInt(data[2]));}});// todo 3) 设置水印//flink1.12之前版本的api编写SingleOutputStreamOperator<WaterSensor> waterMarkStream = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WaterSensor>(Time.seconds(3)) {@Overridepublic long extractTimestamp(WaterSensor element) {return element.getTs() * 1000L;}});waterMarkStream.print("数据>>>");//todo 4)应用窗口操作WindowedStream<WaterSensor, String, TimeWindow> sensorWS = waterMarkStream.keyBy(t -> t.getId()).window(TumblingEventTimeWindows.of(Time.seconds(5)));//todo 5) 自定义窗口SingleOutputStreamOperator<String> result = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> collector) throws Exception {collector.collect("key: " + s + "\n" +"数据为: " + elements + "\n" +"条数为:" + elements.spliterator().estimateSize() + "\n" +"时间窗口为:" + context.window().getStart() + "->" + context.window().getEnd() + "\n");}});//todo 6) 打印操作result.print();//todo 7) 启动程序env.execute();}/*** 水位传感器,用来接受水位数据*/@Data@AllArgsConstructor@NoArgsConstructorprivate static class WaterSensor {private String id;  //传感器idprivate long ts;    //时间private Integer vc; //水位}
}

注意:flink 1.12 版本之后的无序流添加固定延迟水印

SingleOutputStreamOperator<WaterSensor> waterMarkStream = lines.assignTimestampsAndWatermarks(// 固定延迟水印,是 Duration 类型WatermarkStrategy<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor waterSensor, long l) {return waterSensor.getTs() * 1000L;}}));

结果:

情况一:一种类别输入
sensor_6,1547718201,15       -2019-01-17 17:43:21
sensor_6,1547718205,15       -2019-01-17 17:43:25
sensor_6,1547718210,15       -2019-01-17 17:43:30输出:
数据>>>> OutOfOrdernessWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)
数据>>>> OutOfOrdernessWaterMark.WaterSensor(id=sensor_6, ts=1547718205, vc=15)
数据>>>> OutOfOrdernessWaterMark.WaterSensor(id=sensor_6, ts=1547718210, vc=15)
key: sensor_6
数据为: [OutOfOrdernessWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)]
条数为:1
时间窗口为:1547718200000->15477182050002019-01-17 17:43:20 -> 2019-01-17 17:43:25

总结:

  • 1- 如果是有 key 类别的差异,触发窗口计算往往在 key 变化时,而不需要两个一样的 key 作为对照

  • 2- 因为设置了延迟,在触发窗口范围的时候,事件时间 - 延迟时间 = 水印时间

    • (例子中打印了 3 条数据,即第 3 条数据触发计算,第3条数据的水印时间的秒级:30 - 3 = 27 >= 窗口的 endTime)
    • 窗口触发两个条件,一是水印时间达到窗口 endTime,二是窗口内有数据
  • 3- api版本区别:

    • flink1.12之前:调用 assignTimestampAndWatermarks,new 一个 BoundedOutofOrdernessTimestampExtractor

      注意设置 延迟时间,重写方法获取事件时间

    • flink1.12之后:调用 assignTimestampAndWatermarks,有序流回调固定延迟水印策略

      • WatermarkStrategy.forBoundedOutOfOrderness(Duration 类型延迟时间).withTimestampAssigner
      • new 一个序列化 SerializableTimestampAssigner,重写方法获取时间戳
  • 4- 应用场景:固定延迟水印解决数据乱序场景


(3) 并行数据流中的Watermark

对齐机制会取所有 Channel 中最小的 Watermark,即:

每个并行度中必须都有数据,且都满足触发窗口条件,才会有对齐机制

例子:将并行度设置为2,带有线程号

package cn.itcast.day09.WaterMark;/*** @author lql* @time 2024-03-02 19:27:53* @description TODO:多并行度下的水印操作演示*/import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.stringtemplate.v4.ST;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;/*** 测试数据:* 并行度设置为2测试:* hadoop,1626934802000 ->2021-07-22 14:20:02* hadoop,1626934805000 ->2021-07-22 14:20:05* hadoop,1626934806000 ->2021-07-22 14:20:06*/
public class Watermark_Parallelism {//定义打印数据的日期格式final private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");public static void main(String[] args) throws Exception {// todo 1) 流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// todo 2) 数据源SingleOutputStreamOperator<Tuple2<String, Long>> tupleDataStream = env.socketTextStream("node1", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String line) throws Exception {try {String[] array = line.split(",");return Tuple2.of(array[0], Long.parseLong(array[1]));} catch (NumberFormatException e) {System.out.println("输入的数据格式不正确:" + line);return Tuple2.of("null", 0L);}}}).filter(new FilterFunction<Tuple2<String, Long>>() {@Overridepublic boolean filter(Tuple2<String, Long> tuple) throws Exception {if (!tuple.f0.equals("null") && tuple.f1 != 0L) {return true;}return false;}});// todo 3) 水印操作SingleOutputStreamOperator<Tuple2<String, Long>> waterMarkDataStream = tupleDataStream.assignTimestampsAndWatermarks(//TODO 自定义watermark生成器WatermarkStrategy.forGenerator(new WatermarkGeneratorSupplier<Tuple2<String, Long>>() {@Overridepublic WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(Context context) {return new MyWatermarkGenerator<>();}}).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {// 获取数据中的 eventTimeLong timestamp = element.f1;// 定义字符串并打印System.out.println("键值:" + element.f0 + ",线程号:" + Thread.currentThread().getId() + "," +"事件时间:【" + sdf.format(timestamp) + "】");return timestamp;}}));// todo 4) 分流和窗口SingleOutputStreamOperator<String> result = waterMarkDataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {//定义该窗口所有时间字段的集合对象ArrayList<Long> timeArr = new ArrayList<>();// 首先获取了输入数据流(input)的迭代器Iterator<Tuple2<String, Long>> iterator = input.iterator();while (iterator.hasNext()) {Tuple2<String, Long> tuple2 = iterator.next();timeArr.add(tuple2.f1);}//对保存到集合列表的时间进行排序Collections.sort(timeArr);//打印输出该窗口触发的所有数据String outputData = "" +"\n 键值:【" + tuple + "】" +"\n     触发窗口数据的个数:【" + timeArr.size() + "】" +"\n     触发窗口的数据:" + sdf.format(new Date(timeArr.get(timeArr.size() - 1))) +"\n     窗口计算的开始时间和结束时间:" + sdf.format(new Date(window.getStart())) + "----->" +sdf.format(new Date(window.getEnd()));out.collect(outputData);}});//TODO 6)打印测试result.printToErr("触发窗口计算结果>>>");//TODO 7)启动作业env.execute();}public static class MyWatermarkGenerator<T> implements WatermarkGenerator<T>{//定义变量,存储当前窗口中最大的时间戳private long maxTimestamp = -1L;/*** 每条数据都会调用该方法* @param event* @param eventTimestamp* @param output*/@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {//System.out.println("on Event...");maxTimestamp = Math.max(maxTimestamp, eventTimestamp);}/**** 周期性的执行,默认是200ms调用一次* @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {//System.out.println("on Periodic..."+System.currentTimeMillis());//发射watermarkoutput.emitWatermark(new Watermark(maxTimestamp -1L));}}
}

结果:

输入:* hadoop,1626934802000 ->2021-07-22 14:20:02* hadoop,1626934805000 ->2021-07-22 14:20:05* hadoop,1626934806000 ->2021-07-22 14:20:06输出:
键值:hadoop,线程号:68,事件时间:【2021-07-22 14:20:02.000】
键值:hadoop,线程号:69,事件时间:【2021-07-22 14:20:05.000】
键值:hadoop,线程号:68,事件时间:【2021-07-22 14:20:06.000】
触发窗口计算结果>>>:2> 
键值:【(hadoop)】触发窗口数据的个数:【1】触发窗口的数据:2021-07-22 14:20:02.000窗口计算的开始时间和结束时间:2021-07-22 14:20:00.000----->2021-07-22 14:20:05.000

总结:

  • 1- 获取线程号:Thread.currentThread().getId()
  • 2- 自定义日期格式:new SimpleDateFormat()
  • 3- 看到 input 是 Iterate 类型,需要调用 iterator()方法转化为迭代对象,运用 while 循环结合 hashNext()边迭代边加入列表
  • 4- Collections.sort(列表),可以对列表进行排序

(4) 自定义 Watermark

A. 周期性水印

package cn.itcast.day09.WaterMark;/*** @author lql* @time 2024-03-02 17:07:17* @description TODO*/import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;/*** 自定义周期性水印,内置水印:固定延迟水印和单调递增水印都是基于周期性水印开发的,默认是200ms生成一次watermark*/
public class WaterMark_Periodic {public static void main(String[] args) throws Exception {// todo 1) 设置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// todo 2) 数据源SingleOutputStreamOperator<WaterSensor> lines = env.socketTextStream("node1", 9999).map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String value) throws Exception {String[] data = value.split(",");return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.parseInt(data[2]));}});// todo 3) 设置水印操作SingleOutputStreamOperator<WaterSensor> sensorDS  = lines.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {@Overridepublic WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {return new MyWatermarkGenerator<>();}}).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor waterSensor, long l) {return waterSensor.getTs() * 1000L;}}));// todo 4) 分组KeyedStream<WaterSensor, String> sensorKS  = sensorDS.keyBy(t -> t.getId());// todo 5) 开窗WindowedStream<WaterSensor, String, TimeWindow> sensorWS  = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));// todo 6) 自定义窗口SingleOutputStreamOperator<String> result = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> iterable, Collector<String> out) throws Exception {out.collect("key: " + s + "\n" +"数据为: " + iterable + "\n" +"条数为:" + iterable.spliterator().estimateSize() + "\n" +"时间窗口为:" + context.window().getStart() + "->" + context.window().getEnd() + "\n");}});// todo 7) 打印和启动result.print();env.execute();}/*** 水位传感器,用来接受水位数据*/@Data@AllArgsConstructor@NoArgsConstructorprivate static class WaterSensor {private String id;  //传感器idprivate long ts;    //时间private Integer vc; //水位}private static class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {private long maxTimestamp = -1L;/*** 每条数据执行一次* @param event* @param eventTimestamp* @param watermarkOutput*/@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput watermarkOutput) {System.out.println("onEvent……");maxTimestamp = Math.max(eventTimestamp, maxTimestamp);}/*** 周期性执行一次* @param watermarkOutput*/@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {System.out.println("onPeriodicEmit……"+ +System.currentTimeMillis());// 发生水印watermarkOutput.emitWatermark(new Watermark(maxTimestamp));}}
}

结果:

onPeriodicEmit……1709376044007
onPeriodicEmit……1709376044214
onPeriodicEmit……1709376044415
onPeriodicEmit……1709376044631
onPeriodicEmit……1709376044834

总结:

  • 1- 自定义水印:WatermarkStrategy.forGenerator(new WatermarkGeneratorSupplier
    • 重写方法,返回新的 Class<>()
    • 继承 WatermarkGenerator ,重写两个方法,一个每条数据执行一次,一个周期执行一次(默认是200ms)
  • 2- 更改执行周期:env.getConfig().setAutoWatermarkInterval(2000)
  • 3- 调用易出错:forGenerate 有 withTimestampAssigner 方法

B. 间歇性水印:

  • 在上述自定义周期性水印方法的 onEvent 中实现 onPeriodicEmit 中的生成水印代码即可实现
watermarkOutput.emitWatermark(new Watermark(maxTimestamp));
2.6 在数据源之后使用水印 (Kafka) [重点]
2.6.1 kafka 向指定分区写入数据
package cn.itcast.day09.watermark.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.internals.Topic;import java.util.Properties;/*** kafka生产者工具类,模拟数据的生成,将数据写入到指定的分区中** 第一个分区写入:1000,hadoop、7000,hadoop-》没有触发窗口计算* 第二个分区写入:7000,flink              -》触发了窗口计算*/
public class KafkaMock {private final KafkaProducer<String, String> producer;public final static String TOPIC = "test3";private KafkaMock(){Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");producer = new KafkaProducer<String, String>(props);}public void producer(){long timestamp = 1000;String value = "hadoop";String key = String.valueOf(value);String data = String.format("%s,%s", timestamp, value);producer.send(new ProducerRecord<String, String>(TOPIC, 1, key, data));producer.close();}public static void main(String[] args) {new KafkaMock().producer();}
}
2.6.2 水印机制消费 kafak 数据
package cn.itcast.day09.watermark.kafka;import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import scala.collection.convert.Wrappers;import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.TimeUnit;/*** 使用水印消费kafka里面的数据*/
public class WatermarkTest {public static void main(String[] args) throws Exception {//todo 1)初始化flink流处理环境Configuration configuration = new Configuration();configuration.setInteger("rest.port", 8081);//设置webui的端口号StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);env.setParallelism(2);env.enableCheckpointing(5000);//todo 2)接入数据源//指定topic的名称String topicName = "test3";//实例化kafkaConsumer对象Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");props.setProperty("flink.partition-discovery.interval-millis", "5000");//开启一个后台线程每隔5s检测一次kafka的分区情况FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), props);kafkaSource.setCommitOffsetsOnCheckpoints(true);//todo 在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是trueDataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//在数据源上添加水印SingleOutputStreamOperator<String> watermarkStream = kafkaDS.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new TimestampAssignerSupplier<String>() {@Overridepublic TimestampAssigner<String> createTimestampAssigner(Context context) {return new TimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {return Long.parseLong(element.split(",")[0]);}};}}).withIdleness(Duration.ofSeconds(60)));//todo 3)单词计数操作SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = watermarkStream.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {return new Tuple2<String, Long>(value.split(",")[1], 1L);}});//todo 4)单词分组操作wordAndOne.keyBy(x-> x.f0).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {long sum = 0L;Iterator<Tuple2<String, Long>> iterator = elements.iterator();while (iterator.hasNext()){Tuple2<String, Long> tuple2 = iterator.next();System.out.println(tuple2.f0);sum += tuple2.f1;}out.collect(s + ","+sum);}}).print();env.execute();//todo 6)启动作业env.execute();}
}

结果1:没加 withIdleness

输入:* 第一个分区写入:1000,hadoop、7000,hadoop-》没有触发窗口计算* 第二个分区写入:7000,flink              -》触发了窗口计算

结果2:加上 withIdleness

输入:* 第一个分区写入:1000,hadoop、7000,hadoop-30s 后触发窗口计算

结论:

  • 1- 当某一个分区的触发机制达到的时候,其他的分区触发机制迟迟未触发的时候,无法触发机制
  • 2- withIdleness(Duration.ofSeconds(30)),允许 30s 等待其他分区触发计算,如果还没有触发,直接计算该分区
  • 3- 工作中一般设置 1 - 10分钟
  • 4- kafka 数据源添加水印,withTimestampAssigner 需要 new 一个 TimestampAssignerSupplier (第一次出现

2.7 Flink 对严重迟到数据的处理

例子:延迟数据处理机制设计

package cn.itcast.day09.WaterMark;/*** @author lql* @time 2024-03-03 13:11:44* @description TODO*/import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.time.Duration;/*** flink默认情况下会将迟到的数据丢弃,但是对于绝大多数的业务中是不允许删除迟到数据的,因此可以使用flink的延迟数据处理机制进行数据的获取并处理*/
public class LatenessDataDemo {public static void main(String[] args) throws Exception {// 设置环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 数据源DataStreamSource<String> lines = env.socketTextStream("node1", 9999);SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {String[] data = value.split(",");return new Tuple2<String, Long>(data[0], Long.parseLong(data[1]));}});// 水印操作 -> 水印3秒SingleOutputStreamOperator<Tuple2<String, Long>> watermarkStream = wordAndOne.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {// 报错地方:因为我们的数据源已经是毫秒级别了,就不需要转换 *1000L哦!return element.f1;}}));// 窗口操作 -> 5秒窗口// todo 1. 设置允许延迟的时间是通过allowedLateness(lateness: Time)设置WindowedStream<Tuple2<String, Long>, String, TimeWindow> windowStream = watermarkStream.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(2));// todo 2.初始化延迟到达的数据对象OutputTag<Tuple2<String,Long>> outputTag = new OutputTag<>("side output",TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));// todo 3.保存延迟到达的数据WindowedStream<Tuple2<String, Long>, String, TimeWindow> sideOutputLateData = windowStream.sideOutputLateData(outputTag);// 数据聚合SingleOutputStreamOperator<Tuple2<String, Long>> result = sideOutputLateData.apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<String, Long>> out) throws Exception {String key = null;Long counter = 0L;for (Tuple2<String, Long> element : input) {key = element.f0;counter += 1;}out.collect(Tuple2.of(key, counter));}});result.print("正常到达的数据>>>");// todo 4.获取延迟到达的数据DataStream<Tuple2<String, Long>> sideOutput = result.getSideOutput(outputTag);sideOutput.printToErr("延迟到达的数据>>>");env.execute();}
}

结果:

 /** 每5s一个窗口,水印:3s,延迟等待:2s* 测试数据:* hadoop,1626936202000  -> 2021-07-22 14:43:22 第一个窗口的数据* hadoop,1626936207000  -> 2021-07-22 14:43:27 因为设置了水印,所以不会触发窗口计算* hadoop,1626936202000  -> 2021-07-22 14:43:22 第一个窗口的数据* hadoop,1626936203000  -> 2021-07-22 14:43:23 第一个窗口的数据* hadoop,1626936208000  -> 2021-07-22 14:43:28 触发了窗口计算(hadoop,3),水印时间满足窗口endtime** ====================事件时间 28 秒 -> 水印时间 25 秒 刚好临界 endtime =======================* ===============延迟 2s 等待机制:延迟到事件时间 30s 即 水印时间 27s 关闭第一个窗口===============** 第一个窗口时间 2021-07-22 14:43:20 -> 2021-07-22 14:43:25** hadoop,1626936202000  -> 2021-07-22 14:43:22 已经触发过计算的窗口再次有新数据到达,(hadoop,4)(数据重复计算)* hadoop,1626936203000  -> 2021-07-22 14:43:23 已经触发过计算的窗口再次有新数据到达,(hadoop,5)* hadoop,1626936209000  -> 2021-07-22 14:43:29 虽然 水印时间达到endtime,但是窗口里面没有新数据,不触发计算* hadoop,1626936202000  -> 2020-07-22 14:43:22 已经触发过计算的窗口再次有新数据到达,(hadoop,6)* hadoop,1626936210000  -> 2021-07-22 14:43:30 满足了窗口销毁的条件,开始专注于第二个新窗口** 第二个窗口时间 2021-07-22 14:43:25 -> 2021-07-22 14:43:30* * ====================事件时间 33 秒 -> 水印时间 30 秒 刚好临界 endtime ====================================*  * ===============延迟 2s 等待机制:延迟到事件时间 35s 即 水印时间 32s 关闭第二个窗口===============** hadoop,1626936202000  -> 2021-07-22 14:43:22 打印迟到数据,(hadoop,1626936202000)* hadoop,1626936215000  -> 2021-07-22 14:43:35 达到水印时间触发窗口计算:(hadoop,3),之前27,28,29秒的数据*/

总结:

  • 1- 设计允许迟到数据时间:

    在水印策略后面加上:allowedLateness(Times.seconds())

  • 2- 初始化迟到的数据对象:

    new OutputTag<>(id名字,TypeInformation.of(new TypeHint<迟到数据类型>(){ }))

  • 3- 保存延迟到达迟到数据:

    窗口流.sideOutputLateData(初始化对象)

  • 4- 获取延迟到达迟到数据:

    结果流.getSideOutput(初始化对象)

  • 5- 测输出流是之前 Window Function API 中的重要算子

    OutputTag(注意复习!)

思考:

  • allowedLateness(Times.seconds) 设计允许迟到时间和

    withIdleness(Duration.ofSeconds(30)) 设计允许等待触发时间有什么不同呢?

回答:

  • (1) 从概念上看,allowedLateness 是延迟窗口关闭,不影响触发时间,而 withIdleness 等待分区一段时间,等不到就触发
  • (2) 从应用来看,allowedLateness 适用于车联网入隧道一段时间没上报数据等待数据,而 withIdleness 适用于分区木桶原理等待数据,等不到数据就单独分区触发计算。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/716607.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通过大语言模型理解运维故障:评估和总结

张圣林 南开大学软件学院副教授、博士生导师 第六届CCF国际AIOps挑战赛程序委员会主席 在ATC、WWW、VLDB、KDD、SIGMETRICS等国际会议和JSAC、TC、TSC等国际期刊发表高水平论文50余篇。主持国家自然科学基金项目2项&#xff0c;横向项目13项&#xff08;与华为、字节跳动、腾讯…

Sqli-labs靶场第20关详解[Sqli-labs-less-20]自动化注入-SQLmap工具注入

Sqli-labs-Less-20 通过测试发现&#xff0c;在登录界面没有注入点&#xff0c;通过已知账号密码admin&#xff0c;admin进行登录发现&#xff1a; 登录后会有记录 Cookie 值 设想如果在Cookie尝试加上注入语句&#xff08;报错注入&#xff09;&#xff0c;测试是否会执行…

C++STL之vector

vector 1. vector介绍 vector文档vector其实就是一个顺序表&#xff0c;它表示可变大小数组的序列容器。像数组一样&#xff0c;可以使用下标[] 来访问vector的元素&#xff0c;和数组一样高效&#xff1b;甚至&#xff0c;它的大小是可以动态改变的&#xff0c;其大小由容器自…

软考55-上午题-【数据库】-数据库设计步骤1

一、数据库设计的步骤 新奥尔良法&#xff0c;四个主要阶段&#xff1a; 1、用户需求分析&#xff1a;手机用户需求&#xff0c;确定系统边界&#xff1b; 2、概念设计&#xff08;概念结构设计&#xff09;&#xff1a;是抽象概念模型&#xff0c;较理想的是采用E-R方法。 …

代码随想录算法训练营第七天

● 自己看到题目的第一想法 第454题.四数相加II 方法&#xff1a; 方法一&#xff1a; 暴力法 思路&#xff1a; 注意&#xff1a; 代码&#xff1a; class Solution { public:int fourSumCount(vector<int>& nums1, vector<int>& nums2, vector<i…

QT 网络编程 8

1 基础知识 udp tcp 2 UDP 框架 客户端: QUdpSocket x; qint64 writeDatagram( const char *data, qint64 size, const QHostAddress &address, quint16 port );服务器: void Server::initSocket(){udpSocket new QUdpSocket(this);udpSocket->bind(QHostAddress…

macos jupyter notebook字体的修改

终端codemirror 记事本打开 搜索font-family 修改font-size保存即可

重学SpringBoot3-@ConditionalOnXxx条件注解

重学SpringBoot3-ConditionalOnXxx条件注解 引言常见的条件注解常见的条件注解示例扩展条件注解1. ConditionalOnJndi2. ConditionalOnJava3. ConditionalOnCloudPlatform4. ConditionalOnEnabledResourceChain5. 自定义条件注解 总结 引言 Spring Boot 提供了一组强大的条件注…

ERDAS监督分类与温度反演教程

本期带来监督分类教程&#xff0c;更多内容&#xff0c;欢迎关注小编的公众号梧桐凉月哦&#xff01;&#xff01;&#xff01; 一、研究区自然、地理环境特征&#xff1a; 1、景德镇市位于中国江西省东北部&#xff0c;地处赣江中游的赣北盆地&#xff0c;地形地貌以丘陵和低…

mitmproxy代理

文章目录 mitmproxy1. 网络代理2. 安装3. Https请求3.1 启动mitmproxy3.2 获取证书3.3 配置代理3.4 运行测试 4. 请求4.1 读取请求4.2 修改请求4.3 拦截请求 5. 响应5.1 读取响应5.2 修改响应 6. 案例&#xff1a;共享账号6.1 登录bilibili获取cookies6.2 在代理请求中设置cook…

ER-NeRF实时对话数字人模型训练与部署

ER-NeRF是基于NeRF用于生成数字人的方法&#xff0c;可以达到实时生成的效果。 下载源码 cd D:\Projects\ git clone https://github.com/Fictionarry/ER-NeRF cd D:\Projects\ER-NeRF 下载模型 准备面部解析模型 wget https://github.com/YudongGuo/AD-NeRF/blob/master/…

MyBatisPlus入门教程

MyBatisPlus MyBatis-Plus (opens new window)&#xff08;简称 MP&#xff09;是一个 MyBatis (opens new window) 的增强工具&#xff0c;在 MyBatis 的基础上只做增强不做改变&#xff0c;为简化开发、提高效率而生。 官网地址&#xff1a;https://baomidou.com/ 一、入门案…

sql注入之sqli-labs-less-1 错误注入

输入?id1 得到登录页面&#xff1a; 通过order by 函数试探&#xff1a; 5的时候报错 试探到3 的时候返回正确的值&#xff1a; 然后继续注入&#xff1a;?id -1 union select 1,2,3 -- 查看回显点&#xff1a; 开始查看数据库内容&#xff1a;id-1 union select 1,databa…

open-spider开源爬虫工具:抖音数据采集

在当今信息爆炸的时代&#xff0c;网络爬虫作为一种自动化的数据收集工具&#xff0c;其重要性不言而喻。它能够帮助我们从互联网上高效地提取和处理数据&#xff0c;为数据分析、市场研究、内容监控等领域提供支持。抖音作为一个全球性的短视频平台&#xff0c;拥有海量的用户…

CKA考生注意:这些Deployment要点能助你一臂之力!

往期精彩文章 : 提升CKA考试胜算&#xff1a;一文带你全面了解RBAC权限控制&#xff01;揭秘高效运维&#xff1a;如何用kubectl top命令实时监控K8s资源使用情况&#xff1f;CKA认证必备&#xff1a;掌握k8s网络策略的关键要点提高CKA认证成功率&#xff0c;CKA真题中的节点维…

68-解构赋值,迭代器,生成器函数

1.解构赋值(针对数组array&#xff0c;字符串String及对象object以) 结构赋值是一种特殊的语法&#xff0c;通过将各种结构中的元素复制到变量中达到"解构"的目的&#xff0c;但是数组本身没有改变 1.1解构单层数组 <script>let arr [1,2,3,4,5];//获取数组…

c++ primer学习笔记(一)

目录 第一章、c快速入门 重点&#xff1a;类的简介 第二章 1、基本内置类型 2、字面值常量 1、整型字面值规则 2、浮点字面值规则 3、布尔字面值 4、字符字面值 5、非打印字符的转义序列 ​编辑 6、字符串字面值 3、变量 1、变量标识符 2、定义和初始化对象 3、…

java: 无法访问org.springframework.web.bind.annotation.RequestMapping......类文件具有错误的版本 61.0, 应为 52.0

文章目录 一、报错问题二、问题背景三、原因分析四、解决方案 一、报错问题 java: 无法访问org.springframework.web.bind.annotation.RequestMapping 错误的类文件: /D:/SoftwareInstall/Maven/repository/org/springframework/spring-web/6.0.9/spring-web-6.0.9.jar!/org/s…

latex报错Repeated entry解决办法

报错原因——重复了两个参考文献&#xff0c;删掉一个即可 总结 "Repeated entry"这个错误通常出现在你尝试在LaTeX中多次使用同一个标签&#xff08;label&#xff09;或者多次插入相同的图像/表格等时。例如&#xff0c;在LaTeX中&#xff0c;我们可能会为每一个章…

Modern C++ std::any为何要求Tp可拷贝构造?

小问题也会影响设计的思路&#xff0c;某个问题或某种case的探讨有助于理解设计的初衷。 声明&#xff1a;以下_Tp/Tp都是指要放入std::any的对象的类型。 它要求_Tp is_copy_constructible, 仅仅是因为有很多函数的实现调用了Tp的拷贝构造函数吗&#xff1f;比如说上节提到的初…