第8章
分流
1.使用侧输出流
2.合流
2.1 union :使用 ProcessFunction 处理合流后的数据
2.2 Connect :
两条流的格式可以不一样, map操作使用CoMapFunction,process 传入:CoProcessFunction
2.2 BroadcastConnectedStream
keyBy 进行了按键分区,那么要传入的就是 KeyedBroadcastProcessFunction;
如果没有按键分区,就传入 BroadcastProcessFunction
3.基于时间的合流——双流联结(Join)3.1 窗口联结(Window Join)stream1.join(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)3.2 间隔联结(Interval Join)所以匹配的条件为:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBoundprocess函数传入:ProcessJoinFunction3.3 窗口同组联结(Window CoGroup)stream1.coGroup(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.hours(1))).apply(<CoGroupFunction>)
第九章:状态编程
1 状态的分类:托管状态(Managed State)和原始状态(Raw State)1.托管状态分为两类:算子状态(Operator State)和按键分区状态(Keyed State)1.1算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现 CheckpointedFunction 接口。ListState、UnionListState 和 BroadcastState1.2 按键分区状态(Keyed State):状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就 keyBy 之后才可以使用支持的数据结构:值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、归约状态(ReducingState)、聚合状态(AggregatingState)open中声明状态:getRuntimeContext.getMapState(new MapStateDescriptor[String,String]("my-map-state",classOf[String],classOf[String]))2.状态生存时间(TTL)StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))//这就是设定的状态生存时间.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//创建状态和更改状态(写操作)时更新失效时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//表示从不返回过期值.build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("mystate", String.class);stateDescriptor.enableTimeToLive(ttlConfig);3.状态持久化和状态后端1. 状态后端的分类:“哈希表状态后端”(HashMapStateBackend)、内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。
第十章:检查点(Checkpoint)
1.从检查点来恢复状态了。具体的步骤为:(1)重启应用,所有任务的状态会清空(2)读取检查点,重置状态。找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。(3)重放数据:保存检查点后开始重新读取数据,这可以通过 Source 任务向外部数据源重新提交偏移量(offset)来实现(4)继续处理数据2.检查点算法:Flink 使用了 Chandy-Lamport 算法的一种变体,被称为“异步分界线快照”(asynchronous barrier snapshotting)算法。算法的核心就是两个原则:当上游任务向多个并行
下游任务发送 barrier 时,需要广播出去;而当多个上游任务向同一个下游任务传递 barrier 时,
需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区
的 barrier 都到齐,才可以开始状态的保存。具体过程如下:(1)JobManager 发送指令,触发检查点的保存;Source 任务保存状态,插入分界线(2)状态快照保存完成,分界线向下游传递(3)向下游多个并行子任务广播分界线,执行分界线对齐(4)分界线对齐后,保存状态到持久化存储(5)先处理缓存数据,然后正常继续处理3 端到端精确一次(end-to-end exactly-once)
3.1 输入端保证外部数据源就必须拥有重放数据的能力3.2输出端保证幂等写入事务写入:预写日志(WAL)和两阶段提交(2PC)(1)预写日志(write-ahead-log,WAL):缺点:再次确认可能会导致数据写出成功,但是确认消息失败,导致的数据重复写入①先把结果数据作为日志(log)状态保存起来②进行检查点保存时,也会将这些结果数据一并做持久化存储③在收到检查点完成的通知时,将所有结果一次性写入外部系统。
(2)两阶段提交(two-phase-commit,2PC)
具体的实现步骤为:①当第一条数据到来时,或者收到检查点的分界线时,Sink 任务都会启动一个事务。②接下来接收到的所有数据,都通过这个事务写入外部系统;这时由于事务没有提交,所以数据尽管写入了外部系统,但是不可用,是“预提交”的状态。③当 Sink 任务收到 JobManager 发来检查点完成的通知时,正式提交事务,写入的结果就真正可用了。当中间发生故障时,当前未提交的事务就会回滚,于是所有写入外部系统的数据也就实现了撤回2PC 对外部系统的要求外部系统必须提供事务支持,或者 Sink 任务必须能够模拟外部系统上的事务。⚫ 在检查点的间隔期间里,必须能够开启一个事务并接受数据写入。⚫ 在收到检查点完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候外部系统关闭事务(例如超时了),那么未提交的数据就会丢失。⚫ Sink 任务必须能够在进程失败后恢复事务。⚫ 提交事务必须是幂等操作。也就是说,事务的重复提交应该是无效的。(3) kafka-flink-kafka 实现端到端 exactly-once 的具体过程可以分解如下 1.启动检查点保存:标志着我们进入了两阶段提交协议的“预提交”阶段2.算子任务对状态做快照保存到状态后端3.Sink 任务开启事务,进行预提交4.检查点保存完成,提交事务当所有算子的快照都完成,JobManager 会向所有任务发确认通知,告诉大家当前检查点已成功保存,当 Sink 任务收到确认通知后,就会正式提交之前的事务需要的配置:必须启用检查点、 FlinkKafkaProducer 的构造函数中传入参数 Semantic.EXACTLY_ONCE、Kafka 读取数据的消费者的隔离级别(read_committed)、事务超时配置