【Flink专栏 03】深入理解Flink的Watermark:实时流处理的时间概念与乱序处理

文章目录

  • 01 基本概念
  • 02 工作原理
  • 03 优势与劣势
  • 04 核心组件
  • 05 Watermark 生成器 使用
  • 06 应用场景
  • 07 注意事项
  • 08 案例分析
    • 8.1 窗口统计数据不准
    • 8.2 水印是如何解决延迟与乱序问题?
    • 8.3 详细分析
  • 09 项目实战demo
    • 9.1 pom依赖
    • 9.2 log4j2.properties配置
    • 9.3 Watermark水印作业

01 基本概念

Watermark 是用于处理事件时间的一种机制,用于表示事件时间流的进展。在流处理中,由于事件到达的顺序和延迟,系统需要一种机制来衡量事件时间的进展,以便正确触发窗口操作等。Watermark 就是用来标记事件时间的进展情况的一种特殊数据元素。

02 工作原理

Watermark 的生成方式通常是由系统根据数据流中的事件来自动推断生成的。一般来说,系统会根据事件时间戳和一定的策略来生成 Watermark,以此来表示事件时间的进展。在 Flink 中,通常会有内置的 Watermark 生成器或者用户自定义的生成器来实现这个功能。

当一个 Watermark 被生成后,它会被发送到流处理的所有并行任务中。任务会根据接收到的 Watermark,将小于或等于 Watermark 的事件时间的数据触发相关操作(如窗口计算),以此来确保计算的正确性。

03 优势与劣势

优点:

  • Watermark 可以确保流处理系统正确处理事件时间,避免了由于乱序和延迟引起的计算错误。
  • 可以根据业务需求和数据特征灵活调整 Watermark 生成的策略,以适应不同的场景。
  • Watermark 的引入使得流处理系统更具健壮性,能够处理各种实时数据场景。

缺点:

  • Watermark 的生成可能会带来一定的开销,尤其是在数据量庞大、事件频繁的情况下,可能会对系统性能产生一定影响。
  • 对于某些特殊的场景,例如极端乱序或者延迟过大的情况,Watermark 可能无法完全解决事件时间处理的问题。

04 核心组件

  • Apache Flink中的水印(Watermark)是事件时间处理的核心组件之一,它用于解决无序事件流中的事件时间问题。水印是一种元数据,用于告知系统事件时间流的进度,从而使系统能够在处理延迟的数据时做出正确的决策。

    以下是Flink中水印的核心组件:

    1. Watermark生成器(Watermark Generator)
      • Watermark生成器负责生成水印,并将其插入到数据流中。
      • 水印生成的策略通常与数据源有关。例如,对于有序的数据源,可以根据数据的事件时间直接生成水印;对于无序数据源,则可能需要一些启发式方法来生成水印。
    2. AssignerWithPeriodicWatermarks
      • 这是一个Flink提供的接口,用于在数据流中分配水印。
      • 实现此接口的类需要实现两个方法:extractTimestamp()用于提取事件时间戳,getCurrentWatermark()用于生成当前水印。
    3. AssignerWithPunctuatedWatermarks
      • 与上述相似,但是这个接口适用于在特定条件下(例如特定的事件)生成水印的场景。
    4. Watermark延迟(Watermark Lag)
      • 衡量系统中水印到达事件流的延迟程度。通常,水印到达得越快,系统对事件时间处理的准确性就越高。
    5. Watermark策略(Watermarking Strategy)
      • 这是一个配置项,用于确定水印生成的策略。可以基于固定的时间间隔生成水印,也可以根据事件流的特性进行自适应调整。
    6. Watermark传递和处理
      • Flink通过数据流将水印传递给各个操作符(operators),从而确保水印在整个流处理拓扑中传递。
      • 在处理过程中,水印用于确定事件时间窗口(Event Time Windows)的关闭时机,以及触发一些基于事件时间的操作,如触发窗口计算等。
    7. 处理水印(Handling Watermarks)
      • 在窗口计算等操作中,Flink需要根据水印来判断是否可以触发计算操作,以此保证结果的正确性和完整性。

    水印的核心作用在于解决事件时间处理中的乱序问题,通过适当的水印策略和生成机制,可以有效地处理延迟数据和乱序数据,保证数据处理的准确性和时效性。

05 Watermark 生成器 使用

在 Apache Flink 中,提供了一些内置的 Watermark 生成器,这些生成器可以用于简化在流处理中的 Watermark 管理。以下是一些常用的内置 Watermark 生成器:

  1. BoundedOutOfOrdernessTimestampExtractor:

    • 描述: 这是 Flink 内置的基于有界乱序时间的 Watermark 生成器。

    • 用法: 用户可以通过指定最大允许的乱序时间来创建一个 BoundedOutOfOrdernessTimestampExtractor 实例。通常情况下,用户需要实现 extractTimestamp 方法,从事件中提取事件时间戳。

    • 示例:

      public class MyTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<MyEvent> {public MyTimestampExtractor(Time maxOutOfOrderness) {super(maxOutOfOrderness);}@Overridepublic long extractTimestamp(MyEvent event) {return event.getTimestamp();}
      }
      
  2. AscendingTimestampExtractor:

    • 描述: 这是一个简单的 Watermark 生成器,适用于按照事件时间戳升序排列的数据流。

    • 用法: 用户只需实现 extractAscendingTimestamp 方法,从事件中提取事件时间戳。

    • 示例:

      public class MyAscendingTimestampExtractor extends AscendingTimestampExtractor<MyEvent> {@Overridepublic long extractAscendingTimestamp(MyEvent event) {return event.getTimestamp();}
      }
      
  3. AssignerWithPunctuatedWatermarks:

    • 描述: 这是一种特殊类型的 Watermark 生成器,它可以基于某些事件的属性产生 Watermark。

    • 用法: 用户需要实现 checkAndGetNextWatermark 方法,根据事件的某些属性来判断是否生成 Watermark。

    • 示例:

      public class MyPunctuatedWatermarkAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getTimestamp();}@Overridepublic Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {// 根据 lastElement 的某些属性判断是否生成 Watermarkif (lastElement.getProperty() > threshold) {return new Watermark(extractedTimestamp);}return null; // 如果不生成 Watermark,则返回 null}
      }
      

这些内置的 Watermark 生成器提供了灵活性和方便性,使得在 Flink 中实现基于事件时间的处理变得更加容易。根据具体的业务需求和数据特征,可以选择合适的 Watermark 生成器来确保准确的事件时间处理。

06 应用场景

在Apache Flink 1.18中,水印(Watermark)是事件时间处理的核心组件,用于解决事件时间流处理中的乱序和延迟数据的问题。下面是一些Flink 1.18中集成Watermark水印的应用场景:

  1. 流式窗口操作
    • 在流式处理中,经常需要对事件进行窗口化操作,例如按时间窗口、会话窗口等进行聚合计算。Watermark的到达可以作为触发窗口计算的信号,确保窗口在事件时间上的正确性。这种情况下,Watermark能够确保窗口内的数据已经全部到达,可以进行聚合计算,同时还能够处理延迟的数据。
  2. 处理乱序数据
    • 在实际的数据流中,事件通常不会按照严格的时间顺序到达,可能存在乱序的情况。Watermark可以帮助系统理清事件的先后顺序,确保在事件时间上的正确性。通过适当设置Watermark的生成策略,可以根据数据特性来处理乱序数据,保证数据处理的正确性。
  3. 事件时间窗口计算
    • 在处理事件时间窗口时,Watermark起到了关键作用。它确定了窗口的关闭时机,即在Watermark达到窗口的结束时间时,系统可以安全地关闭该窗口,并对其中的数据进行计算。这确保了窗口计算的正确性,同时也能够处理延迟数据,使得窗口计算能够在数据到达时即时进行。
  4. 处理迟到的数据
    • Watermark还可以用于处理迟到的数据,即已经超过窗口关闭时限但仍然到达的数据。通过设置适当的延迟容忍阈值,可以容忍一定程度的迟到数据,并将其纳入窗口计算中。这样可以提高数据处理的完整性和准确性。
    • 实时数据监控和异常检测
    • 在实时数据流中,通常需要对数据进行实时监控和异常检测。Watermark可以用于确定事件时间的进度,从而实现实时监控和异常检测。例如,可以基于事件时间窗口对数据进行统计分析,发现突发的异常情况,并及时采取相应的措施。

总的来说,Flink 1.18中集成Watermark水印的应用场景涵盖了广泛的实时数据处理领域,包括流式窗口操作、处理乱序数据、事件时间窗口计算、处理迟到的数据以及实时数据监控和异常检测等方面。Watermark作为事件时间处理的核心组件,为Flink提供了处理实时数据流的强大功能,能够确保数据处理的准确性和时效性。

07 注意事项

Apache Flink 中水印(Watermark)的使用是关键的,特别是在处理事件时间(Event Time)数据时。水印是一种机制,用于处理无序事件流,并确保在执行窗口操作时数据的完整性和正确性。以下是在使用 Flink 1.18 中水印的一些注意事项:

  1. 水印生成器(Watermark Generators)的选择
    • Flink 提供了多种内置的水印生成器,如 BoundedOutOfOrdernessTimestampExtractor 和 AscendingTimestampExtractor。
    • BoundedOutOfOrdernessTimestampExtractor 适用于处理带有乱序的数据流,它会为每个事件引入一定的延迟。
    • AscendingTimestampExtractor 适用于处理按事件顺序到达的数据流,它假定数据已经按照事件时间排序。
  2. 水印延迟(Watermark Lag)的设置
    • 设置水印延迟是非常重要的,它决定了 Flink 在处理数据时能够容忍的事件延迟时间。
    • 如果设置的水印延迟过小,可能会导致窗口操作不正确,因为 Flink 认为某些事件已经到达,但实际上它们还没有到达。
    • 如果设置的水印延迟过大,可能会导致窗口操作的延迟增加,因为 Flink 需要等待更长时间以确保数据的完整性。
  3. 数据源的处理
    • 在读取数据源时,确保正确地分配时间戳并生成水印。这通常需要在数据源的读取逻辑中明确指定时间戳和水印生成的逻辑。
  4. 水印与窗口操作的关系
    • 在执行窗口操作(如窗口聚合、窗口计算等)时,水印的生成和处理是至关重要的。
    • 水印确保在触发窗口计算时,Flink 已经收到了窗口结束时间之前的所有数据,从而确保计算结果的准确性。
  5. 定期检查水印生成是否正常
    • 在部署 Flink 作业时,建议定期检查水印的生成情况。可以通过 Flink 的监控界面或日志来查看水印的生成情况,并根据需要调整水印生成的逻辑和设置。
  6. 监控和调试
    • 在使用水印时,需要重点关注作业的监控和调试,以确保水印的生成和处理是符合预期的。
    • 如果发现数据延迟或窗口计算不正确,可以通过监控数据流和日志来定位和解决问题,可能需要调整水印的生成逻辑或调整水印延迟来改善作业的性能和准确性。
  7. 数据倾斜和性能优化
    • 在使用水印时,需要注意数据倾斜可能会影响水印的生成和处理性能。可以通过合理的数据分片和并行处理来减轻数据倾斜带来的影响,从而提高作业的性能和稳定性。

总的来说,水印在 Flink 中的使用是非常重要的,它能够确保在处理事件时间数据时保持数据的完整性和正确性。因此,在设计和部署 Flink 作业时,需要特别注意水印的生成和处理,以确保作业能够正确运行并获得良好的性能表现。

08 案例分析

8.1 窗口统计数据不准

当涉及到事件时间处理时,延迟和乱序是非常常见的情况。下面是一个简单的案例,演示了在事件时间处理中可能遇到的延迟和乱序问题。

假设我们有一个用于监控网站用户访问的实时数据流。每个事件都包含用户ID、访问时间戳和访问的网页URL。我们想要计算每个用户在每小时内访问的不同网页数量。

考虑到网络传输和数据处理可能会引入延迟和乱序,我们的数据流可能如下所示:

Event 1: {UserID: 1, Timestamp: 12:00:05, URL: "example.com/page1"}
Event 2: {UserID: 2, Timestamp: 12:00:10, URL: "example.com/page2"}
Event 3: {UserID: 1, Timestamp: 12:00:15, URL: "example.com/page2"}
Event 4: {UserID: 1, Timestamp: 11:59:58, URL: "example.com/page3"}   <-- 延迟
Event 5: {UserID: 2, Timestamp: 12:00:02, URL: "example.com/page4"}   <-- 乱序

在这个示例中,Event 4由于延迟而晚于其他事件到达,而Event 5由于乱序而在其本应到达的时间之前到达。

如果没有使用水印机制,Flink 可能会错误地将 Event 4 的数据统计到 12:00:00 ~ 12:01:00 的窗口中,这是因为 Flink 默认情况下是根据接收到事件的时间来进行处理的,而不是根据事件实际发生的事件时间。

8.2 水印是如何解决延迟与乱序问题?

在上述案例中,Flink 的水印(Watermark)机制通过指示事件时间的上限,帮助系统确定事件时间窗口的边界。水印本质上是一种元数据,它告知 Flink 在某个时间点之前的数据已经全部到达。

下面简要说明水印如何在案例中发挥作用:

  1. 处理延迟数据
    • 当 Event 4 发生延迟到达时,水印会逐渐推进,最终达到 Event 4 的事件时间戳(11:59:58)。
    • Flink 知道在水印之前的所有数据都已经到达,因此即使 Event 4 晚到,也不会影响窗口的触发。
  2. 处理乱序数据
    • 当 Event 5 由于乱序提前到达时,水印仍然在逐渐推进。
    • Flink 通过水印判断,在当前水印之前的所有数据都已到达,因此可以触发相应的窗口计算。
  3. 窗口触发
    • Flink 会根据水印确定触发窗口的时机。当水印到达某个时间戳时,Flink 知道在该水印之前的数据已经全部到达,可以安全地触发窗口计算。
    • 比如,在水印到达 12:00:05 时,Flink 可以触发 12:00:00 - 12:01:00 的窗口计算,处理这一时段内的数据。

综合来说,水印帮助 Flink 在事件时间处理中正确处理延迟和乱序的数据,确保窗口操作的准确性和完整性。通过逐渐推进水印,系统能够在事件时间轴上有序地进行处理,而不会受到延迟和乱序数据的影响。

8.3 详细分析

假设我们有以下十条乱序的事件数据,每条数据包含事件时间戳和相应的值:

事件时间戳(毫秒)  值
1000               10
2000               15
3000               12
1500               8
2500               18
1200               6
1800               14
4000               20
3500               16
3200               9

我们将使用Watermark来处理这些数据,并进行窗口统计。假设窗口大小为2秒,最大乱序时间为1秒。

使用Watermark前的统计

  1. 当接收到事件时间戳为1000毫秒时,将值10加入窗口。
  2. 当接收到事件时间戳为2000毫秒时,将值15加入窗口。
  3. 当接收到事件时间戳为3000毫秒时,将值12加入窗口。
  4. 当接收到事件时间戳为1500毫秒时,将值8加入窗口。
  5. 当接收到事件时间戳为2500毫秒时,将值18加入窗口。
  6. 当接收到事件时间戳为1200毫秒时,将值6加入窗口。
  7. 当接收到事件时间戳为1800毫秒时,将值14加入窗口。
  8. 当接收到事件时间戳为4000毫秒时,将值20加入窗口。
  9. 当接收到事件时间戳为3500毫秒时,将值16加入窗口。
  10. 当接收到事件时间戳为3200毫秒时,将值9加入窗口。

使用Watermark后的统计

Watermark的计算过程如下: Watermark = max(当前Watermark, 当前事件时间 - 最大乱序时间)

在这个例子中,我们设定最大乱序时间为1秒,即1000毫秒。

  1. 当收到事件时间戳为1000毫秒时,Watermark = max(0, 1000 - 1000) = 0毫秒。
  2. 当收到事件时间戳为2000毫秒时,Watermark = max(0, 2000 - 1000) = 1000毫秒。
  3. 当收到事件时间戳为3000毫秒时,Watermark = max(1000, 3000 - 1000) = 2000毫秒。
  4. 当收到事件时间戳为1500毫秒时,Watermark = max(2000, 1500 - 1000) = 2000毫秒。
  5. 当收到事件时间戳为2500毫秒时,Watermark = max(2000, 2500 - 1000) = 2000毫秒。
  6. 当收到事件时间戳为1200毫秒时,Watermark = max(2000, 1200 - 1000) = 2000毫秒。
  7. 当收到事件时间戳为1800毫秒时,Watermark = max(2000, 1800 - 1000) = 2000毫秒。
  8. 当收到事件时间戳为4000毫秒时,Watermark = max(2000, 4000 - 1000) = 3000毫秒。
  9. 当收到事件时间戳为3500毫秒时,Watermark = max(3000, 3500 - 1000) = 3000毫秒。
  10. 当收到事件时间戳为3200毫秒时,Watermark = max(3000, 3200 - 1000) = 3000毫秒。

Watermark确定了什么时候触发窗口统计。在本例中,当Watermark超过窗口的结束时间时,窗口将被关闭,并进行统计。因此,Watermark确保了即使在乱序数据的情况下,窗口统计也能够按照正确的事件时间顺序进行。

为了更清晰地展示Watermark的影响,以下是每个事件被处理时的Watermark状态和窗口统计的结果:

事件时间戳(毫秒)  值   Watermark    窗口统计结果
1000               10   0            10
2000               15   1000         25
3000               12   2000         27
1500               8    2000         27
2500               18   2000         30
1200               6    2000         30
1800               14   2000         32
4000               20   3000         36
3500               16   3000         36
3200               9    3000         36

这里的窗口统计结果是在Watermark触发时计算的。在Watermark超过窗口结束时间时,窗口会被关闭,并进行统计。

09 项目实战demo

9.1 pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xsy</groupId><artifactId>aurora_flink_connector_file</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>11</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding></properties><!--通用依赖--><dependencies><!--集成日志框架 start--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.17.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.17.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.1</version></dependency><!--集成日志框架 end--><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><!-- flink读取Text File文件依赖 start--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>1.18.0</version></dependency><!-- flink读取Text File文件依赖 end--><!-- flink基础依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.18.0</version></dependency><!-- flink基础依赖 end --></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.aurora.KafkaStreamingJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build>
</project>

9.2 log4j2.properties配置

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

9.3 Watermark水印作业

package com.aurora.demo;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Random;/*** 描述:Flink集成Watermark水印** @author 浅夏的猫* @version 1.0.0* @date 2024-02-08 10:31:40*/
public class WatermarkStreamingJob {private static final Logger logger = LoggerFactory.getLogger(WatermarkStreamingJob.class);public static void main(String[] args) throws Exception {// 创建 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 自定义数据源,每隔1000ms下发一条数据SourceFunction<JSONObject> dataSource = new SourceFunction<>() {private volatile boolean running = true;@Overridepublic void run(SourceContext<JSONObject> sourceContext) throws Exception {while (running) {long timestamp = System.currentTimeMillis();timestamp = timestamp - new Random().nextInt(11) + 10;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");// 格式化日期时间对象为指定格式的字符串String format = formatter.format(dateTime);JSONObject dataObj = new JSONObject();int transId = 8;dataObj.put("userId", "user_" + transId);dataObj.put("timestamp", timestamp);dataObj.put("datetime", format);dataObj.put("url", "example.com/page" + transId);logger.info("数据源url={},用户={},交易时间={},系统时间={}", "example.com/page" + transId, "user_" + transId, format);Thread.sleep(1000);sourceContext.collect(dataObj);}}@Overridepublic void cancel() {running = false;}};//创建水印策略处理事件发生时间TimestampAssignerSupplier<JSONObject> timestampAssignerSupplier = new TimestampAssignerSupplier<JSONObject>() {@Overridepublic TimestampAssigner<JSONObject> createTimestampAssigner(Context context) {return new TimestampAssigner<JSONObject>() {@Overridepublic long extractTimestamp(JSONObject element, long recordTimestamp) {//使用自定义的事件发生时间来做水印,确保窗口统计的是按照我们的时间字段统计,提高准确度,否则默认使用消费时间return element.getLong("timestamp");}};}};//创建数据流env.addSource(dataSource).assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(timestampAssignerSupplier))//按照url分组.keyBy(new KeySelector<JSONObject, Object>() {@Overridepublic Object getKey(JSONObject jsonObject) throws Exception {return jsonObject.getString("url");}}).window(TumblingEventTimeWindows.of(Time.seconds(2))).reduce(new ReduceFunction<JSONObject>() {@Overridepublic JSONObject reduce(JSONObject reduceResult, JSONObject record) throws Exception {logger.info("窗口统计url={},用户流水={},次数={}", reduceResult.getString("url"), reduceResult.getString("userId"), reduceResult.getInteger("urlNum") == null ? 1 : reduceResult.getInteger("urlNum"));int urlNum = reduceResult.getInteger("urlNum") == null ? 1 : reduceResult.getInteger("urlNum");reduceResult.put("urlNum", urlNum + 1);return reduceResult;}}).print();// 执行任务env.execute("WatermarkStreamingJob");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/684413.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【研究生复试】计算机软件工程人工智能研究生复试——资料整理(速记版)——自我介绍(英文)

1、JAVA 2、计算机网络 3、计算机体系结构 4、数据库 5、计算机租场原理 6、软件工程 7、大数据 8、英文 自我介绍 自我介绍 英文 自我介绍 英文 第一段&#xff1a; Good afternoon, dear professors, thank you for the chance to introduce myself. My name is Yan Zhen …

MTR++论文阅读

https://zhuanlan.zhihu.com/p/654070149 文章亮点&#xff1a; Dense Future Prediction for All Agent&#xff1a;将预测的结果也encode起来&#xff0c;用于平衡障碍物之间的预测结果。不过在infer的时候这一部分不会进行用数据集聚类获得query轨迹点&#xff08;goal 点&…

ChatGPT高效提问—prompt实践(白领助手)

ChatGPT高效提问—prompt实践&#xff08;白领助手&#xff09; ​ 随着社会的不断发展&#xff0c;白领的比例越来越高。白领的工作通常较为繁忙&#xff0c;需要管理复杂的项目。工作量大、要求高、任务紧急&#xff0c;时间分配不当部分可能导致工作效率低下&#xff0c;任…

力扣_字符串9—单词接龙I、II

题目 按字典 w o r d L i s t wordList wordList 完成从单词 b e g i n W o r d beginWord beginWord 到单词 e n d W o r d endWord endWord 转化&#xff0c;一个表示此过程的 转换序列 是形式上像 b e g i n W o r d − > s 1 − > s 2 − > . . . − > s …

java 泛型----T、?的使用

java中T表示泛型。&#xff1f;表示不确定的类型。 jdk使用K表示键&#xff0c;V表示值&#xff0c;T表示type类型,E表示enum枚举。这四个符号只是表示泛型名称&#xff0c;可以换成其他字母&#xff0c;但是需要在之前声明。 参考文章&#xff1a;java泛型&#xff1a;T与?的…

问题:人的安全知识和技能是天生的。() #媒体#知识分享#学习方法

问题&#xff1a;人的安全知识和技能是天生的。&#xff08;) 人的安全知识和技能是天生的。() 参考答案如图所示 问题&#xff1a;&#xff08;&#xff09;是党和国家的根本所在、命脉所在&#xff0c;是全国各族人民的利益所在、幸福所在。 A.人民当家作主 B.坚持和完善…

QT+OSG/osgEarth编译之八十七:osgdb_p3d+Qt编译(一套代码、一套框架,跨平台编译,版本:OSG-3.6.5插件库osgdb_p3d)

文章目录 一、osgdb_p3d介绍二、文件分析三、pro文件四、编译实践一、osgdb_p3d介绍 P3DXML是Panda3D引擎中使用的一种文件格式,用于描述3D场景的层次结构和属性。它是一种基于XML(eXtensible Markup Language)的文本格式,可以被Panda3D引擎读取和解析。 P3DXML文件包含了…

OpenAI突然发布首款文生视频模型——Sora;谷歌发布Gemini 1.5,迈向多模态大模型新时代

&#x1f989; AI新闻 &#x1f680; OpenAI突然发布首款文生视频模型——Sora 摘要&#xff1a;OpenAI发布了首个AI视频模型Sora&#xff0c;可以根据文字指令生成神级效果的长视频&#xff0c;引发了广泛关注和震惊。 Sora模型通过深入理解语言和图像&#xff0c;能够创造出…

代码随想录算法训练营第二十七天|贪心算法理论基础,455.分发饼干,376. 摆动序列,53. 最大子序和

系列文章目录 代码随想录算法训练营第一天|数组理论基础&#xff0c;704. 二分查找&#xff0c;27. 移除元素 代码随想录算法训练营第二天|977.有序数组的平方 &#xff0c;209.长度最小的子数组 &#xff0c;59.螺旋矩阵II 代码随想录算法训练营第三天|链表理论基础&#xff…

中国电子学会2023年12月份青少年软件编程Scratch图形化等级考试试卷三级真题(含答案)

2023-12 Scratch三级真题 分数&#xff1a;100 题数&#xff1a;31 测试时长&#xff1a;60min 一、单选题(共18题&#xff0c;共50分) 1.运行左图程序&#xff0c;想得到右图中的效果&#xff0c;红色框应填写的数值是&#xff1f;&#xff08;D&#xff09;(3分) A.12 …

《合成孔径雷达成像算法与实现》Figure6.18

% rho_r c/(2*Fr)而不是rho_r c/(2*Bw) % Hsrcf exp函数里忘记乘pi了 clc clear close all参数设置 距离向参数设置 R_eta_c 20e3; % 景中心斜距 Tr 2.5e-6; % 发射脉冲时宽 Kr 20e12; % 距离向调频率 alpha_os_r 1.2; …

【题解】—— LeetCode一周小结6

【题解】—— 每日一道题目栏 上接&#xff1a;【题解】—— LeetCode一周小结5 5.跳跃游戏 VI 题目链接&#xff1a;1696. 跳跃游戏 VI 给你一个下标从 0 开始的整数数组 nums 和一个整数 k 。 一开始你在下标 0 处。每一步&#xff0c;你最多可以往前跳 k 步&#xff0c;…

蓝桥杯电子类单片机提升三——NE555

目录 单片机资源数据包_2023 一、NE555和定时器工作模式 1.NE555的介绍 2.定时器的计数模式 二、NE555频率读取代码的实现 1.定时器0初始化 2.通过读取TH0和TL0来读取频率 3.通过中断读取频率 三、完整代码演示 通过读取TH0和TL0来读取频率 main.c 通过中断读取频…

26.执行上下文/作用域链/闭包

1. 对闭包的理解 闭包是指有权访问另一个函数作用域中变量的函数&#xff0c;创建闭包的最常见的方式就是在一个函数内创建另一个函数&#xff0c;创建的函数可以访问到当前函数的局部变量。 闭包有两个常用的用途&#xff1b; 闭包的第一个用途是使我们在函数外部能够访问到…

云数据中心网络架构与技术

第2章 认识云数据中心网络 云数据中心是一种基于云计算架构的新型数据中心&#xff0c;其计算、存储及网络资源松耦合&#xff0c;各种IT设备完全虚拟化&#xff0c;模块化程度、自动化程度、绿色节能程度均较高。云数据中心网络的特点&#xff0c;首先是高度的虚拟化&#x…

qml中解决Page控件头部元素Margin不生效的问题

0、想要的效果 1、问题描述 经测试&#xff1a;Page的头部无法完美的进行左右边距设置&#xff0c;leftMargin可以&#xff0c;rightMargin不可以。。。。 Page {// ...header: Frame {id: headerheight: 70// 必须首先锚定位&#xff0c;然后设置边距才生效padding: 0anchor…

QlikSense: 通过 Insight Advisor 创建可视化

通过 Insight Advisor 创建可视化 探索你的数据&#xff0c;并通过 Insight Advisor 分析类型 和 Insight Advisor 搜索创建可视化。Insight Advisor 使用 Qlik cognitive engine 和应用程序的逻辑模型为您创建可视化。单击工作表中的 Insight Advisor 以使用 Insight Advisor…

LeetCode 31天

455. 分发饼干 class Solution { public:int findContentChildren(vector<int>& g, vector<int>& s) {// 先排序sort(g.begin(), g.end());sort(s.begin(), s.end());int i 0;int j 0;while (i < g.size() && j < s.size()) {if (s[j] &g…

构建智慧交通平台:架构设计与实现

随着城市交通的不断发展和智能化技术的迅速进步&#xff0c;智慧交通平台作为提升城市交通管理效率和水平的重要手段备受关注。本文将探讨如何设计和实现智慧交通平台的系统架构&#xff0c;以应对日益增长的城市交通需求&#xff0c;并提高交通管理的智能化水平。 ### 1. 智慧…

【电路笔记】-LR串联电路

LR串联电路 文章目录 LR串联电路1、概述2、示例1所有线圈、电感器、扼流圈和变压器都会在其周围产生磁场,由电感与电阻串联组成,形成 LR 串联电路。 1、概述 在本节有关电感器的第一个文章中,我们简要介绍了电感器的时间常数,指出流过电感器的电流不会瞬时变化,而是会以恒…