文章目录
- 事件时间支持
- Flink状态编程
- 一、状态的类型
- 1. 托管状态(Managed State)
- 2. 原始状态(Raw State)
- 二、状态的管理和容错
- Flink端到端的一致性
- 1、检查点机制
- 2、幂等
- 3、事务
- 水位线
- 窗口操作
- 1、窗口类型
- 2、窗口操作的时间语义
- 侧出流
- 容错机制
- 双流Join
- Regular Joins
- Interval Joins
- Temporal Joins
- Lookup Join
- 数据处理流程解析
- 一、ODS 数据采集
- 二、DIM 维度层处理
- 三、DWD 事实表准备
- 四、DWS 汇总表的抽取以及轻度聚合
- 五、ADS 分析和可视化操作
标签:状态编程、端到端的一致性、日志分流、测出流、精准一次性、事务、隔离级别、容错、窗口、水位线、双流Join 、分布式流处理、事件事件支持、支持批处理和流处理统一编程模型
事件时间支持
说明:事件时间是指事件在其产生的源头所发生的时间,这个时间通常会被编码到事件数据中。在流计算中,使用事件时间语义时,计算引擎会根据事件的时间戳来对事件进行排序和处理,而不是按照事件进入计算系统的时间(处理时间)或进入数据源的时间(摄入时间)。
应用场景:在金融交易系统中,处理交易记录时,即使某些交易记录由于网络问题晚到,Flink 也能根据交易发生的实际时间进行统计和分析,保证数据的准确性。
Flink状态编程
在流处理中,状态是指在处理过程中需要记住的信息。由于流数据是持续不断的,很多时候我们需要根据历史数据来处理当前数据,这就需要使用状态来存储历史信息。例如,在计算一段时间内的用户点击量时,需要记录每个用户的点击次数,这个点击次数就是一个状态
一、状态的类型
1. 托管状态(Managed State)
由 Flink 框架管理的状态,Flink 会自动进行状态的存储、恢复和内存管理等操作。托管状态又分为两种:
-
键控状态(Keyed State)
与特定的键(
key
)关联,只能在KeyedStream
上使用。
Flink 会根据键将数据分发到不同的并行实例上,并且每个键的状态是唯一的。这意味着在不同的并行实例中,相同键的状态是共享的,并且可以被正确地更新和访问。
例如,对于不同用户的最后访问日期,Flink 会根据用户 ID 这个键来管理每个用户的状态,确保每个用户的状态是一致的。键控状态可以细分为以下几种类型:
- ValueState:存储单个值。例如,记录每个用户的最新登录时间。
- ListState:存储一个列表。比如,存储每个用户最近的 10 次操作记录。
- ReducingState:存储经过归约操作后的值。例如,计算每个用户的累计消费金额。
- AggregatingState:与
ReducingState
类似,但支持更复杂的聚合操作。 - MapState:存储键值对的映射。例如,存储每个用户的属性信息。
-
算子状态(Operator State):与算子的实例关联,而不是与键关联。算子状态在所有并行实例之间共享,常用于 Source 或 Sink 等算子。例如,在 Kafka 连接器中,使用算子状态来记录每个分区的消费偏移量。
2. 原始状态(Raw State)
由用户自行管理的状态,Flink 只提供存储空间,不负责状态的序列化和反序列化等操作。原始状态通常用于实现自定义的状态管理逻辑,但使用起来相对复杂,一般情况下推荐使用托管状态。
二、状态的管理和容错
Flink 提供了强大的状态管理和容错机制,主要通过以下方式实现:
- 检查点(Checkpointing):定期将状态的快照保存到持久化存储中,当发生故障时,可以从最近的检查点恢复状态。
- 保存点(Savepoints):手动触发的状态快照,用于手动备份和恢复状态,通常用于升级或维护 Flink 应用程序。
- 状态后端(State Backends):负责管理状态的存储和访问,Flink 提供了多种状态后端,如 MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend 等,可以根据不同的需求选择合适的状态后端。
Flink端到端的一致性
端到端的一致性意味着在整个流处理应用程序中,从数据的产生源头(如消息队列)开始,经过 Flink 的处理,到最终将结果写入外部存储系统(如数据库、文件系统),数据的处理结果就像在没有任何故障发生的情况下一样准确。也就是说,无论在处理过程中遇到什么故障(如节点崩溃、网络中断等),系统都能保证最终结果的一致性。
最多一次(At-Most-Once)
- 特点:数据可能会丢失,但不会重复处理。也就是说,每个数据记录最多只会被处理一次。这种级别通常适用于对数据丢失不太敏感的场景,如实时监控数据。
- 实现方式:在这种模式下,Flink 不会进行检查点操作,也不会保证数据的可靠传输和处理。
至少一次(At-Least-Once)
- 特点:数据不会丢失,但可能会重复处理。也就是说,每个数据记录至少会被处理一次。这种级别是 Flink 的默认一致性级别,适用于大多数对数据丢失敏感,但对数据重复处理不太敏感的场景,如日志分析。
- 实现方式:通过启用检查点机制,Flink 可以在发生故障时从最近的检查点恢复状态,继续处理数据,从而保证数据不会丢失。但由于可能会重复处理部分数据,因此会出现数据重复的情况。
精确一次(Exactly-Once)
- 特点:数据只会被处理一次,既不会丢失也不会重复处理。这种级别是最严格的一致性级别,适用于对数据准确性要求极高的场景,如金融交易处理。
- 实现方式:要实现精确一次的一致性,需要同时使用检查点机制和两阶段提交协议。Flink 会在检查点的基础上,通过两阶段提交协议确保数据在写入外部系统时的一致性。
Source端:起码提供可重置偏移量(数据可重放的能力)
Tranform端:检查点机制
Sink端:幂等,事务两阶段提交协议(2PC)
1、检查点机制
Flink 检查点机制的核心是定期对分布式流处理程序的状态进行快照,并将这些快照保存到持久化存储中。当系统出现故障(如节点崩溃、网络中断等)时,Flink 可以利用最近一次成功的检查点将程序的状态恢复到故障发生前的状态,然后从该状态继续处理数据,从而保证数据处理的一致性和容错性。
配置检查点参数
- 检查点间隔:通过
enableCheckpointing(long interval)
方法设置检查点的时间间隔,即每隔多长时间触发一次检查点。 - 检查点模式:通过
setCheckpointingMode(CheckpointingMode mode)
方法设置检查点的模式,支持EXACTLY_ONCE
(精确一次)和AT_LEAST_ONCE
(至少一次)两种模式。 - 检查点超时时间:通过
setCheckpointTimeout(long timeout)
方法设置检查点的超时时间,如果在规定时间内检查点没有完成,则会被视为失败。 - 最大并发检查点数:通过
setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints)
方法设置最大并发检查点数,即允许同时进行的检查点数量。
2、幂等
-
定义:幂等是指一个操作执行多次所产生的结果与执行一次的结果相同。也就是说,无论该操作被重复执行多少次,对系统的影响都只相当于执行了一次。在 Flink 中,可以通过在数据目的地实现幂等写入逻辑,确保即使在故障恢复后重复写入相同的数据,也不会对最终结果产生影响。
-
不同场景下实现幂等的常见方法
数据库操作:唯一索引、乐观锁
接口服务:Token 机制、状态机
3、事务
-
定义:事务是由一系列数据库操作组成的逻辑单元,这些操作要么全部成功执行,要么全部不执行,以保证数据库的一致性。
-
特性(ACID)
原子性(Atomicity):事务中的所有操作要么全部完成,要么全部不完成,不会存在部分操作成功、部分操作失败的情况。
一致性(Consistency):事务执行前后,数据库的完整性约束不会被破坏,数据从一个一致状态转换到另一个一致状态。
隔离性(Isolation):多个事务并发执行时,每个事务都感觉不到其他事务的存在,就好像它们是在独立执行一样。
持久性(Durability):一旦事务提交成功,它对数据库所做的修改就会永久保存下来,即使系统出现故障也不会丢失。
水位线
- 事件时间:指事件实际发生的时间,是事件自带的时间属性。在流处理场景中,事件产生的时间往往早于其被处理的时间,并且可能由于网络延迟等原因导致事件到达顺序与实际发生顺序不一致,即乱序到达。
- 逻辑时钟:是一种在分布式系统中用于确定事件顺序的机制。在流处理框架(如 Flink)中,逻辑时钟与事件时间紧密相关,它帮助系统理解事件的先后顺序。水位线可以看作是逻辑时钟的一种体现形式,它基于事件时间来推进,用于标记当前系统认为已经接收到了某个时间点之前的所有数据。
- 水位线:是事件时间语义中的关键概念。它是一个时间戳,用于表示到某个时间点为止,流中所有小于该时间戳的事件都已经到达。当水位线超过窗口的结束时间时,就可以认为窗口已经收集到了所有应该属于该窗口的数据,从而触发窗口计算。水位线的推进是依据事件时间和乱序情况来确定的,它使得流处理系统在处理乱序数据时仍能保证计算结果的准确性。
水位线的两种方式
单调递增
- 定义:单调递增的水位线是有界乱序的一种特殊情况,其乱序程度为 0。在这种情况下,事件严格按照时间戳顺序到达,不存在乱序的情况。单调递增是:有界乱序的一种特殊情况(乱序程度0)
- 适用场景:适用于数据按照时间顺序有序到达的场景,例如某些实时监控系统,数据从传感器按时间顺序依次发送,不会出现乱序的情况。
有界乱序
- 定义:有界乱序表示事件的乱序程度是有限的,即事件的时间戳虽然可能乱序,但最大的乱序时间是可确定的。在这种情况下,需要根据允许的最大乱序时间来生成水位线。
- 适用场景:适用于大多数实际的流处理场景,因为在网络传输等情况下,数据乱序是常见的,但乱序程度通常是有限的。例如在电商订单处理系统中,订单事件可能由于网络原因出现一定程度的乱序,但不会无限期地延迟到达。
窗口操作
窗口操作是 Apache Flink 流处理框架中非常重要的特性,它允许用户对无界的数据流进行有限的分组和聚合操作。由于流数据是持续不断且无界的,很难对整个数据流进行处理,窗口操作可以将数据流划分为有限大小的 “桶”,在每个桶上进行各种计算
- 窗口(Window):是一种对无限流进行有限分组的抽象概念,将流数据按照一定的规则划分成多个有限大小的集合,每个集合就是一个窗口。
- 窗口分配器(Window Assigner):负责将流中的元素分配到一个或多个窗口中。常见的窗口分配器有时间窗口分配器和计数窗口分配器等。
- 窗口函数(Window Function):在窗口分配好元素后,窗口函数用于对每个窗口中的元素进行计算,例如求和、计数、求平均值等。
1、窗口类型
- 滚动窗口(TUMBLE) #
- 滑动窗口(HOP) #
- 累积窗口(CUMULATE) #
- 会话窗口(SESSION) #
2、窗口操作的时间语义
- 处理时间(Processing Time):基于系统时钟来定义窗口,即元素进入 Flink 系统的时间。处理时间不考虑事件的实际发生时间,实现简单,但在处理乱序事件时可能会导致结果不准确。
- 事件时间(Event Time):基于事件本身携带的时间戳来定义窗口,能够处理乱序事件,保证结果的准确性。在使用事件时间时,需要为流中的元素分配时间戳,并使用水位线(Watermark)来处理乱序事件。
- 摄入时间(Ingestion Time):是事件进入 Flink 数据源的时间,介于处理时间和事件时间之间。摄入时间由系统自动分配时间戳,不需要用户手动指定,但处理乱序事件的能力不如事件时间。
侧出流
侧输出流允许你在处理流数据时,将一部分特定的数据从主流中分离出来,发送到一个或多个独立的输出流中。这些侧输出流与主流相互独立,你可以对它们进行不同的处理,例如将异常数据、迟到数据等分离出来进行单独处理。
// 定义侧出流标签
OutputTag<String> dirtyTag = new OutputTag<String>("dirtyTag"){};、
// 如果转换出异常 将脏数据放到输出流
ctx.output(dirtyTag, value);
使用场景
- 处理迟到数据:在使用事件时间语义时,可能会有部分数据迟到。可以将这些迟到的数据通过侧输出流分离出来,进行单独的处理或分析。
- 异常数据处理:在数据处理过程中,可能会遇到一些不符合预期的数据,如格式错误、超出范围的值等。将这些异常数据发送到侧输出流,避免影响主流数据的处理。
- 多维度数据分析:将不同类型的数据分别发送到不同的侧输出流,以便进行不同维度的数据分析。
容错机制
- 特性说明:Flink 的容错机制基于检查点和状态后端。检查点会定期对应用程序的状态进行快照,并将其保存到持久化存储中。当发生故障时,Flink 可以从最近的检查点恢复状态,继续处理数据。状态后端负责管理状态的存储和访问,Flink 提供了多种状态后端,如
MemoryStateBackend
、FsStateBackend
和RocksDBStateBackend
等,用户可以根据不同的需求选择合适的状态后端。 - 应用场景:在大规模数据处理场景中,节点故障是常见的问题。Flink 的容错机制可以确保在节点故障时,数据处理不会中断,保证系统的稳定性和可靠性。
KafkaSink: 向Kafka主题中写入数据,也可以保证写入的精准一次性,需要做如下操作
Flink-1.20官方说明:Kafka | Apache Flink
KafkaSink
总共支持三种不同的语义保证(DeliveryGuarantee
)。对于DeliveryGuarantee.AT_LEAST_ONCE
和DeliveryGuarantee.EXACTLY_ONCE
,Flink checkpoint 必须启用。默认情况下KafkaSink
使用DeliveryGuarantee.NONE
。 以下是对不同语义保证的解释:
DeliveryGuarantee.NONE
不提供任何保证:消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。DeliveryGuarantee.AT_LEAST_ONCE
: sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。DeliveryGuarantee.EXACTLY_ONCE
: 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置isolation.level
),在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。
-
开启检查点
-
配置生产者消费一次性
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
-
设置事务ID前缀保证对不同的应用是唯一的
.setTransactionalIdPrefix("dwd_base_log_")
-
设置事务超时时间 (检查点超时时间 < 事务的超时时间 <= 事务的最大超时时间)
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15*60*1000 + "")
-
在消费端,需要设置消费的隔离级别为读已提交(默认为读为提交:预提交的数据也能读到)
.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
双流Join
Flink官网:Join
Flink SQL支持对动态表进行复杂而灵活的连接操作。 为了处理不同的场景,需要多种查询语义,因此有几种不同类型的 Join。
默认情况下,joins 的顺序是没有优化的。表的 join 顺序是在
FROM
从句指定的。可以通过把更新频率最低的表放在第一个、频率最高的放在最后这种方式来微调 join 查询的性能。需要确保表的顺序不会产生笛卡尔积,因为不支持这样的操作并且会导致查询失败。
Regular Joins
Regular join 是最通用的 join 类型。在这种 join 下,join 两侧表的任何新记录或变更都是可见的,并会影响整个 join 的结果。 例如:如果左边有一条新纪录,在 Product.id
相等的情况下,它将和右边表的之前和之后的所有记录进行 join。
Interval Joins
返回一个符合 join 条件和时间限制的简单笛卡尔积。Interval join 需要至少一个 equi-join 条件和一个 join 两边都包含的时间限定 join 条件。范围判断可以定义成就像一个条件(<, <=, >=, >),也可以是一个 BETWEEN 条件,或者两边表的一个相同类型(即:处理时间 或 事件时间)的时间属性 的等式判断。
Temporal Joins
时态表(Temporal table)是一个随时间变化的表:在 Flink 中被称为动态表。时态表中的行与一个或多个时间段相关联,所有 Flink 中的表都是时态的(Temporal)。 时态表包含一个或多个版本的表快照,它可以是一个变化的历史表,跟踪变化(例如,数据库变化日志,包含所有快照)或一个变化的维度表,也可以是一个将变更物化的维表(例如,存放最终快照的数据表)。
Lookup Join
lookup join 通常用于使用从外部系统查询的数据来丰富表。join 要求一个表具有处理时间属性,另一个表由查找源连接器(lookup source connnector)支持。
lookup join 和上面的 处理时间 Temporal Join 语法相同,右表使用查找源连接器支持。
数据处理流程解析
重要是看业务需求,以下仅供参考
一、ODS 数据采集
日志数据采集
- Flume 组件:Flume 是一个分布式、可靠且高可用的海量日志采集、聚合和传输的系统,它支持自定义 Source、Channel 和 Sink。不过,需要注意的是,Flume 已于 2024 - 10 - 10 停止维护 。除了 Flume,也可以使用开源日志收集器 Fluentd ,Fluentd 具有轻量级、高性能、可扩展性强等优点,能方便地将不同来源的日志数据收集并发送到指定的目的地。
业务数据采集
- Maxwell:Maxwell 是一个用于从 MySQL 数据库的 binlog 中读取数据的工具。它可以实时捕获数据库的变更,并将这些变更以 JSON 格式输出。但 Maxwell 本身不具备将 JSON 格式的数据封装为字符串的功能,在实际使用中,可能需要在后续处理环节进行相应的转换操作。
二、DIM 维度层处理
此层的主要目标是将业务数据库维度表的变化实时同步到数据仓库。具体实现步骤如下:
- 配置判断:在配置表中配置相关信息,以此判断哪些表属于维度表。可以使用 Flink CDC(Change Data Capture)来捕获业务数据库中维度表的变更数据。Flink CDC 能够实时监测数据库的变化,并将变更数据以流的形式输出。
- 广播流的使用:将配置信息以广播流的形式发送到各个算子。广播流可以确保每个并行实例都能获取到相同的配置信息。
- 主流数据处理:主流数据在处理过程中,根据表名从广播流中获取对应的配置信息,进而判断该表是否为维度表,并进行相应的处理。
三、DWD 事实表准备
流量域
- 日志分流:对采集到的日志数据进行分流处理,根据不同的业务需求将日志数据划分到不同的流中,例如按照页面浏览、点击事件等进行分类。
交易域
- Flink SQL 实现连接:在使用 Flink SQL 进行普通内外连接时,参与连接的表默认会维护一个状态,并且在默认情况下这个状态永远不会失效。需要根据传输延迟和业务滞后的关系,合理设置状态的保留时间。
- 左外连接的回撤流问题:当进行左外连接时,如果左表数据先到,右表数据后到,可能会产生回撤流。例如,会产生 3 条数据(包含原始插入、回撤、重新插入)。当将这些数据发送到 Kafka 主题时,会记录这三条数据,因此需要进行空数据处理和去重操作,以保证数据的准确性。
- 连接方式的选择:普通内外连接在关联维度时,时间控制不够灵活。而 Lookup Join 是左表驱动的连接方式,能更好地控制连接的时间和数据量。
工具域、互动域、用户域
这些域的事实表准备可以通过 DataStream API 或 SQL 来实现。DataStream API 提供了更细粒度的控制,适合处理复杂的业务逻辑;而 SQL 则更简洁,适合快速实现简单的业务需求。
四、DWS 汇总表的抽取以及轻度聚合
此层的主要目的是为 ADS 层提供服务,具体步骤如下:
- 指标梳理:列出需要统计的指标,将统计周期、粒度、业务过程相同的指标放在一张汇总表进行处理,这就是汇总表的抽取过程。
- 数据处理与聚合:使用 Flink SQL 或 Flink API 读取数据并创建动态表,指定 waterMark 以处理乱序数据,提取事件时间字段。然后进行数据处理、分组、开窗、聚合计算等操作,最后将结果写入 Doris 数据库。
- 数据一致性保证:在往数据库写数据时,需要保证数据的一致性。不同的数据库可以采用不同的策略:
- HBase:可以利用 HBase 的特性实现幂等写入,确保相同的数据多次写入不会产生重复记录。
- Kafka:使用事务机制来保证数据的一致性,确保在出现故障时数据不会丢失或重复。
- Doris:Doris 的 union 表可以实现幂等写入,避免数据重复。
- 去重处理
- 状态 + 定时器:使用状态来记录已经处理过的数据,结合定时器来控制状态的失效时间。这种方法失效性较差,但不会出现数据膨胀的问题。
- 状态 + 抵消:通过状态记录数据,并在合适的时候进行抵消操作,以达到去重的目的。
- 维度关联
- 基本维度关联:可以使用旁路缓存 Redis 来存储维度数据,提高查询效率。同时,支持异步 IO操作数据库,通过异步编排对象来实现异步查询,减少等待时间,提高系统的吞吐量。
五、ADS 分析和可视化操作
ADS 层主要用于数据分析和可视化展示。可以使用 Sugar 等工具来实现数据分析和可视化操作。Sugar 可以帮助用户快速构建数据可视化界面,通过图表、报表等形式直观地展示数据分析结果,为业务决策提供支持。