【Flink-scala】DataStream编程模型之延迟数据处理

DataStream API编程模型

1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出
2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序
3.【Flink-scala】DataStream编程模型之水位线
4.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器


文章目录

  • DataStream API编程模型
  • 一、延迟数据处理
    • 1.1 侧输出
    • 1.2代码实例
      • 1.2.1 代码运行结果
      • 1.2.2 结果分析
    • 1.3代码运行展示
    • 1.4 题外话


一、延迟数据处理

前一小节已经讲了水位线的相关概念,默认情况下,当水位线超过窗口结束时间之后,再有之前的数据到达时,这些数据会被删除。

为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念。

allowedLateness就是针对事件时间而言,对于水位线超过窗口结束时间之后,还允许有一段时间(也是以事件时间来衡量)来等待之前的数据到达,以便再次处理这些数据。

对于窗口计算而言,如果没有设置allowedLateness,窗口触发计算以后就会被销毁

设置了allowedLateness以后,只有水位线大于“窗口结束时间+allowedLateness”时,窗口才会被销毁。

当没有指定allowedLateness,默认值为0.

1.1 侧输出

通常情况下,用户虽然希望对迟到的数据进行窗口计算,但并不想将结果混入正常的计算流程中,而是想将延迟数据和结果保存到数据库中,便于后期对延时数据进行分析。对这种情况,就需要借助于“侧输出”(Side Output)来处理

用sideOutputLateData(OutputTag)来标记迟到数据计算的结果,然后再使用getSideOutput(lateOutputTag)从窗口中获取lateOutputTag标签对应的数据,之后转成独立的DataStream数据集进行处理

1.2代码实例

如下:

import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Timecase class StockPrice(stockId:String,timeStamp:Long,price:Double)object AllowedLatenessTest {def main(args: Array[String]): Unit = {//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设定时间特性为事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设定程序并行度env.setParallelism(1)//创建数据源val source = env.socketTextStream("localhost", 9999)//指定针对数据流的转换操作逻辑val stockDataStream = source.map(s => s.split(",")).map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))//为数据流分配时间戳和水位线val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)//执行窗口计算val lateData = new OutputTag[StockPrice]("late")val sumStream = watermarkDataStream.keyBy("stockId").window(TumblingEventTimeWindows.of(Time.seconds(3))).allowedLateness(Time.seconds(2L))//注意这里.sideOutputLateData(lateData).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))//打印输出sumStream.print("window计算结果:")val late = sumStream.getSideOutput(lateData)late.print("迟到的数据:")//指定名称并触发流计算env.execute("AllowedLatenessTest")}//指定水位线生成策略class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {element.timeStamp //从到达消息中提取时间戳}}}override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={new WatermarkGenerator[StockPrice](){val maxOutOfOrderness = 10000L //设定最大延迟为10秒var currentMaxTimestamp: Long = 0Lvar a: Watermark = nullval format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)output.emitWatermark(a)println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)}
override def onPeriodicEmit(output:WatermarkOutput): Unit = {// 没有使用周期性发送水印,因此这里没有执行任何操作}}}}
}

先注意这里:

  val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)//执行窗口计算val lateData = new OutputTag[StockPrice]("late")val sumStream = watermarkDataStream.keyBy("stockId").window(TumblingEventTimeWindows.of(Time.seconds(3))).allowedLateness(Time.seconds(2L))   //注意这里.sideOutputLateData(lateData).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))

允许元素延迟到达最多 2 秒。即,如果一个元素的时间戳在窗口结束后的 2 秒内到达,它仍然会被包含在窗口的计算中。

 .allowedLateness(Time.seconds(2L))  设置时间为2s。

其余的就是生成水位线和时间戳了,这就不用解释啦。

1.2.1 代码运行结果

1.在nc端输入
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32
然后启动程序

在这里插入图片描述
分析:水位线生成策略:

每次收到一个新的事件,都会比较当前事件的时间戳和之前记录的最大时间戳,更新 currentMaxTimestamp。
水位线被设定为 currentMaxTimestamp - maxOutOfOrderness,即允许最大 10 秒的延迟

再来看迟到数据:

输入数据是:
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32

数据超出窗口结算数据后2s到达算迟到,任何时间戳比水位线晚 10 秒的事件都会被视为迟到。
之前我看书上写的有点疑问,然后拥有环境的网站也有点问题,我就自己本地了,windows 下载了个netcat,运行结果如下:
在这里插入图片描述

timestamp:stock_1,1602031567000|2020-10-07 08:46:07.000,1602031567000|2020-10-07 08:46:07.000,Watermark @ 1602031557000 (2020-10-07 08:45:57.000)
timestamp:stock_1,1602031571000|2020-10-07 08:46:11.000,1602031571000|2020-10-07 08:46:11.000,Watermark @ 1602031561000 (2020-10-07 08:46:01.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031577000|2020-10-07 08:46:17.000,Watermark @ 1602031567000 (2020-10-07 08:46:07.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031578000|2020-10-07 08:46:18.000,Watermark @ 1602031568000 (2020-10-07 08:46:08.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031581000|2020-10-07 08:46:21.000,Watermark @ 1602031571000 (2020-10-07 08:46:11.000)
timestamp:stock_1,1602031582000|2020-10-07 08:46:22.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031591000|2020-10-07 08:46:31.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031580000|2020-10-07 08:46:20.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window计算结果:> StockPrice(stock_1,1602031567000,8.14)
window计算结果:> StockPrice(stock_1,1602031571000,8.23)
window计算结果:> StockPrice(stock_1,1602031577000,16.48)
window计算结果:> StockPrice(stock_1,1602031578000,25.970000000000002)
window计算结果:> StockPrice(stock_1,1602031578000,34.42)
window计算结果:> StockPrice(stock_1,1602031578000,42.75)
window计算结果:> StockPrice(stock_1,1602031578000,51.31)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
迟到的数据:> StockPrice(stock_1,1602031577000,8.32)

这里是windows运行结果,可能稍微和ubuntu不一样

1.2.2 结果分析

  • stock_1,1602031567000,8.14
    水位线 557
    窗口时间**[567,570)**
    当前事件:stock_1,1602031567000,8.14
window计算结果:> StockPrice(stock_1,1602031567000,8.14)
  • stock_1,1602031571000,8.23
    水位线: 561
    窗口时间:[571,573)
    当前窗口事件 stock_1,1602031571000,8.23
window计算结果:> StockPrice(stock_1,1602031571000,8.23)
  • stock_1,1602031577000,8.24
    水位线:567
    窗口时间:[577,580)
    当前窗口事件:stock_1,1602031577000,8.24
    8.24+8.23
window计算结果:> StockPrice(stock_1,1602031577000,16.48)
  • stock_1,1602031578000,8.87
    水位线:568
    窗口时间:和上一个相同 [577,580)
    当前窗口事件:stock_1,1602031578000,8.87
    stock_1,1602031577000,8.24
    16.48+8.87
window计算结果:> StockPrice(stock_1,1602031578000,25.970000000000002)
  • stock_1,1602031579000,8.55
    水位线:569
    窗口时间:和上一个相同 [577,580)
    当前窗口事件(3个)
    stock_1,1602031578000,8.87
    stock_1,1602031577000,8.24

    stock_1,1602031579000,8.55
    25.97+8.55
window计算结果:> StockPrice(stock_1,1602031578000,34.42)
  • stock_1,1602031577000,8.24
    水位线:这里-10s为567,最大水位线为569,那么最大水位线569
    窗口时间: 属于[567,570) 没有迟到
    当前窗口事件:
    stock_1,1602031567000,8.14
    stock_1,1602031577000,8.24
    34.42+8.24
window计算结果:> StockPrice(stock_1,1602031578000,42.75)
  • stock_1,1602031581000,8.43
    水位线:571
    窗口时间:[581,584)
    当前窗口事件 stock_1,1602031581000,8.43
    注意 571水位线,第一个事件事件窗口时间[567,570),这个就应该结束计算了、
window计算结果:> StockPrice(stock_1,1602031578000,51.31)
之后就没了
为什么没了呢,窗口还没计算完呢,一直在监听啊。
水位线也不涨了
  • stock_1,1602031582000,8.78
    水位线:572
    窗口时间:[581,584)
    当前窗口事件:
    stock_1,1602031581000,8.43
    stock_1,1602031582000,8.78

  • stock_1,1602031581000,8.76
    水位线:572
    窗口时间:[581,584)
    当前窗口事件:
    stock_1,1602031581000,8.43
    stock_1,1602031582000,8.78

    stock_1,1602031581000,8.76

  • stock_1,1602031579000,8.55
    水位线:572
    窗口时间:[577,580)
    当前窗口事件:(4个)
    stock_1,1602031578000,8.87
    stock_1,1602031577000,8.24
    stock_1,1602031579000,8.55

    stock_1,1602031579000,8.55

  • stock_1,1602031591000,8.13
    水位线:581
    窗口时间:[591,594)
    当前窗口事件:
    stock_1,1602031591000,8.13

  • stock_1,1602031581000,8.34
    -水位线:581
    窗口时间:[581,584)
    (此时:[577,580)就应该结束了)
    窗口事件:
    stock_1,1602031581000,8.43
    stock_1,1602031582000,8.78
    stock_1,1602031581000,8.76

    stock_1,1602031581000,8.34

  • stock_1,1602031580000,8.45
    水位线:581
    窗口时间 [580,583)
    窗口事件:stock_1,1602031580000,8.45

  • stock_1,1602031579000,8.33
    水位线:581
    窗口时间:[577,580) 水位线在581的时候就结束了,但它在 allowedLateness 的 2 秒内到达,因此它 不会被视为迟到数据。

  • stock_1,1602031578000,8.56
    水位线:581
    窗口时间属于 [577,580)
    判断水位线和时间戳的关系:水位线超过窗口结束时间
    事件是否迟到:允许超出2s,正好就在580的边界,根据 Flink 的延迟数据规则,任何事件的时间戳如果大于当前窗口结束时间,并且超出了 allowedLateness(即 2 秒),就会被视为迟到数据。这个事件 stock_1,1602031578000,8.56 的时间戳刚好等于窗口 [577, 580) 的结束时间,并且它的时间戳在当前窗口结束后刚好到达,所以 它不会被视为迟到数据。

找了chatgpt问了一下:

这个事件不算迟到数据。尽管它的时间戳正好是窗口结束的边界,但它仍然属于这个窗口 [577, 580) 的一部分,不超出窗口时间范围,也没有超过 allowedLateness 设置的 2 秒,所以它被处理为正常数据。
为什么不算迟到?
Flink 的窗口操作是基于事件时间戳的,窗口 [577, 580) 的结束时间是 1602031578000。当水位线达到 581 时,窗口已经结束,但如果事件的时间戳恰好是窗口的结束时间,它依然被认为是属于该窗口的。
迟到数据的定义:迟到数据是指事件的时间戳超过了当前窗口结束时间,并且超出了允许的迟到时间。在这种情况下,事件时间戳与窗口结束时间对齐,因此并不算迟到。

  • stock_1,1602031577000,8.32
    水位线 581
    窗口时间:[577,580)
    577+2<581
    当前事件的时间戳 1602031577000 恰好等于窗口的起始时间 577,但水位线已经推进到了 581。
    窗口已经结束:水位线达到 581 时,窗口 [577, 580) 的数据已经完全处理完毕,因此任何时间戳在 580 之后到达的事件,都被视为迟到数据。

  • 说明:其中的删除线,我是手动分析的他们如何迟到,但是后来代码发现,数据是一个个来的,你没有超出水位线,那么我就立马把你这条数据计算了,因此输出结果就是累加起来的。

1.3代码运行展示

本来使用的是头歌平台提供的环境,但是环境有问题了,自己手动搭建了一下。幸亏是socket,windows还能实现监听自己,就把代码放在这里啦!
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>Flink_scala2.13</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.11.0</flink.version> <!-- 使用 Flink 1.15.0 --><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.16</scala.version></properties><dependencies><!-- 引入 Flink 相关依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!--			<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!--			<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><!--			<scope>provided</scope>--></dependency></dependencies>
</project>

test1.scala (AllowedLatenessTest函数)


/*
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32*/
import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
case class StockPrice(stockId:String,timeStamp:Long,price:Double)object test1 {def main(args: Array[String]): Unit = {// *************************** Begin ****************************//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设定时间特性为事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设定程序并行度env.setParallelism(1)//创建数据源val source = env.socketTextStream("localhost", 9999)//指定针对数据流的转换操作逻辑val stockDataStream = source.map(s => s.split(",")).map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))//为数据流分配时间戳和水位线val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)//执行窗口计算val lateData = new OutputTag[StockPrice]("late")val sumStream = watermarkDataStream.keyBy("stockId").window(TumblingEventTimeWindows.of(Time.seconds(3))).allowedLateness(Time.seconds(2L)).sideOutputLateData(lateData).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))// **************************** End *****************************//打印输出sumStream.print("window计算结果:")val late = sumStream.getSideOutput(lateData)late.print("迟到的数据:")//指定名称并触发流计算env.execute("AllowedLatenessTest")}//指定水位线生成策略class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {element.timeStamp //从到达消息中提取时间戳}}}override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={new WatermarkGenerator[StockPrice](){val maxOutOfOrderness = 10000L //设定最大延迟为10秒var currentMaxTimestamp: Long = 0Lvar a: Watermark = nullval format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)output.emitWatermark(a)println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)}override def onPeriodicEmit(output:WatermarkOutput): Unit = {// 没有使用周期性发送水印,因此这里没有执行任何操作}}}}
}

这个是头歌提供的pom.xml,二者应该都能使用,但是你需要下载flink和scala,这不再多说,版本还要匹配哦

<project><groupId>cn.edu.xmu.dblab</groupId><artifactId>wordcount_myID</artifactId><modelVersion>4.0.0</modelVersion><name>WordCount</name><packaging>jar</packaging><version>1.0</version><repositories><repository><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository></repositories><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.12.2</scala.version><scala.binary.version>2.12</scala.binary.version><hadoop.version>2.7.7</hadoop.version><flink.version>1.11.2</flink.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.22</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.8.0</version><scope>compile</scope></dependency></dependencies>
<build><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.0</version><executions><execution><goals><goal>compile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>

1.4 题外话

我使用的课本是厦门大学林子雨老师编写的《Flink编程基础》,当前我用的书是2021年9月第一版,第5.6章节的延迟数据处理中(201页),最后迟到的数据8.43写错了,应该是:8.32。正是书上写错了,我才有点疑问发现此处的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/63019.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024告别培训班 数通、安全、云计算、云服务、存储、软考等1000G资源分享

大类有&#xff1a;软考初级 软考中级 软考高级 华为认证 华三认证&#xff1a; 软考初级&#xff1a; 信息处理技术员 程序员 网络管理员 软考中级&#xff1a; 信息安全工程师 信息系统监理师 信息系统管理工程师 嵌入式系统设计时 数据库系统工程师 电子商务设…

《操作系统 - 清华大学》8 -1:进程的组成

文章目录 1. 进程的组成2. 进程与程序的联系3. 进程与程序的区别4. 进程与程序关系 1. 进程的组成 进程具体包含哪些东西&#xff1a; 首先要执行相应的代码&#xff0c;所以执行代码需要放到内存中代码执行需要处理数据&#xff0c;数据需要放到内存中需要知道现在要执行哪条…

【Java】String类API

创建字符串 字符串字面量"Hello"高效&#xff0c;常量池复用常见、简单的字符串创建 new 关键字new String("Hello")每次创建新对象&#xff0c;性能开销较高显式创建新对象 字符数组new String(char[])转换字符数组字符数组转字符串 StringBuilder/St…

数据结构初阶---二叉树---堆

一、树 1.树的概念 树是一种非线性的数据结构&#xff0c;由n(n≥0)个有限结点组成的一个有层次关系的集合。形状类似一棵倒挂的树&#xff0c;根朝上&#xff0c;分支向下。 根结点没有前驱结点&#xff0c;可以有n(n≥0)个后继结点。 其余结点被分为M个互不相交的集合&am…

C语言 字符串输入输出函数、scanf(“%[^\n]“,)可输入空格 、fgets删除换行符

字符串输入函数&#xff1a; scanf&#xff08;"%s"&#xff0c;数组名&#xff09; gets&#xff08;数组名&#xff09; fgets&#xff08;&#xff09; --- 文件流输入函数 函数原型&#xff1a; int scanf( const char *format, ...…

深度学习camp-第J4周:ResNet与DenseNet结合探索

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 本周任务&#xff1a; 探索ResNet和DenseNet的结合可能性本周任务较难&#xff0c;我们在chatGPT的帮助下完成 一、网络的构建 设计一种结合 ResNet 和 Den…

「iOS」通过CoreLocation Framework深入了解MVC架构

「iOS」通过CoreLocation Framework重新了解多界面传值以及MVC架构 文章目录 「iOS」通过CoreLocation Framework重新了解多界面传值以及MVC架构前言CoreLocation了解根据需求建模设计属性方法设计协议传值Block传值KVONotification通知方式 总结参考文章 前言 在这个学期的前…

ArrayList源码分析、扩容机制面试题,数组和List的相互转换,ArrayList与LinkedList的区别

目录 1.java集合框架体系 2. 前置知识-数组 2.1 数组 2.1.1 定义&#xff1a; 2.1.2 数组如何获取其他元素的地址值&#xff1f;&#xff08;寻址公式&#xff09; 2.1.3 为什么数组索引从0开始呢&#xff1f;从1开始不行吗&#xff1f; 3. ArrayList 3.1 ArrayList和和…

【C++】- 掌握STL List类:带你探索双向链表的魅力

文章目录 前言&#xff1a;一.list的介绍及使用1. list的介绍2. list的使用2.1 list的构造2.2 list iterator的使用2.3 list capacity2.4 list element access2.5 list modifiers2.6 list的迭代器失效 二.list的模拟实现1. list的节点2. list的成员变量3.list迭代器相关问题3.1…

Docker--Docker Container(容器) 之容器实战

对docker容器的前两篇文章 Docker–Docker Container(容器) 之 操作实例 Docker–Docker Container(容器&#xff09; Mysql容器化安装 我们可以先在Docker Hub上查看对应的Mysql镜像,拉取对应的镜像&#xff1a; 拉取mysql5.7版本的镜像&#xff1a; docker pull mysql:5.7…

【汇编语言】内中断(二) —— 安装自己的中断处理程序:你也能控制0号中断

文章目录 前言1. 编程处理0号中断1.1 效果演示1.2 分析所要编写的中断处理程序1.2.1 引发中断1.2.2 中断处理程序1.2.3 中断处理程序do0应该存放的位置1.2.4 中断向量表的修改1.2.5 总结 1.3 程序框架1.4 注意事项1.5 从CPU的角度看中断处理程序1.6 一些问题的思考与解答 2. 安…

VS2019中无法跳转定义_其中之一情况

我习惯了使用VS2019看stm的代码&#xff1b; 遇到的问题&#xff0c;在导入代码后&#xff0c;发现有些函数调用不能跳转到定义&#xff1b; 问题描述步骤 1、导入代码 2、跳转&#xff0c;无法跳转 1、中文路径 2、删除.vs文件 和网上查的都没办法解决 最后发现是VS不支持 …

让 Win10 上网本 Debug 模式 QUDPSocket 信号槽 收发不丢包的方法总结

在前两篇文章里&#xff0c;我们探讨了不少UDP丢包的解决方案。经过几年的摸索测试&#xff0c;其实方法非常简单, 无需修改代码。 1. Windows 下设置UDP缓存 这个方法可以一劳永逸解决UDP的收发丢包问题&#xff0c;只要添加注册表项目并重启即可。即使用Qt的信号与槽&#…

Elasticsearch:ES|QL 中的全文搜索 - 8.17

细心的开发者如果已经阅读我前两天发布的文章 “Elastic 8.17&#xff1a;Elasticsearch logsdb 索引模式、Elastic Rerank 等”&#xff0c;你就会发现在 8.17 的发布版中&#xff0c;有一个重要的功能发布。那就是 ES|QL 开始支持全文搜索了。在今天的文章中我们来尝试一下。…

SQL和Python 哪个更容易自学?

SQL和Python不是一个物种&#xff0c;Python肯定更难学习。如果你从事数据工作&#xff0c;我建议先学SQL、有余力再学Python。因为SQL不光容易学&#xff0c;而且前期的投入产出比更大。 SQL是数据查询语言&#xff0c;场景限于数据查询和数据库的管理&#xff0c;对大部分数据…

【unity】从零开始制作平台跳跃游戏--界面的认识,添加第一个角色!

在上一篇文章中&#xff0c;我们已经完成了unity的环境配置与安装⬇️ 【Unity】环境配置与安装-CSDN博客 接下来&#xff0c;让我们开始新建一个项目吧&#xff01; 新建项目 首先进入unityHub的项目页面&#xff0c;点击“新项目”&#xff1a; 我们这个系列将会以2D平台…

怎么禁用 vscode 中点击 go 包名时自动打开浏览器跳转到 pkg.go.dev

本文引用怎么禁用 vscode 中点击 go 包名时自动打开浏览器跳转到 pkg.go.dev 在 vscode 设置项中配置 gopls 的 ui.navigation.importShortcut 为 Definition 即可。 "gopls": {"ui.navigation.importShortcut": "Definition" }ui.navigation.i…

Unity3D实现抽象类的应用场景例子

系列文章目录 unity知识点 文章目录 系列文章目录👉前言👉一、示例👉二、使用步骤👉三、抽象类和接口的区别👉3-1、抽象类👉3-2、接口类👉壁纸分享👉总结👉前言 假设我们正在制作一个游戏,游戏中有多种不同类型的角色,这些角色都有一些共同的行为(比如移…

数据仓库工具箱—读书笔记01(数据仓库、商业智能及维度建模初步)

数据仓库、商业智能及维度建模初步 记录一下读《数据仓库工具箱》时的思考&#xff0c;摘录一些书中关于维度建模比较重要的思想与大家分享&#x1f923;&#x1f923;&#x1f923; 博主在这里先把这本书"变薄"~有时间的小伙伴可以亲自再读一读&#xff0c;感受一下…

docker启动一个helloworld(公司内网服务器)

这里写目录标题 容易遇到的问题&#xff1a;1、docker连接问题 我来介绍几种启动 Docker Hello World 的方法&#xff1a; 最简单的方式&#xff1a; docker run hello-world这会自动下载并运行官方的 hello-world 镜像。 使用 Nginx 作为 Hello World&#xff1a; docker…