flink重温笔记(八):Flink 高级 API 开发——flink 四大基石之 Window(涉及Time)

Flink学习笔记

前言:今天是学习 flink 的第八天啦!学习了 flink 高级 API 开发中四大基石之一: window(窗口)知识点,这一部分只要是解决数据窗口计算问题,其中时间窗口涉及时间,计数窗口,会话窗口,以及 windowFunction 的各类 API,前前后后花费理解的时间还是比较多的,查阅了很多官方文档,我一定要好好掌握!

Tips:二月底了,春天来临之际我要再度突破自己,加油!

文章目录

  • Flink学习笔记
    • 三、Flink 高级 API 开发
      • 1. Window
        • 1.1 为什么需要 Window
        • 1.2 窗口应用代码结构
        • 1.3 窗口的类型和概念
        • 1.4 三种时间语义
        • 1.5 窗口的使用
          • 1.5.1 滚动窗口
          • 1.5.2 滑动窗口
          • 1.5.3 会话窗口
        • 1.6 窗口的范围
        • 1.7 Window API
          • 1.7.1 Window API 调用方法
        • 1.8 Time Window 案例
          • 1.8.1 滚动窗口(无重叠数据)
          • 1.8.2 滑动窗口(有重叠数据)
        • 1.9 Count Window 案例
          • 1.9.1 滚动窗口(无重叠数据)
          • 1.9.2 滑动窗口(有重叠数据)
        • 1.10 Session Window 案例
        • 1.11 Window Function
          • 1.11.1 增量聚合函数
            • (1)reduce 和 aggregate 的区别
            • (2) ReduceFunction
            • (3) AggregateFunction
          • 1.11.2 全量聚合函数
            • (1) apply 和 process 的区别
            • (2) ProcessWindowFunction / ProcessAllWindowFunction
            • (3) 自定义聚合 apply

三、Flink 高级 API 开发

Flink 的四大基石:Checkpoint、State、Time、Window

在这里插入图片描述

1. Window

1.1 为什么需要 Window

例如:流数据处理中,在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。

1.2 窗口应用代码结构
  • Keyed Window
// Keyed Window
stream.keyBy(...)              <-  按照一个Key进行分组.window(...)            <-  将数据流中的元素分配到相应的窗口中[.trigger(...)]            <-  指定触发器Trigger(可选)[.evictor(...)]            <-  指定清除器Evictor(可选).reduce/aggregate/process()      <-  窗口处理函数Window Function
  • Non-Keyed Window
// Non-Keyed Window
stream.windowAll(...)         <-  不分组,将数据流中的所有元素分配到相应的窗口中[.trigger(...)]            <-  指定触发器Trigger(可选)[.evictor(...)]            <-  指定清除器Evictor(可选).reduce/aggregate/process()      <-  窗口处理函数Window Function

Tips:windowAll 不对数据流进行分组,所有数据将发送到下游算子单个实例上,下游并行度是 1。

1.3 窗口的类型和概念

类型:

  • CountWindow:按照指定的数据条数生成一个Window,与时间无关。
    • 滚动计数窗口,每隔N条数据,统计前N条数据
    • 滑动计数窗口,每隔N条数据,统计前M条数据
  • TimeWindow:按照时间生成Window。
    • 滚动时间窗口,每隔N时间,统计前N时间范围内的数据,窗口长度N,滑动距离N
    • 滑动时间窗口,每隔N时间,统计前M时间范围内的数据,窗口长度M,滑动距离N
    • 会话窗口,按照会话划定的窗口

概念:

  • 滚动窗口 — TumblingWindow:

    例子:假设有个红绿灯,提出个问题:*计算一下通过这个路口的汽车数量*

    细分:*按照窗口划分依据分为:滚动时间窗口、滚动计数窗口*

  • 滑动窗口 — SlidingWindow

    解释:每隔1分钟,统计前面2分钟内通过的车辆数

  • 滚动和滑动区别:

    滑动距离 > 窗口长度, 漏掉数据,如:每隔5分钟,统计前1分钟的数据(滑动距离5分钟,窗口长度1分钟,漏掉4分钟数据)

    滑动距离 < 窗口长度, 重复处理数据,如:每隔1分钟,统计前5分钟数据(滑动距离1分钟,窗口长度5分钟,重复处理4分钟数据)

    滑动距离 = 窗口长度, 不漏也不会重复,也就是滚动窗口

1.4 三种时间语义
  • EventTime [事件时间]

    事件发生的时间,例如:点击网站上的某个链接的时间

    使用Event Time的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。

  • IngestionTime [摄入时间]

    某个 Flink 节点的 source operator接收到数据的时间,例如:某个source消费到kafka中的数据

    Ingestion Time 程序无法处理任何无序事件或延迟数据,Flink 会自动分配时间戳和自动生成水位线。

  • ProcessingTime [处理时间]

    某个 Flink 节点执行某个 operation的时间,例如:timeWindow接收到数据的时间

    它提供了最好的性能和最低的延迟,但是无法精准的体现出数据在产生的那个时刻的变化情况

在这里插入图片描述

1.5 窗口的使用
1.5.1 滚动窗口

使用方法:

  • 滚动窗口下窗口之间之间不重叠,且窗口长度是固定的
  • 可以用 TumblingEventTimeWindowsTumblingProcessingTimeWindows 创建一个基于 Event TimeProcessing Time 的滚动时间窗口。
  • 窗口的长度可以用 org.apache.flink.streaming.api.windowing.time.Time 中的 secondsminuteshours 和 days 来设置。

例子:窗口的起止时间是[0:00:00.000 - 0:59:59.999),如果设置了 Offset,那么窗口的起止时间将变为[0:15:00.000 - 1:14:59.999)

DataStream<T> input = ...// 基于Event Time的滚动窗口
input
.keyBy(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<window function>(...)// 基于Processing Time的滚动窗口
input
.keyBy(<KeySelector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<window function>(...)// 在小时级滚动窗口上设置15分钟的Offset偏移
input
.keyBy(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
.<window function>(...)

注意:时间窗口使用的是****timeWindow*()也可以使用*window****()

1.5.2 滑动窗口

使用方法:

  • 滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定
  • slide < size,重复处理数据,slide > size,漏掉数据

例子:系统时间基于格林威治标准时间(UTC-0),中国的当地时间可以设置offset为Time.hours(-8)。

val input: DataStream[T] = ...// sliding event-time windows
input
.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)// sliding processing-time windows
input
.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)// sliding processing-time windows offset by -8 hours
input
.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<window function>(...)
1.5.3 会话窗口

使用方法:

  • 会话窗口根据 Session gap 切分不同的窗口,当一个窗口在大于 Session gap 的时间内没有接收到新数据时,窗口将关闭。

  • 窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的

  • 可以设置定长的 Session gap,也可以使用 SessionWindowTimeGapExtractor 动态地确定Session gap的长度

例子:

  • 使用定长和可变的 Session gap 来建立会话窗口,
  • 其中 SessionWindowTimeGapExtractor[T] 的泛型 T 为数据流的类型,
  • 可以根据数据流中的元素来生成 Session gap 。
val input: DataStream[T] = ...// event-time session windows with static gap
input.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)// event-time session windows with dynamic gap
input.keyBy(...)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
override def extract(element: T): Long = {// determine and return session gap
}
}))
.<window function>(...)// processing-time session windows with static gap
input.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)// processing-time session windows with dynamic gap
input.keyBy(...)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
override def extract(element: T): Long = {// determine and return session gap
}
}))
.<window function>(...)
1.6 窗口的范围
  • 1-窗口的判断是按照 毫秒 为单位

  • 2-窗口的开始: start,窗口的结束: start + 窗口长度 -1 毫秒

  • 3-开始时间 和 结束时间两者结合 决定了数据是属于哪个窗口的,数据的时间要满足:

    大于等于开始时间

    小于等于结束时间

  • 4-结束时间决定了窗口何时关闭和触发计算,规则是:

    数据的时间 满足 大于等于 结束时间 - 1毫秒

例子:

  • 比如窗口长度是5秒, 从0开始,那么窗口结束是: 0 + 5000 -1 = 4999
1.7 Window API
1.7.1 Window API 调用方法
  • 1- window 方法:仅针对keyby后的流可以使用,对分流后的每个子流加窗口

  • 2-windowAll 方法:使用了 keyby 分流后的流或者未使用 keyby 分流后的流,均可使用

    作用:对数据进行加窗口操作,并且会忽略是否进行了keyby分流

    区别:

    • 使用keyby分流后的流如果调用windowAll, 相当于未分流的效果
    • 未使用keyby分流后的数据,只能调用windowAll方法,无法调用window方法

1.8 Time Window 案例
1.8.1 滚动窗口(无重叠数据)

例子:自定义数据源,滚动时间窗口,5秒

package cn.itcast.day08.window;/*** @author lql* @time 2024-02-24 22:24:47* @description TODO*/import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import javax.xml.crypto.Data;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** 滚动时间窗口* 案例:* 自定义一个Source, 每隔1秒产生一个的k,v  k是hadoop spark flink 其中某一个, v是随机数字* 对数据加窗口, 窗口1对未分流的数据统计数字总和* 窗口2对按key分组后的数据统计每个key对应的数字总和*/
public class TumblingTimeWindowDemo {public static void main(String[] args) throws Exception {// TODO 1) 初始化 flink 流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// TODO 2) 接入数据源DataStreamSource<Tuple2<String, Integer>> streamSource = env.addSource(new GenerateRandomNumSource());streamSource.printToErr("生产的数据>>>");// TODO 3) 对数据应用窗口操作//窗口1对未分流的数据统计数字总和SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = streamSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);//窗口2对按key分组后的数据统计每个key对应的数字总和SingleOutputStreamOperator<Tuple2<String, Integer>> sumEashKey = streamSource.keyBy(t -> t.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);sumOfAll.print("未分流数据的统计>>>");sumEashKey.print("分流后数据的统计>>>");//todo 4)运行任务env.execute();}/*** 自定义source* 自定义一个Source, 每隔1秒产生一个的k,v  k是hadoop spark flink 其中某一个, v是随机数字*/private static class GenerateRandomNumSource implements SourceFunction<Tuple2<String,Integer>> {private Boolean isRunning = true;private final Random random = new Random();private final List<String> keyList = Arrays.asList("hadoop","spark","flink");@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {while(isRunning){String key = keyList.get(random.nextInt(3));ctx.collect(Tuple2.of(key,random.nextInt(100)));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isRunning = false;}}
}

结果:

生产的数据>>>:5> (spark,83)
生产的数据>>>:6> (spark,81)
生产的数据>>>:7> (flink,6)
生产的数据>>>:8> (hadoop,89)
生产的数据>>>:1> (spark,96)分流后数据的统计>>>:7> (flink,6)
分流后数据的统计>>>:8> (hadoop,89)
分流后数据的统计>>>:1> (spark,260)
未分流数据的统计>>>:4> (spark,355)生产的数据>>>:2> (hadoop,46)
生产的数据>>>:3> (flink,65)
生产的数据>>>:4> (hadoop,2)
生产的数据>>>:5> (hadoop,92)
生产的数据>>>:6> (hadoop,38)分流后数据的统计>>>:7> (flink,65)
分流后数据的统计>>>:8> (hadoop,178)
未分流数据的统计>>>:5> (hadoop,243)

总结:

  • 分流的数据 key 和 value 是分明的
  • 未分流的数据 key 取第一个,value 是总和
  • Time.seconds() 是取值时间,TimeUnit.SECONDS 是为了 sleep 方法
1.8.2 滑动窗口(有重叠数据)

例子:自定义数据源,滚动窗口,窗口10秒,滑动5秒

package cn.itcast.day08.window;/*** @author lql* @time 2024-02-24 23:24:08* @description TODO*/import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** 滑动时间窗口案例* 自定义一个Source, 每隔1秒产生一个的k,v  k是hadoop spark flink 其中某一个, v是随机数字* 每隔5秒统计前10秒的数据, 分别统计*  1. 全量数字之和*  2. 分组后每个key对应的数字之和*/
public class SlidingTimeWindowDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//todo 2)接入数据源DataStreamSource<Tuple2<String, Integer>> streamSource = env.addSource(new GeneraterRandomNumSource());streamSource.printToErr("生成的数据>>>");//todo 3)对数据应用窗口操作//窗口1对未分流的数据统计数字总和SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = streamSource.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);SingleOutputStreamOperator<Tuple2<String, Integer>> sumEashKey = streamSource.keyBy(t -> t.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);sumOfAll.print("未分流数据的统计>>>");sumEashKey.print("分流后数据的统计>>>");//todo 4)运行任务env.execute();}/*** 自定义source* 自定义一个Source, 每隔1秒产生一个的k,v  k是hadoop spark flink 其中某一个, v是随机数字*/private static class GeneraterRandomNumSource implements SourceFunction<Tuple2<String, Integer>> {private Boolean isRunning = true;private final List<String> keyList = Arrays.asList("hadoop","spark","flink");private final Random random = new Random();@Overridepublic void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {while (isRunning) {String key = keyList.get(random.nextInt(3));sourceContext.collect(Tuple2.of(key, random.nextInt(100)));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isRunning = false;}}
}

结果:

=========第一批5秒数据============
生成的数据>>>:8> (hadoop,36)
生成的数据>>>:1> (spark,59)
生成的数据>>>:2> (hadoop,29)
生成的数据>>>:3> (spark,26)
生成的数据>>>:4> (spark,1)分流后数据的统计>>>:8> (hadoop,65)
分流后数据的统计>>>:1> (spark,86)
未分流数据的统计>>>:2> (hadoop,151)=========第二批5秒数据============
生成的数据>>>:5> (hadoop,37)
生成的数据>>>:6> (hadoop,75)
生成的数据>>>:7> (spark,69)
生成的数据>>>:8> (spark,11)
生成的数据>>>:1> (hadoop,4)分流后数据的统计>>>:8> (hadoop,181)
分流后数据的统计>>>:1> (spark,166)
未分流数据的统计>>>:3> (hadoop,347)=========第三批5秒数据============
生成的数据>>>:2> (flink,24)
生成的数据>>>:3> (spark,55)
生成的数据>>>:4> (flink,42)
生成的数据>>>:5> (flink,44)
生成的数据>>>:6> (hadoop,44)分流后数据的统计>>>:8> (hadoop,160)
分流后数据的统计>>>:1> (spark,135)
分流后数据的统计>>>:7> (flink,110)
未分流数据的统计>>>:4> (hadoop,405)

总结:

  • 第二次计算的是第一批5秒和第二批5秒的数据
  • 第三次计算的是第二批5秒和第三批5秒的数据
1.9 Count Window 案例
1.9.1 滚动窗口(无重叠数据)

例子:每隔5条统计数据, 分别统计,未分组(windowall)和分组(window)

package cn.itcast.day08.window;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** @author lql* @time 2024-02-25 18:41:37* @description TODO*/
/*** 每隔5条统计数据, 分别统计* 1. 全量数字之和* 2. 分组后每个key对应的数字之和*/
public class TumblingCountWindowDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// sourceDataStreamSource<Tuple2<String, Integer>> streamSource = env.addSource(new GenerateRandomNumEverySecond());streamSource.printToErr("生成的数据>>>");//窗口1对未分流的数据统计数字总和SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = streamSource.countWindowAll(5).sum(1);//窗口2对分流的数据统计数字总和SingleOutputStreamOperator<Tuple2<String, Integer>> sumEashKey = streamSource.keyBy(t -> t.f0).countWindow(5).sum(1);sumOfAll.print("未分流数据的统计>>>");sumEashKey.print("分流后数据的统计>>>");//todo 4)运行任务env.execute();}private static class GenerateRandomNumEverySecond implements SourceFunction<Tuple2<String,Integer>> {private boolean isRunning = true;private final Random random = new Random();private final List<String> keyList = Arrays.asList("hadoop","spark","flink");@Overridepublic void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {while(isRunning) {String key = keyList.get(random.nextInt(3));sourceContext.collect(Tuple2.of(key,random.nextInt(100)));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isRunning = false;}}
}

结果:

生成的数据>>>:3> (spark,76)
生成的数据>>>:4> (hadoop,30)
生成的数据>>>:5> (flink,85)
生成的数据>>>:6> (spark,83)
生成的数据>>>:7> (flink,37)未分流数据的统计>>>:6> (spark,311)生成的数据>>>:8> (flink,26)
生成的数据>>>:1> (hadoop,70)
生成的数据>>>:2> (flink,19)
生成的数据>>>:3> (hadoop,49)
生成的数据>>>:4> (flink,15)分流后数据的统计>>>:7> (flink,182)
未分流数据的统计>>>:7> (flink,179)

总结:

  • 未分流是将前面五条数据合并统计,key 是选择第一个,values 是5条数据的总和
  • 分流是将前面 key 相同的5条数据加起来,key 未凑到5个,就不触发计算
1.9.2 滑动窗口(有重叠数据)

例子:

package cn.itcast.day08.window;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** @author lql* @time 2024-02-25 22:18:03* @description TODO*/
public class SlidingCountWindowDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// todo 1) sourceDataStreamSource<Tuple2<String, Integer>> streamSource = env.addSource(new GeneraterRandomNumSource());streamSource.printToErr("生成的数据>>>");// 第一个窗口 未分组SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = streamSource.countWindowAll(10, 5).sum(1);// 第二个窗口 分组SingleOutputStreamOperator<Tuple2<String, Integer>> sumEashKey = streamSource.keyBy(t -> t.f0).countWindow(10, 5).sum(1);sumOfAll.print("未分流数据的统计>>>");sumEashKey.print("分流后数据的统计>>>");//todo 4)运行任务env.execute();}private static class GeneraterRandomNumSource implements SourceFunction<Tuple2<String,Integer>> {private boolean isRunning = true;private final Random random = new Random();private final List<String> keysList = Arrays.asList("hadoop","spark","flink");@Overridepublic void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {while(isRunning){String key = keysList.get(random.nextInt(3));sourceContext.collect(Tuple2.of(key,random.nextInt(100)));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isRunning = false;}}
}

结果:

生成的数据>>>:8> (flink,23)
生成的数据>>>:1> (flink,54)
生成的数据>>>:2> (hadoop,85)
生成的数据>>>:3> (hadoop,88)
生成的数据>>>:4> (spark,35)未分流数据的统计>>>:3> (flink,285)生成的数据>>>:5> (spark,15)
生成的数据>>>:6> (hadoop,57)
生成的数据>>>:7> (hadoop,20)
生成的数据>>>:8> (spark,55)
生成的数据>>>:1> (hadoop,87)分流后数据的统计>>>:8> (hadoop,337)
未分流数据的统计>>>:4> (flink,519)生成的数据>>>:2> (spark,38)
生成的数据>>>:3> (hadoop,1)
生成的数据>>>:4> (spark,88)分流后数据的统计>>>:1> (spark,231)生成的数据>>>:5> (hadoop,16)
生成的数据>>>:6> (hadoop,28)未分流数据的统计>>>:5> (spark,405)

总结:

  • 把它想象成长度为10的小框,每次移动五条数据,看看能框住多少
  • 未分流,就是简单框住多少个10条数据总和,除了第一次是框住5条数据
  • 分流,就是分类需要数量达到5才能计算
1.10 Session Window 案例

理解:会话窗口属于时间窗口,Session window的窗口大小,则是由数据本身决定

假设Session Window的时间gap如果是6秒,那么,上面的数据会被分成以下几个窗口key,10:00:00
key,10:00:03
key,10:00:05
==========05 与 12相差大于6秒,需要分割=======================key,10:00:12
key,10:00:15
==========15 与 24 相差大于6秒,需要分割======================key,10:00:24
==========24 与 30 相差等于6秒,需要分割======================key,10:00:30
==========30 与 42 相差大于6秒,需要分割======================key,10:00:42

也就是说,窗口之间划分的条件是时间差小于gap

例子:定义一个会话时间窗口, 5秒gap

package cn.itcast.day08.window;/*** @author lql* @time 2024-02-25 22:59:24* @description TODO*/import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** 案例: 自定义一个Source, 每隔随机的秒(1~10)之间产生1条数据* 数据是key value, key: hadoop spark flink 其中一个, value: 是随机的数字* 需求1:定义一个会话时间窗口, 5秒gap, 统计全量数据之和* 需求2: 定义一个会话时间窗口, 5秒gap, 统计按照key分组后的每个组数据内的数字和*/
public class TimeSessionWindowDemo {private static final SimpleDateFormat sdf = new SimpleDateFormat("mm:ss.SSS");public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// sourceDataStreamSource<Tuple2<String, Integer>> streamSource = env.addSource(new GeneraterRandomNumSource());streamSource.printToErr("生成的数据>>>");// 窗口 1,不分组SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = streamSource.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).sum(1);// 窗口 2,分组SingleOutputStreamOperator<Tuple2<String, Integer>> sumEashKey = streamSource.keyBy(t -> t.f0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).sum(1);sumOfAll.print("未分流数据的统计>>>");sumEashKey.print("分流后数据的统计>>>");//todo 4)运行任务env.execute();}private static class GeneraterRandomNumSource implements SourceFunction<Tuple2<String,Integer>> {private boolean isRunning = true;private final List<String> keysList = Arrays.asList("hadoop","spark","flink");private final Random random = new Random();@Overridepublic void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {while(isRunning) {String key = keysList.get(random.nextInt(3));sourceContext.collect(Tuple2.of(key,random.nextInt(100)));// 为了避免生成器连续两次只休眠5秒// 但是,这个循环的逻辑有点问题,因为它会不断地尝试重新生成随机数,直到生成的不是5为止,这可能会导致长时间的等待。long sleepTime = 5L;while (sleepTime == 5L){sleepTime = random.nextInt(7);}System.out.println(sdf.format(new Date()) + ":sleep:"+sleepTime + "s");TimeUnit.SECONDS.sleep(sleepTime);}}@Overridepublic void cancel() {isRunning = false;}}
}

结果:

18:19.058:sleep:1s
生成的数据>>>:3> (spark,61)
18:20.063:sleep:2s
生成的数据>>>:4> (spark,97)
18:22.066:sleep:2s
生成的数据>>>:5> (spark,46)
18:24.070:sleep:1s
生成的数据>>>:6> (hadoop,55)
18:25.078:sleep:1s
生成的数据>>>:7> (flink,27)
18:26.085:sleep:4s
生成的数据>>>:8> (flink,9)分流后数据的统计>>>:1> (spark,204)
分流后数据的统计>>>:8> (hadoop,55)18:30.095:sleep:1s
生成的数据>>>:1> (flink,4)
18:31.099:sleep:6s
生成的数据>>>:2> (flink,61)分流后数据的统计>>>:7> (flink,101)
未分流数据的统计>>>:8> (spark,360)18:37.104:sleep:6s
生成的数据>>>:3> (hadoop,91)分流后数据的统计>>>:8> (hadoop,91)
未分流数据的统计>>>:1> (hadoop,91)

总结:

  • 打印结果位于生成数量之间,是因为计算是基于数据的时间差触发的
  • 分流的数据是,两个key相同的数据时间差触发计算
  • 未分流的数据是,两个数据时间差触发计算
  • 这里的时间是ProcessingTime [处理时间]
1.11 Window Function

窗口函数,即数据划分窗口后可以调用的处理函数。

1.11.1 增量聚合函数

指窗口每进入一条数据就计算一次

实现方法:(常见的增量聚合函数如下):

  • reduce(reduceFunction)
  • aggregate(aggregateFunction)
  • sum()
  • min()
  • max()
(1)reduce 和 aggregate 的区别

  • reduce 接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个
  • maxBy、minBy、sum 这3个底层都是由 reduce 实现的
  • aggregate 的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有<T, ACC, R>
(2) ReduceFunction

例子:Flink使用ReduceFunction来对窗口中的元素进行增量聚合

package cn.itcast.day08.WindowFunction;/*** @author lql* @time 2024-02-26 18:40:13* @description TODO:测试 reduceFunction*/import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;/*** ReduceFunction定义了如何把两个输入的元素进行合并来生成相同类型的输出元素的过程,Flink使用ReduceFunction来对窗口中的元素进行增量聚合* 需求:不分组,划分窗口* 然后调用reduce对窗口内的数据进行聚合*/
public class CountWindowAllRedcueDemo {public static void main(String[] args) throws Exception {//todo 1)初始化flink流处理的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//todo 2)接受数据DataStreamSource<String> lines = env.socketTextStream("node1", 9999);//todo 3) 将数据转化为数字类型SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);//todo 4)划分窗口//GlobalWindow有几个并行?一个并行,只有一个分区(窗口中只有一个subtask)AllWindowedStream<Integer, GlobalWindow> windowed = nums.countWindowAll(5);SingleOutputStreamOperator<Integer> result = windowed.reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1 + value2; //增量聚合,不是满足条件再计算的,因此该方式效率更高,更节省资源}});//todo 6)打印输出result.print();//todo 7)递交作业env.execute();}
}

结果:

终端依次输入1,2,3,……10输出:
1> 15
2> 40

总结:

  • 增量聚合,运用 countWindowAll,每五个计算一次
  • GlobalWindow有几个并行?一个并行,只有一个分区(窗口中只有一个subtask)
  • 重写 reduce方法,增量就用 value1 + value2
  • 适用于直接来值,增量累加
(3) AggregateFunction

AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)

例子:测试AggFunction——求各个班级英语成绩平均分

package cn.itcast.day08.WindowFunction;import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author lql* @time 2024-02-26 18:52:50* @description TODO*/
public class TestAggFunctionOnWindow {public static void main(String[] args) throws Exception {//todo 1)初始化flink流处理的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//todo 2)接受数据DataStreamSource<Tuple3<String, String, Long>> inputSource = env.fromElements(ENGLISH);//todo 3)划分窗口//GlobalWindow有几个并行?一个并行,只有一个分区(窗口中只有一个subtask)SingleOutputStreamOperator<Double> result = inputSource.keyBy(t -> t.f0).countWindow(3).aggregate(new AvgAggFunction());//todo 4)打印输出result.print();//todo 5)递交作业env.execute();}public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};//AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。//Tuple3<String, String, Long>:班级名称、学生名称、学生分数//Tuple2<Long, Long>:学生总分数、学生人数//Double:平均分数private static class AvgAggFunction implements AggregateFunction<Tuple3<String,String,Long>, Tuple2<Long,Long>,Double> {/*** 创建累加器保存中间状态(sum,count)* Long:总成绩* Long:学生个数* @return*/@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L,0L);}/*** 将元素追加到累加器并返回累加器* @param value 输入类型* @param acc 累加器acc类型* @return*/@Overridepublic Tuple2<Long, Long> add(Tuple3<String, String, Long> value, Tuple2<Long, Long> acc) {// acc(历史累加总成绩,学生个数)// value(班级,姓名,成绩)// 成绩累加的同时,同学人数加一return new Tuple2<>(acc.f0+value.f2,acc.f1+1);}/*** 从累加器提取数据* @param accumulator* @return*/@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return (double) accumulator.f0 / accumulator.f1;}/*** 累加器合并* @param acc1* @param acc2* @return*/@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);}}
}

结果:

1> 33.333333333333336
3> 66.66666666666667

总结:

  • 因为 aggregate 是一个聚合类别,这里求平均值,所以 new 一个 AvgAggFunction
  • AvgAggFunction 实现 AggregateFunction 接口
  • AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。
  • 基于增量累加原理,重写 4 个方法:
    • createAccumulator:创建累加器保存中间状态(sum,count),返回值是初始化值
    • add:将元素追加到累加器并返回累加器
    • getResult:从累加器中提取值
    • merge:将累加器合并
1.11.2 全量聚合函数
  • 指在窗口触发的时候才会对窗口内的所有数据进行一次计算

  • 等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求

实现方法

  • apply(windowFunction)
  • process(processWindowFunction)
(1) apply 和 process 的区别

  • apply 和 process 都是处理全量计算,但工作中正常用 process。
  • process更加底层,更加强大,有 open/close 生命周期方法,又可获取RuntimeContext。
(2) ProcessWindowFunction / ProcessAllWindowFunction
  • 全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合

  • ProcessWindowFunction 可以结合 ReduceFunction, AggregateFunction, 或者 FoldFunction 来做增量计算(推荐用法)

例子:

package cn.itcast.day08.WindowFunction;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;/*** @author lql* @time 2024-02-27 15:52:54* @description TODO:演示processWindowFunction实现全量聚合*/
public class TestProcessWinFunctionOnWindow {public static void main(String[] args) throws Exception {// Todo 1): 获取流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Todo 2): 获取数据源DataStreamSource<Tuple3<String,String,Long>> inputSource = env.fromElements(ENGLISH);// Todo 3): 计算各班平均分inputSource.keyBy(t->t.f0).countWindow(2).process(new MyProcessWindowFunction()).print();// Todo 4): 启动程序env.execute();}public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};/*** Tuple3<String, String, Long>:传入值类型* Double:返回值类型* String:分组字段类型* GlobalWindow:countWindow需要使用GlobalWindow, window使用TimeWindow*/private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String,String,Long>,Double,String, GlobalWindow> {//iterable:窗口内的所有元素的集合@Overridepublic void process(String s, Context context, Iterable<Tuple3<String, String, Long>> iterable, Collector<Double> collector) throws Exception {long sum = 0;long count = 0;for (Tuple3<String, String, Long> in : iterable) {sum += in.f2;count++;}collector.collect((double) sum / count );}}
}

结果:

3> 70.0
1> 25.0

总结:

  • countwindow(2) 这里的计数窗口,就是求和的时候,只求两个数据之和
  • 继承 ProcessWindowFunction,重写 process 方法
  • iterable:窗口内的所有元素的集合
  • Tuple3.of(“class2”,“小八”,97L), 加上L,表示该数据是 long 类型的常量
(3) 自定义聚合 apply

例子:使用apply方法来实现单词统计

package cn.itcast.day08.WindowFunction;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;/*** @author lql* @time 2024-02-28 01:14:05* @description TODO:使用apply方法来实现单词统计*/
public class WindowApplyDemo {public static void main(String[] args) throws Exception {//todo 1)初始化flink流处理的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//todo 2)接入数据源DataStreamSource<String> lines = env.socketTextStream("node1", 8888);//todo 3) 数据扁平化SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//todo 4) 数据分组WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowStream = wordAndOne.keyBy(t -> t.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)));//todo 5) apply 自定义聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowStream.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {int sum = 0;String key = null;for (Tuple2<String, Integer> wordAndOne : input) {sum += wordAndOne.f1;key = wordAndOne.f0;}out.collect(Tuple2.of(key, sum));}});// todo 6) 打印结果result.print();env.execute();}
}

结果:

8> (hadoop,1)
1> (spark,1)
7> (flink,1)
1> (kafka,1)

总结:

  • apply 方法中 WindowFunction 需要指定输出类型,而不是单纯 Object

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/708830.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Bert基础(五)--解码器(下)

1、 多头注意力层 下图展示了Transformer模型中的编码器和解码器。我们可以看到&#xff0c;每个解码器中的多头注意力层都有两个输入&#xff1a;一个来自带掩码的多头注意力层&#xff0c;另一个是编码器输出的特征值。 让我们用R来表示编码器输出的特征值&#xff0c;用M来…

【JavaEE进阶】 Spring AOP快速上手

文章目录 &#x1f343;什么是AOP&#x1f333;什么是Spring AOP&#x1f334;上手Spring AOP&#x1f6a9;引入依赖&#x1f6a9;编写AOP程序 ⭕总结 &#x1f343;什么是AOP AOP是Aspect Oriented Programming的简称&#xff08;又称为面向切⾯编程&#xff09; 什么是面向…

电力运维是做什么的?电力行业智能运维工作内容?

电力行业智能运维工作内容具体涉及哪些关键任务&#xff1f;实施智能运维过程中&#xff0c;如何利用现代信息技术、人工智能和大数据分析来提升电力系统的运行效率与维护响应速度?在电力行业中引入智能运维后&#xff0c;对于预防性维护、故障诊断、设备寿命预测以及成本控制…

33-k8s项目实战-02-k8s的ca证书有效期更新

一、概述 我们知道&#xff0c;k8s各项组件之间的通信&#xff0c;都是使用https协议进行的&#xff0c;也就是ca证书&#xff0c;那么我们也知道ca证书都是有“有限期的”&#xff0c;一旦过期&#xff0c;系统就无法进行通信了&#xff1b; 这也是k8s在企业当中经常遇到的证书…

亿道信息新三防平板EM-I10J,性能和价格成最大亮点

亿道信息近期推出了一款新三防平板电脑名为EM-I10J&#xff0c;这款设备上市的初衷是为了在满足客户作业需求的同时为其提供更合适的价格选择&#xff0c;但这并不意味着EM-I10J的实力可以被小觑。 外观上I10J与之前的I10U并无不同之处&#xff0c;同样是10.1英寸高清电容式触…

《TCP/IP详解 卷一》第10章 UDP和IP分片

目录 10.1 引言 10.2 UDP 头部 10.3 UDP校验和 10.4 例子 10.5 UDP 和 IPv6 10.6 UDP-Lite 10.7 IP分片 10.7.1 例子&#xff1a;IPV4 UDP分片 10.7.2 重组超时 10.8 采用UDP的路径MTU发现 10.9 IP分片和ARP/ND之间的交互 10.10 最大UDP数据报长度 10.11 UDP服务器…

华为OD技术面试案例3-2024年

技术一面&#xff1a; 1.手撕代码&#xff0c;算法题&#xff1a; 【最小路径和】 手撕代码通过&#xff0c;面试官拍了照片 2.深挖项目&#xff0c;做过的自认为最好的一个项目&#xff0c;描述做过的项目的工作过程&#xff0c;使用到哪些技术&#xff1f; 技术二面&…

数电学习笔记——逻辑函数及其描述方法

目录 一、逻辑函数 二、逻辑函数的描述方法 1、逻辑真值表 2、逻辑函数式 3、逻辑图 4、波形图 三、逻辑函数的两种标准形式 1、最小项与最大项 最小项 最小项的性质 最大项 最大项的性质 2、最大项与最小项的关系 3、逻辑函数的最小项之和形式 4、逻辑函数的最…

(Linux学习二)文件管理基础操作命令笔记

Linux目录结构&#xff1a; bin 二进制文件 boot 启动目录 home 普通用户 root 超管 tmp 临时文件 run 临时运行数据 var 日志 usr 应用程序、文件 etc 配置文件 dev 文件系统 一、基础操作 在 Linux 终端中&#xff0c;你可以使用以下命令来清屏&#xff1a; clear 命令&am…

【深度学习:视频注释】如何为机器学习自动执行视频注释

【深度学习&#xff1a;视频注释】如何为机器学习自动执行视频注释 #1&#xff1a;多目标跟踪 &#xff08;MOT&#xff09; 以确保帧与帧之间的连续性#2&#xff1a;使用插值来填补空白#3: 使用微模型加速人工智能辅助视频注释#4: 自动目标分割提高目标分割质量 自动视频标记通…

Linux/Spectra

Enumeration nmap 第一次扫描发现系统对外开放了22&#xff0c;80和3306端口&#xff0c;端口详细信息如下 22端口运行着ssh&#xff0c;80端口还是http&#xff0c;不过不同的是打开了mysql的3306端口 TCP/80 进入首页&#xff0c;点击链接时&#xff0c;提示域名不能解析&…

分享一点PDF中获取表格的探索过程

版面分析&#xff1a;如何得到标题、如何的得到段落&#xff08;正确的段落&#xff09;、如何得到表格、如何得到图片&#xff0c;图和得到图片上的文字&#xff1f; 还有细节问题&#xff1a;双栏和多栏的问题、公式问题 扫描件&#xff1a;扫描件本质上是图片&#xff0c;如…

【三维重建】【slam】【分块重建】LocalRF:逐步优化的局部辐射场的鲁棒视图合成

项目地址&#xff1a;https://localrf.github.io/ 题目&#xff1a;Progressively Optimized Local Radiance Fields for Robust View Synthesis 来源&#xff1a;KAIST、National Taiwan University、Meta 、University of Maryland, College Park 提示&#xff1a;文章用了s…

【GB28181】wvp-GB28181-pro修改分屏监控为16画面(前端)

引言 作为一个非前端开发人员,自己摸索起来比较费劲,也浪费了很多时间 由于实际开发中,可能预览的画面多于8个,而wvp目前只支持8画面 本文快速帮助开发者修改分屏监控为多画面。例如16画面,20画面等 文章目录 一、 预期效果展示16分割画面20分割画面二、 源码修改-前端修改…

小白水平理解面试经典题目leetcode 606. Construct String from Binary Tree【递归算法】

Leetcode 606. 从二叉树构造字符串 题目描述 例子 小白做题 坐在自习室正在准备刷题的小白看到这道题&#xff0c;想想自己那可是没少和白月光做题呢&#xff0c;也不知道小美刷题刷到哪里了&#xff0c;这题怎么还没来问我&#xff0c;难道是王谦谦去做题了&#xff1f; 这…

用友 NC 23处接口XML实体注入漏洞复现

0x01 产品简介 用友 NC 是用友网络科技股份有限公司开发的一款大型企业数字化平台。 0x02 漏洞概述 用友 NC 多处接口存在XML实体注入漏洞,未经身份验证攻击者可通过该漏洞读取系统重要文件(如数据库配置文件、系统配置文件)、数据库配置文件等等,导致网站处于极度不安全…

使用PARP抑制剂Olaparib对骨肉瘤细胞进行放射增敏【AbMole】

骨肉瘤细胞来源于对辐射不敏感的骨形成间充质细胞。因此&#xff0c;科学家们希望找到新的方法能够使其对放射增敏。研究人员进行了使用PARP抑制剂Olaparib来增强骨肉瘤细胞的放射敏感性的研究。 研究方法主要包含以下几项实验&#xff1a;通过CCK-8和克隆形成实验评估Olapari…

使用 OpenCV 通过 SIFT 算法进行对象跟踪

本文介绍如何使用 SIFT 算法跟踪对象 在当今世界&#xff0c;当涉及到对象检测和跟踪时&#xff0c;深度学习模型是最常用的&#xff0c;但有时传统的计算机视觉技术也可能有效。在本文中&#xff0c;我将尝试使用 SIFT 算法创建一个对象跟踪器。 为什么人们会选择使用传统的计…

【Go语言】Go语言中的字典

Go语言中的字典 字典就是存储键值对映射关系的集合&#xff0c;在Go语言中&#xff0c;需要在声明时指定键和值的类型&#xff0c;此外Go语言中的字典是个无序集合&#xff0c;底层不会按照元素添加顺序维护元素的存储顺序。 如下所示&#xff0c;Go语言中字典的简单示例&…

java spring cloud 企业工程管理系统源码+二次开发+定制化服务

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部工程管…