Flink(十)【处理函数】

前言

        冬天学习成本太高了,每天冻得要死,自习室人满为患,确实是辛苦。学校基本的硬件条件差的一批(图书馆贼小贼偏僻、老教室暖气还没有地板热、空教室还得自己一个一个挨着找),个体无法改变环境只能顺应了(艹,受不了了,去校长信箱轰tnd)。

        今天学习 Flink 处理函数,学完这一块就剩状态管理、容错机制和 Flink SQL 了,坚持坚持。学完再好好回顾回顾,最后就是把剩余的一些框架(Kafka、Flume等)补齐了。

1、处理函数

        之前所介绍的流处理 API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于 DataStream 进行转换的;所以可以统称为 DataStream API,这也是 Flink 编程的核心。而我们知道,为了让代码有更强大的表现力和易用性,Flink 本身提供了多层 API,DataStream API 只是其中之一,如图:

        在更底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑(我们之前可以从 process 函数中获得 上下文对象 ctx、实现侧输出流等),所以这一层接口就被叫作“处理函数”(process function)。在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。就相当于我们 Spark 中的 RDD 编程,它是最底层的东西,所以一些上层无法实现的,它都可以实现。

        所以,总结一句话就是:只要是现有的算子实现不了的,直接上 process 即可。

1.1、基本处理函数

1.1.1、处理函数的功能和基本使用

        我们之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。比如 map 算子,我们实现的 MapFunction 中,只能获取到当前的数据,定义它转换之后的形式;而像窗口聚合这样的复杂操作,AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式)。另外我们还介绍过富函数类,比如 RichMapFunction,它提供了获取运行时上下文的方法 getRuntimeContext(),可以拿到状态,还有并行度、任务名称之类的运行时信息。
        但是无论哪种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。在定义生成规则之后,水位线会源源不断地产生,像数据一样在任务间流动,可我们却不能像数据一样去处理它,因为跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。
        这就需要我们使用——处理函数(ProcessFunction)了。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。
        处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用.process()方法就可以了。方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑(我们之前通过给 process 方法传入一个实现了 ProcessFunction 抽象类的匿名内部类来实现侧输出流、通过给 process 方法传入一个实现了 CoProcessFunction 抽象类的匿名内部类来实现合流 )。

stream.process(new MyProcessFunction());

        这里 ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunction;MyProcessFunction 是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

1.1.2、ProcessFunction 解析

        在源码中我们可以看到,抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型。
        内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

package org.apache.flink.streaming.api.functions;import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {// ...// 核心处理逻辑public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;// 定时器public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}// ...
}
1. 抽象方法.processElement()

         用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义的。

  • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致。
  • ctx:类型是 ProcessFunction 中定义的内部抽象类 Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()。

Context 抽象类定义如下:

    public abstract class Context {public abstract Long timestamp();public abstract TimerService timerService();public abstract <X> void output(OutputTag<X> outputTag, X value);}

  • out:“收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。

通过几个参数的分析不难发现,ProcessFunction 可以轻松实现 flatMap 这样的基本转换功能(当然 map、filter 更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。自定义状态的具体实现,我们会在后面学到 “状态管理” 的时候再说。


2. 非抽象方法.onTimer()

        用于定义定时触发的操作,这是一个非常强大、也非常有趣的功能。这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。

        打个比方,注册定时器(timer)就是设了一个闹钟,到了设定时间就会响;而.onTimer()中定义的,就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”(callback)方法,通过时间的进展来触发;在事件时间语义下就是由水位线(watermark)来触发了。        

        与.processElement()类似,定时方法.onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的 timestamp 是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

        既然有.onTimer()方法做定时触发,我们用 ProcessFunction 也可以自定义数据按照时间分组、定时触发计算输出结果;这其实就实现了窗口(window)的功能。所以说 ProcessFunction是真正意义上的终极奥义,用它可以实现一切功能。我们也可以看到,处理函数都是基于事件触发的。水位线就如同插入流中的一条数据一样;只不过处理真正的数据事件调用是.processElement()方法,而处理水位线事件调用的是.onTimer()。

        这里需要注意的是,上面的.onTimer()方法只是定时器触发时的操作,而定时器(timer)真正的设置需要用到上下文 ctx 中的定时服务。在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中我们并没有使用定时器。所以基于不同类型的流,可以使用不同的处理函数,它们之间还是有一些微小的区别的。接下来我们就介绍一下处理函数的分类。

1.1.3、处理函数的分类

1. 处理函数的功能和使用

        Flink 中的处理函数其实是一个大家族,ProcessFunction 只是其中一员。我们知道,DataStream 在调用一些转换方法之后,有可能生成新的流类型;例如调用 .keyBy() 之后得到 KeyedStream,进而再调用.window()之后得到 WindowedStream。对于不同类型的流,其实都可以直接调用.process()方法进行自定义处理,这时传入的参数就都叫作处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层 API,可彼此之间也会有所差异。Flink 提供了 8 个不同的处理函数:

  1. ProcessFunction。最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入。
  2. KeyedProcessFunction。对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入。要想使用定时器,比如基于 KeyedStream。
  3. ProcessWindowFunction。开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process() 时作为参数传入。
  4. ProcessAllWindowFunction。同样是开窗之后的处理函数(没有 keyby 的话传这个),基于 AllWindowedStream 调用.process()时作为参数传入。
  5. CoProcessFunction。合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
  6. ProcessJoinFunction。间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入。
  7. BroadcastProcessFunction。广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。
  8. KeyedBroadcastProcessFunction。按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物。

接下来,我们就对经常用到的 KeyedProcessFunction 和 ProcessWindowFunction 的具体用法展开详细说明。

2.1、按键分区处理函数(KeyedProcessFunction)

        在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy 算子对数据流进行“按键分区”,得到一个 KeyedStream。也就是指定一个键(key),按照它的哈希值(hash code)将数据分成不同的“组”,然后分配到不同的并行子任务上执行计算;这相当于做了一个逻辑分流的操作,从而可以充分利用并行计算的优势实时处理海量数据。
        另外我们在上节中也提到,只有在 KeyedStream 中才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy 分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction,最基本的 ProcessFunction 反而使用率没那么高。接下来我们就先从定时服务(TimerService)入手,详细讲解 KeyedProcessFunction 的用法

2.1.1、定时器(Timer)和定时服务(TimerService)

        KeyedProcessFunction 的一个特色,就是可以灵活地使用定时器。定时器(timers)是处理函数中进行时间相关操作的主要机制。在.onTimer()方法中可以实现定时处理的逻辑,而它能触发的前提,就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能,是通过上下文中提供的“定时服务”(TimerService)来实现的。
        定时服务与当前运行的环境有关。前面已经介绍过,ProcessFunction 的上下文(Context)中提供了.timerService()方法,可以直接返回一个 TimerService 对象。TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:

public abstract TimerService timerService();
TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:
// 获取当前的处理时间
long currentProcessingTime();
// 获取当前的水位线(事件时间)
long currentWatermark();
// 注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);
// 注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteEventTimeTimer(long time);

        六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器。

        需要注意:尽管处理函数中都可以直接访问TimerService,不过只有基于 KeyedStream 的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的 DataStream 不支持定时器操作,只能获取当前时间

        对于处理时间和事件时间这两种类型的定时器,TimerService 内部会用一个优先队列将它们的时间戳(timestamp)保存起来,排队等待执行。可以认为,定时器其实是 KeyedStream上处理算子的一个状态,它以时间戳作为区分。所以 TimerService 会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个 key 和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。这样一来,我们在代码中就方便了很多,可以肆无忌惮地对一个 key 注册定时器,而不用担心重复定义——因为一个时间戳上的定时器只会触发一次。

        基于 KeyedStream 注册定时器时,会传入一个定时器触发的时间戳,这个时间戳的定时器对于每个 key 都是有效的。这样,我们的代码并不需要做额外的处理,底层就可以直接对不同key 进行独立的处理操作了。
        利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多 1 秒一次。

1. 事件时间定时器

我们通过 Socket 接收一个无序的数据流(WaterSensor类型),并指定允许迟到 3s,然后进行一个 keyBy ,之后得到 KeyedStream,我们给它定义一个定时器(当事件时间进展到 5s 的时候触发一次):

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {
//                                System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);// todo Process:keyedSingleOutputStreamOperator<String> process = sensorKs.process(/*** KeyedProcessFunction<K, T, R>* K: key 的类型* T: data 的类型* R: return 的类型*/new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次这个方法* @param value 数据* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 从数据中提取出来的时间,如果没有则返回 nullLong currentEventTime = ctx.timestamp();// 定时器TimerService timerService = ctx.timerService();// 注册定时器 - 事件时间timerService.registerEventTimeTimer(5000L); // 事件时间进展到 5s 时触发闹钟(定时器)System.out.println("当前的 key="+ctx.getCurrentKey()+"当前时间为 " + currentEventTime + ",注册了一个5s的定时器");// 注册定时器 - 处理时间
//                        timerService.registerProcessingTimeTimer();// 删除定时器 - 事件时间
//                        timerService.deleteEventTimeTimer();// 删除定时器 - 处理时间
//                        timerService.deleteProcessingTimeTimer();// 获取当前的处理时间 - 系统时间long currentPs = timerService.currentProcessingTime();// 获取当前水位线long watermark = timerService.currentWatermark();}/*** 定义定时器(闹钟)触发时的响应逻辑,对于同一个key,onTimer只会被触发一次!!* @param timestamp 当前时间进展* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);System.out.println("当前的 key= "+ctx.getCurrentKey()+"现在时间为 " + timestamp + "定时器触发");}});process.print();env.execute();}
}

输入数据:

s1,1,1
s1,3,3
s1,5,5
s1,8,8
s1,9,9

输出数据:

当前key=s1,当前时间为 1000,注册了一个5s的定时器
当前key=s1,当前时间为 3000,注册了一个5s的定时器
当前key=s1,当前时间为 5000,注册了一个5s的定时器
当前key=s1,当前时间为 8000,注册了一个5s的定时器
当前key=s1,当前时间为 9000,注册了一个5s的定时器
当前的 key= s1现在时间为 5000定时器触发

        首先可以看到,每来一条数据都会注册一个定时器;我们还可以发现,当数据进展到 8s 的时候,按说我们设置的最多等待 3s ,而这里 8-3=5 按说应该达到触发条件了,可是却没有触发。其实对于触发器来说,它这个时候的时间其实是 (8s-3s-1ms=4999ms),其实离触发还差 1ms,所以当我们数据的事件时间为 9s 的时候,就会发现定时器终于被触发了。

注意:事件事件语义下,对于同一个 key 定时器只触发一次!!!对于相同 key 的数据,Flink 会根据 key 对定时器进行去重。

2. 处理时间定时器

和上面一样,既然是处理时间的话,我们数据中带的事件时间就没用了,这里我们给每个来的数据定义一个五秒后的定时器:

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {
//                                System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);// todo Process:keyedSingleOutputStreamOperator<String> process = sensorKs.process(/*** KeyedProcessFunction<K, T, R>* K: key 的类型* T: data 的类型* R: return 的类型*/new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次这个方法* @param value 数据* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// todo 1.获取定时器并注册TimerService timerService = ctx.timerService();// todo 1.1 注册事件时间定时器// 事件时间 也就是当前数据中的 watermark,如果没有则返回 null
//                        Long currentEventTime = ctx.timestamp();// 注册定时器 - 事件时间
//                        timerService.registerEventTimeTimer(5000L); // 事件时间进展到 5s 时触发闹钟(定时器)
//                        System.out.println("当前key="+ctx.getCurrentKey()+",当前时间为 " + currentEventTime + ",注册了一个5s的定时器");// todo 1.2 注册处理时间定时器// 处理时间 也就是当前的处理时间 - 系统时间long currentPs = timerService.currentProcessingTime();// 注册定时器 - 处理时间timerService.registerProcessingTimeTimer(currentPs+5000L);  // 当处理时间为 当前时间+5s 触发闹钟System.out.println("当前key="+ctx.getCurrentKey()+",当前时间为 " + currentPs + ",注册了一个5s后的定时器");// 删除定时器 - 事件时间
//                        timerService.deleteEventTimeTimer();// 删除定时器 - 处理时间
//                        timerService.deleteProcessingTimeTimer();// 获取当前水位线long watermark = timerService.currentWatermark();}// todo 2.定义触发定时器逻辑/*** 定义定时器(闹钟)触发时的响应逻辑,对于同一个key,onTimer只会被触发一次!!* @param timestamp 当前时间进展* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);System.out.println("当前的 key= "+ctx.getCurrentKey()+"现在时间为 " + timestamp + "定时器触发");}});process.print();env.execute();}
}

测试输入:

s1,1,1
s1,2,2
s1,3,3

输出:

当前key=s1,当前时间为 1703043149860,注册了一个5s后的定时器
当前key=s1,当前时间为 1703043151317,注册了一个5s后的定时器
当前key=s1,当前时间为 1703043152807,注册了一个5s后的定时器
当前的 key= s1现在时间为 1703043154860定时器触发
当前的 key= s1现在时间为 1703043156317定时器触发
当前的 key= s1现在时间为 1703043157807定时器触发

可以看到,处理时间语义下,对于同一个 key 它有可能会触发多次。

3. watermark 的滞后性
 @Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// todo 1.获取定时器并注册TimerService timerService = ctx.timerService();// 获取当前水位线long watermark = timerService.currentWatermark();System.out.println("当前数据="+value+",当前watermark="+watermark);
}

输入: 

s1,1,1
s1,5,5
s1,9,9

输出:

当前数据=WaterSensor{id='s1', ts=1, vc=1},当前watermark=-9223372036854775808
当前数据=WaterSensor{id='s1', ts=5, vc=5},当前watermark=-2001
当前数据=WaterSensor{id='s1', ts=9, vc=9},当前watermark=1999

可以看到,当我们的数据 {s1,1,1} 到达后,watermark 并不是 1-3-1ms = -2001 而是 watermark 的初始值 Inerger.MIN_VALUE,这是因为我们的水位线总是插入到数据后面的,而 processElement 方法一次只能处理一个数据,所以当数据  {s1,1,1}  处理完毕之后 watermark=-2001 才会进入 processElement 方法并更新 watermark。

定时器 - 总结
  1. 只有 KeyedStream 才有定时器
  2. 事件时间定时器,通过数据的 watermark 来触发
    1. 注意:watermark = 当前最大事件时间 - 最大等待时间 -1ms
  3. 在 processElement 中获取到的 watermark 是上一次的 watermark ,因为 watermark 是在数据后面进入 processElement 方法的。

2.3、窗口处理函数

        除了 KeyedProcessFunction , 另外一大类常用的处 理 函 数 ,就是基于窗口的ProcessWindowFunction 和 ProcessAllWindowFunction 了。

2.3.1、窗口处理函数的使用

        进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数(apply/process)、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。

        窗口处理函数 ProcessWindowFunction 的使用与其他窗口函数类似 ,也是基于WindowedStream 直接调用方法就可以,只不过这时调用的是 .process()。就像我们之前窗口那一章节写的全窗口函数:

public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction());KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);// todo 1. 指定窗口分配器:基于处理时间的滚动窗口WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// todo 2. 指定窗口函数:全窗口函数SingleOutputStreamOperator<String> process = tumblingWindow.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {/**** @param key 分组的 key* @param context 上下文* @param elements 全窗口存的数据* @param out 采集器* @throws Exception*/@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String start = sdf.format(new Date(startTs));String end = sdf.format(new Date(endTs));long size = elements.spliterator().estimateSize();out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());}});process.print();env.execute();}
}

2.3.2、ProcessWindowFnction 解析

ProcessWindowFunction 既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {public abstract void process( KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;public void clear(Context context) throws Exception {}public abstract class Context implements java.io.Serializable {...}
}

ProcessWindowFunction 依然是一个继承了 AbstractRichFunction 的抽象类,它有四个类型参数:

  • IN:input,数据流中窗口任务的输入数据类型。
  • OUT:output,窗口任务进行计算之后的输出数据类型。
  • KEY:数据中键 key 的类型。
  • W:窗口的类型,是 Window 的子类型。一般情况下我们定义时间窗口,W就是 TimeWindow。而内部定义的方法,跟我们之前熟悉的处理函数就有所区别了。

因为全窗口函数不是逐个处理元素的,所以处理数据的方法在这里并不是.processElement(),而是改了.process()。方法包含四个参数。

  • key:窗口做统计计算基于的键,也就是之前 keyBy 用来分区的字段。
  • context:当前窗口进行计算的上下文,它的类型就是 ProcessWindowFunction内部定义的抽象类 Context。
  • elements:窗口收集到用来计算的所有数据,这是一个可迭代的集合类型。
  • out:用来发送数据输出计算结果的收集器,类型为 Collector。

可以明显看出,这里的参数不再是一个输入数据,而是窗口中所有数据的集合(一个迭代器对象)。而上下文context 所包含的内容也跟其他处理函数有所差别:

public abstract class Context implements java.io.Serializable {public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();public abstract <X> void output(OutputTag<X> outputTag, X value);
}

除了可以通过.output()方法定义侧输出流不变外,其他部分都有所变化:

  • 这里不再持有TimerService 对象,只能通过 currentProcessingTime()和 currentWatermark()来获取当前时间,所以失去了设置定时器的功能;
  • 另外由于当前不是只处理一个数据,所以也不再提供.timestamp()方法。

与此同时,也增加了一些获取其他信息的方法:

  • 可以通过.window()直接获取到当前的窗口对象,
  • 也可以通过.windowState()和.globalState()获取到当前自定义的窗口状态和全局状态。

注意:这里的“窗口状态”是自定义的,不包括窗口本身已经有的状态,针对当前 key、当前窗口有效;而“全局状态”同样是自定义的状态,针对当前 key 的所有窗口有效。所以我们会发现,ProcessWindowFunction 中除了.process()方法外,并没有.onTimer()方法,而是多出了一个.clear()方法。从名字就可以看出,这主要是方便我们进行窗口的清理工作。如果我们自定义了窗口状态,那么必须在.clear()方法中进行显式地清除,避免内存溢出。

这里有一个问题:没有了定时器,那窗口处理函数就失去了一个最给力的武器,如果我们希望有一些定时操作又该怎么做呢?其实仔细思考会发现,对于窗口而言,它本身的定义就包含了一个触发计算的时间点,其实一般情况下是没有必要再去做定时操作的。如果非要这么干,Flink也提供了另外的途径——使用窗口触发器(Trigger)。在触发器中也有一个TriggerContext,它可以起到类似 TimerService 的作用:获取当前时间、注册和删除定时器,另外还可以获取当前的状态。这样设计无疑会让处理流程更加清晰——定时操作也是一种“触发”,所以我们就让所有的触发操作归触发器管,而所有处理数据的操作则归窗口函数管。

至于另一种窗口处理函数 ProcessAllWindowFunction,它的用法非常类似。区别在于它基于的是 AllWindowedStream,相当于对没有 keyBy 的数据流直接开窗并调用.process()方法,但如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。在代码中,直接基于 DataStream 调用.windowAll()定义窗口:

stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessAllWindowFunction());

 2.4、应用案例 - Top N

        案例需求:实时统计一段时间内出现次数最多的水位。例如:统计最近10s内出现最多的两个水位,并且每5s更新一次。我们知道,这可以通过一个滑动窗口来实现,于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。其实这就是著名的“Top N”问题。

2.4.1、使用 ProcessAllWindowFunction

我们的数据类型 WaterSenor 的三个属性(id:传感器id,ts:事件时间,vc:水位高度)

/*** 案例: 不同水位出现的次数的 topN*/
public class TopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {
//                                System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));// todo 思路1: 不做 keyBy 直接使用一个 hashMap<vc,count> 来累加 统一vc的count值// 窗口大小:10s,步长:5ssensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).process(new ProcessAllWindowFunction<WaterSensor, String, TimeWindow>() {@Overridepublic void process(Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {// 定义一个hashMapMap<Integer,Integer> map = new HashMap<>();for (WaterSensor waterSensor : elements) {int vc = waterSensor.vc;map.put(vc,map.getOrDefault(vc,0)+1);}// 排序输出top2,利用 list 对map根据value进行排序List<Tuple2<Integer, Integer>> list = new ArrayList<>();for (Integer vc : map.keySet()) {list.add(Tuple2.of(vc,map.get(vc)));}list.sort(new Comparator<Tuple2<Integer, Integer>>() {@Overridepublic int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {return o2.f1-o1.f1;}});StringBuilder builder = new StringBuilder();builder.append("===================\n");// 防止越界,考虑list的size可能不够两个for (int i = 0; i < Math.min(list.size(),2); i++) {Tuple2<Integer, Integer> tuple = list.get(i);builder.append("top").append(i+1).append(": ");builder.append(tuple.f0).append(" -> ");builder.append(tuple.f1).append("\n");}builder.append("窗口结束时间=").append(DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS"));builder.append("\n");out.collect(builder.toString());}}).print();env.execute();}}

注意:滑动窗口一定有第一个步长时被触发!到达第 1 个步长触发第 1 个窗口,到达第 2 个步长触发第 2 个窗口

上面我们定义了一个大小为 10 ,滑动步长为 5 的窗口,并且等待时间为 3

注意:

  1. 等待时间内来的数据如果不在窗口范围内并不会计入到窗口中!
  2. 窗口区间是左闭右开的!

所以,这里的窗口:

  • 第一个窗口:[-5,5)(注意:第一个窗口并不是[0,10)!!!)
  • 第一个窗口:[0,10)
  • ...

测试输入数据:

s1,1,1
s2,2,1
s3,3,2
s4,4,2
s5,5,2    // 窗口范围是左闭右开的,这里的 2 并不计数,这里达到第一个滑动步长,所以要等待3s
s6,6,1
s7,7,3
s8,8,3    // 此时才触发第一次计算
s10,10,2
s12,12,1
s13,13,4  // 达到第二个滑动步长,再次触发计算

输出结果:

===================
// [-5,5)的结果
top1: 1 -> 2
top2: 2 -> 2
窗口结束时间=1970-01-01 08:00:05.000===================
// [0,10)的结果
top1: 1 -> 3
top2: 2 -> 3
窗口结束时间=1970-01-01 08:00:10.000

2.4.2、使用 KeyedProcessFunction

        上面我们没有按键区分,直接将所有数据放在一个分区上进行了开窗操作。这相当于将并行度强行设置为 1,在实际应用中是要尽量避免的,因为如果数据量很大,一个并行度的情况下机器受不了,而且全窗口函数是在最后窗口要关闭(滚动窗口)或者移动(滑动窗口)时才对有窗口内的数据进行计算,所以计算压力可想而知;所以 Flink 官方也并不推荐使用 AllWindowedStream 进行处理。另外,我们在全窗口函数中定义了 HashMap来统计 水位 的出现次数,计算过程是要先收集齐所有数据、然后再逐一遍历更新 HashMap,这显然不够高效。如果我们可以利用增量聚合函数的特性,每来一条数据就更新一次该水位出现的次数,那么到窗口触发计算时只需要做排序输出就可以了。

        所以优化的思路就是,先按照 vc 对数据进行 keyBy 分区,然后开窗进行增量聚合。所以我们先用增量聚合函数 AggregateFunction 对每个 vc 的次数进行统计,然后结合 ProcessWindowFunction 排序输出最终结果。

总结:

  1. 我们首先根据数据的 vc 进行 keyBy 分区,开窗(根据需求开一个滑动窗口,窗口大小10s,滑动步长5s)
  2. 使用聚合函数(aggregate)结合全窗口函数(ProcessWindowFunction),先把每个区的数据(相同 vc)的次数统计出来得到 count,然后使用全窗口函数把返回值封装成 Tuple3 (vc,count,endWindow)的格式,因为我们要根据不同窗口范围进行统计 TopN
  3. 上面最终的结果是一个普通的 DataStream,我们需要再根据窗口范围(上面Tuple3 中的 endWindow字段)进行一个 keyBy 分区,把每个范围的数据放到一起进行统一排序(这里使用 hashMap(key=windowEnd,value=Tuple3<vc,count,windowEnd>) 进行存储,使用 arrayList 进行排序)。
  4. 使用定时器,当 processElement 数据到齐后进行触发计算输出。
/*** 案例: 不同水位出现的次数的 topN*/
public class KeyedProcessFunctionTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
//                                System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));// todo 思路2: 使用 keyedProcessFunction 实现KeyedStream<WaterSensor, Integer> keyedStream = sensorDS.keyBy(sensor -> sensor.vc);SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))// AggregateFunction 3个泛型参数: 输入类型,累加器类型,输出类型.aggregate(new AggregateFunction<WaterSensor, Integer, Integer>() {// 累加器初始值@Overridepublic Integer createAccumulator() {return 0;}// 累加过程: 直接+1,毕竟我们都是相同key@Overridepublic Integer add(WaterSensor value, Integer accumulator) {return accumulator + 1;}// 累加结果直接返回@Overridepublic Integer getResult(Integer accumulator) {return accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return null;}},      // ProcessWindowFunction的4个泛型参数: 输入类型、输出类型、键类型、窗口类型// 这里由于我们后面要根据输出结果区分数据是来自哪个窗口的,所以使用了Tuple3<vc,count,windowEnd> 带上了结束窗口的标签new ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow>() {@Overridepublic void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {// 迭代器只有一条数据 所以 iterator.next() 就是它的全部数据了Integer count = elements.iterator().next();long windowEnd = context.window().getEnd();out.collect(Tuple3.of(key, count, windowEnd));}});/*** windowAgg:SingOutputStreamOperator 的聚合结果:* vc=1,count=100,windowEnd=10s* vc=2,count=70,windowEnd=10s* vc=3,count=80,windowEnd=10s* 开窗聚合后,就变成了普通的数据流SingOutputStreamOperator(继承自 DataStream),所以我们自己给聚合结果打上了窗口结束的标签(windowEnd)*/// 2. 按照窗口结束标签进行 keyBy 保证同一窗口时间范围的数据统一处理,之后再排序windowAgg.keyBy(r -> r.f2).process(new TopN(2)).print();env.execute();}public static class TopN extends KeyedProcessFunction<Long,Tuple3<Integer,Integer,Long>,String>{private Map<Long,List<Tuple3<Integer,Integer,Long>>> map;private int threshold;public TopN(int threshold) {this.threshold = threshold;map = new HashMap<>();}// Tuple3<Integer, Integer, Long> value : Tuple3的元素类型: vc,count,windowEnd@Overridepublic void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {// 进入这个方法的都只是一条数据,要排序就需要都到齐才行// 1. 存到 hashMapLong windowEnd = value.f2;if (map.containsKey(windowEnd)){List<Tuple3<Integer, Integer, Long>> list = map.get(windowEnd);list.add(value);}else {List<Tuple3<Integer, Integer, Long>> list = new ArrayList<>();list.add(value);map.put(windowEnd,list);}// 注册一个定时器,windowEnd+1ms 触发// 因为同一个窗口范围应该同时输出,只不过是一条一条调用processElement方法,1ms就够执行完了ctx.timerService().registerEventTimeTimer(windowEnd + 1);// 这里 out 不用操作}// 定时器触发逻辑@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);// 同一个窗口的计算结果攒齐了,需要开启排序和取 top N// 1. 排序Long windowEnd = ctx.getCurrentKey();List<Tuple3<Integer, Integer, Long>> list = map.get(windowEnd);list.sort((o1,o2) -> o2.f1-o1.f1);// 2. 取 topNStringBuilder builder = new StringBuilder();builder.append("===================\n");// 防止越界,考虑list的size可能不够两个for (int i = 0; i < Math.min(list.size(),threshold); i++) {Tuple3<Integer, Integer, Long> tuple = list.get(i);builder.append("top").append(i+1).append(": ");builder.append(tuple.f0).append(" -> ");builder.append(tuple.f1).append("\n");builder.append("窗口结束时间=").append(DateFormatUtils.format(tuple.f2, "yyyy-MM-dd HH:mm:ss.SSS"));builder.append("\n");builder.append("===================\n");}// list 用完就可以及时销毁了,节省空间list.clear();out.collect(builder.toString());}}
}

输入数据:

s1,1,1
s1,2,1
s1,5,2    
s1,8,3    
s1,9,1    // 第一个窗口范围 [-5,5),但是等待时间+3s所以8s进行输出,但是我们触发器+1ms所以9s才输出
s1,10,1
s1,13,2
s1,14,3    // 同理,第二个窗口范围 [0,10),14s才输出

输出结果:

===================
top1: 1 -> 2
窗口结束时间=1970-01-01 08:00:05.000
======================================
top1: 1 -> 3
窗口结束时间=1970-01-01 08:00:10.000
===================
top2: 2 -> 1
窗口结束时间=1970-01-01 08:00:10.000
===================

2.5、侧输出流(Side Output)

上下文对象 ctx 提供了侧输出流方法 output(OutTag,value) ,或者如果我们是 keyedStream.processElement() 的话,还可以在 .onTimer() 方法中调用上下文的.output()方法就可以了。我们之前用过好多次了。

/*** 案例: 不同水位出现的次数的 topN*/
public class SideOutputDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
//                                System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING);SingleOutputStreamOperator<WaterSensor> process = sensorDS.keyBy(sensor -> sensor.id).process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {// KeyedProcessFunction泛型参数类型:key类型、输入类型、主流输出类型@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {// 使用侧输出流告警if (value.vc > 10) {ctx.output(warnTag, "当前水位=" + value.vc + ">阈值10!!!");}out.collect(value);}});process.print("main");process.getSideOutput(warnTag).print("warn");env.execute();}
}

输入数据:

s1,1,1
s1,2,2
s1,8,8
s1,10,10
s1,12,12

输出结果:

main> WaterSensor{id='s1', ts=1, vc=1}
main> WaterSensor{id='s1', ts=2, vc=2}
main> WaterSensor{id='s1', ts=8, vc=8}
main> WaterSensor{id='s1', ts=10, vc=10}
warn> 当前水位=12>阈值10!!!
main> WaterSensor{id='s1', ts=12, vc=12}

总结

        这一块知识点特别挺多,与前面的窗口知识关联紧密,都必须熟悉掌握,对感兴趣的事并不能算是一种痛苦,享受知识越来越丰富的过程吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/235341.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【ARM Trace32(劳特巴赫) 高级篇 21 -- Trace 系统性能分析 Performance Analyzer】

请阅读【Trace32 ARM 专栏导读】 文章目录 Performance AnalyzerPerf 操作步骤采样对象PC采样对象Memory采样对象 TaskPerformance Analyzer sample-based profiling 通常也叫做Trace32 的性能分析(Perf), 这个功能是通过周期性的采样来实现的。被采样到的数据可以被用于统计…

Apache Flink(十七):Flink On Standalone任务提交-Standalone Application模式

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录

flink sql1.18.0连接SASL_PLAINTEXT认证的kafka3.3.1

阅读此文默认读者对docker、docker-compose有一定了解。 环境 docker-compose运行了一个jobmanager、一个taskmanager和一个sql-client。 如下&#xff1a; version: "2.2" services:jobmanager:image: flink:1.18.0-scala_2.12container_name: jobmanagerports:…

基于RocketMQ实现分布式事务

前言 在上一篇文章Spring Boot自动装配原理以及实践我们完成了服务通用日志监控组件的开发&#xff0c;确保每个服务都可以基于一个注解实现业务功能的监控。 而本文我们尝试基于RocketMQ实现下单的分布式的事务。可能会有读者会有疑问&#xff0c;之前我们不是基于Seata完成了…

AIGC:阿里开源大模型通义千问部署与实战

1 引言 通义千问-7B&#xff08;Qwen-7B&#xff09;是阿里云研发的通义千问大模型系列的70亿参数规模的模型。Qwen-7B是基于Transformer的大语言模型, 在超大规模的预训练数据上进行训练得到。预训练数据类型多样&#xff0c;覆盖广泛&#xff0c;包括大量网络文本、专业书籍…

百度侯震宇:AI原生与大模型将从三个层面重构云计算

12月20日&#xff0c;2023百度云智大会智算大会在北京举办&#xff0c;大会以「大模型重构云计算&#xff0c;Cloud for AI」为主题&#xff0c;深度聚焦大模型引发的云计算变革。 百度智能云表示&#xff0c;为满足大模型落地需求&#xff0c;正在基于「云智一体」战略重构…

ubuntu qt 源码编译

官方源码下载地址 : 源码地址 选择要下载的版本 dmg结尾的是MacOS系统里使用的Qt库&#xff0c;qt-everywhere-opensource-src-4.7.0是Qt源码包&#xff0c;有zip和tar.gz两个压缩格式的&#xff0c;两个内容是一样的&#xff0c;只是zip一般在Windows下比较流行&#xff0c;…

Java:语法速通

参考 菜鸟教程 java 继承 class 父类 { }class 子类 extends 父类 { }继承的特性&#xff1a; 子类拥有父类非private的属性和方法子类可以对父类进行扩展子类可以重写父类的方法使用extends只能单继承&#xff0c;使用implements可以变相的多继承&#xff0c;即一个类继承…

无人机支持的空中无蜂窝大规模MIMO系统中上行链路分布式检测

无人机支持的空中无蜂窝大规模MIMO系统中上行链路分布式检测 无人机支持的空中无蜂窝大规模MIMO系统中上行链路分布式检测介绍题目一. 背景&#xff08;解决的问题&#xff09;二. 系统模型2.1 信道模型2.1.1 信道系数2.1.2 进行标准化 2.2 信道估计 和 数据传输2.2.1 信道估计…

在Windows系统平台下部署运行服务端Idea工程的jar服务

前言 目前云原生docker等技术&#xff0c;加上部署流水线大大的简化了各种流程&#xff0c;我们后端开发的人员只需要提交代码后&#xff0c;构建、部署、测试、发布等环节都无需人员接入&#xff0c;完全的自动化交付了。那么你肯定不禁想问&#xff0c;如题的需求不是点击一…

Web 前端—HTML+CSS系列

HTML、CSS 一、HTMLCSS1.1什么是HTML、CSS1.2宇宙第一编辑器VS Code1.3Chrome浏览器1.4、深入了解网站开发 一、HTML基本操作1.web前端三大核心技术2.HTML初始代码3.HTML注释4.HTML语义化5.标题与段落6.文本修饰标签7.图片标签与图片属性8.引入文件的地址路径9.跳转链接10.跳转…

Leetcode—75.颜色分类【中等】

2023每日刷题&#xff08;六十五&#xff09; Leetcode—75.颜色分类 实现代码 class Solution { public:void sortColors(vector<int>& nums) {int red 0, white 0, blue 0;for(auto num: nums) {if(num 0) {red;} else if(num 1) {white;} else {blue;}}for…

机械、电气、自动化与人工智能融合:发展历程、问题与前景

导言 机械、电气、自动化行业与人工智能的结合&#xff0c;推动了工业革命的新浪潮。本文将深入研究这一融合的发展历程、遇到的问题、解决过程&#xff0c;以及未来的可用范围&#xff0c;着重分析在各国的应用现状和未来的研究趋势。同时&#xff0c;探讨在哪些方面能够取得胜…

环境搭建及源码运行_java环境搭建_idea版本下载及安装

1、介绍 Idea是一款被广泛使用的Java集成开发环境&#xff0c;它提供了丰富的功能和工具来帮助开发人员更高效地编写和调试代码。作为一款开源软件&#xff0c;Idea不仅提供了基本的代码编辑、自动完成和调试功能&#xff0c;还支持大量的插件和扩展&#xff0c;可为开发人员提…

MySQL基本操作 DDL DML DQL三大操作介绍

DDL 数据(结构)定义 创建表DML 数据操作 增删改DQL 查询语句 DDL 数据(结构)定义 创建表 创建 删除数据 注释 --空格内容 创建数据库 CREATE DATABASE [if not exists] 数据库名 [ CHARSET utf8]eg:CREATE DATABASE IF NOT EXISTS school CHARSET utf8如果对应school不存在,…

【NextJS】API请求执行两次的原因及解决方法

实验环境 next&#xff1a; 14.0.4react&#xff1a; ^18 实验代码 // file: app\page.tsx use client;export default function Home() {console.log(test)return (<></>) }原因 测试发现创建默认工程上面代码会输出两次test&#xff0c;其原因是为了模拟立即卸…

智慧安防视频监控EasyCVR如何通过回调接口向第三方平台推送RTSP视频通道离线通知

安防视频监控系统EasyCVR能在局域网、公网、专网等复杂的网络环境中部署&#xff0c;可支持4G、5G、WiFi、有线等方式进行视频的接入与传输、处理和分发。平台能将接入的视频流进行汇聚、转码、多格式输出和分发&#xff0c;具体包括&#xff1a;RTMP、RTSP、HTTP-FLV、WebSock…

文件传输软件SecureFX mac支持多种协议

SecureFX mac是一款文件传输客户端&#xff0c;可在 Mac 操作系统上使用。它由 VanDyke Software 公司开发&#xff0c;旨在为用户提供安全、可靠、高效的文件传输服务。 SecureFX 支持多种协议&#xff0c;包括 SFTP、SCP、FTP、FTP over SSL/TLS 和 HTTP/S。它使用强大的加密…

GEM5 Garent CPU cache消息传递路径:1. NI部分

简介 我们仔细分析下图怎么连的&#xff0c;以及消息传递路径。 图来自https://www.gem5.org/documentation/general_docs/ruby/ 代码的连接 fs.py->ruby.py-> gem5/configs/ruby/MESI_Two_Level.py 中的 create_system( options, full_system, system, dma_ports, b…

uniapp运行到手机模拟器

第一步&#xff0c;下载MUMU模拟器 下载地址&#xff1a;MuMu模拟器官网_安卓12模拟器_网易手游模拟器 (163.com) 第二步&#xff0c;运行mumu模拟器 第三步&#xff0c;运行mumu多开器 第三步&#xff0c;查看abs 端口 第四步&#xff0c;打开HBuilder,如下图&#xff0c;将…