53、Flink 的Broadcast State 模式介绍及示例

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、Broadcast State 模式
    • 1、提供的 API
    • 2、BroadcastProcessFunction 和 KeyedBroadcastProcessFunction
    • 3、重要注意事项
  • 二、示例:按照分组规则进行图形匹配
    • 1、maven依赖
    • 2、实现
    • 3、验证
      • 1)、规则输入
      • 2)、item输入
      • 3)、控制台输出
  • 三、示例:BroadcastProcessFunction将维表数据广播给其他流
    • 1、maven依赖
    • 2、实现
      • 1)、BroadcastProcessFunction实现
      • 2)、连接实现
    • 3、验证
      • 1)、输入user数据
      • 2)、输入事实流订单数据
      • 3)、观察程序控制台输出


本文详细的介绍了broadcast state的具体使用,并以两个例子分别介绍了BroadcastProcessFunction和KeyedBroadcastProcessFunction的具体实现。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

一、Broadcast State 模式

1、提供的 API

在这里我们使用一个例子来展现 broadcast state 提供的接口。

假设存在一个序列,序列中的元素是具有不同颜色与形状的图形,我们希望在序列里相同颜色的图形中寻找满足一定顺序模式的图形对(比如在红色的图形里,有一个长方形跟着一个三角形)。 同时,我们希望寻找的模式也会随着时间而改变。

在这个例子中,我们定义两个流,一个流包含图形(Item),具有颜色和形状两个属性。另一个流包含特定的规则(Rule),代表希望寻找的模式。

在图形流中,我们需要首先使用颜色将流进行进行分区(keyBy),这能确保相同颜色的图形会流转到相同的物理机上。

// 将图形使用颜色进行划分
KeyedStream<Item, Color> colorPartitionedStream = itemStream.keyBy(new KeySelector<Item, Color>(){...});

对于规则流,它应该被广播到所有的下游 task 中,下游 task 应当存储这些规则并根据它寻找满足规则的图形对。下面这段代码会完成:
i) 将规则广播给所有下游 task;
ii) 使用 MapStateDescriptor 来描述并创建 broadcast state 在下游的存储结构

// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<Rule>() {})
);// 广播流,广播规则并且创建 broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

最终,为了使用规则来筛选图形序列,我们需要:

  • 将两个流关联起来
  • 完成我们的模式识别逻辑

为了关联一个非广播流(keyed 或者 non-keyed)与一个广播流(BroadcastStream),我们可以调用非广播流的方法 connect(),并将 BroadcastStream 当做参数传入。 这个方法的返回参数是 BroadcastConnectedStream,具有类型方法 process(),传入一个特殊的 CoProcessFunction 来书写我们的模式识别逻辑。 具体传入 process() 的是哪个类型取决于非广播流的类型:

  • 如果流是一个 keyed 流,那就是 KeyedBroadcastProcessFunction 类型;
  • 如果流是一个 non-keyed 流,那就是 BroadcastProcessFunction 类型。

在我们的例子中,图形流是一个 keyed stream,所以我们书写的代码如下:

connect() 方法需要由非广播流来进行调用,BroadcastStream 作为参数传入。

DataStream<String> output = colorPartitionedStream.connect(ruleBroadcastStream).process(// KeyedBroadcastProcessFunction 中的类型参数表示://   1. key stream 中的 key 类型//   2. 非广播流中的元素类型//   3. 广播流中的元素类型//   4. 结果的类型,在这里是 stringnew KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {// 模式匹配逻辑});

2、BroadcastProcessFunction 和 KeyedBroadcastProcessFunction

在传入的 BroadcastProcessFunction 或 KeyedBroadcastProcessFunction 中,我们需要实现两个方法。processBroadcastElement() 方法负责处理广播流中的元素,processElement() 负责处理非广播流中的元素。

两个子类型定义如下:

  • BroadcastProcessFunction
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
  • KeyedBroadcastProcessFunction
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

需要注意的是 processBroadcastElement() 负责处理广播流的元素,而 processElement() 负责处理另一个流的元素。
两个方法的第二个参数(Context)不同,均有以下方法:

  • 得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
  • 查询元素的时间戳:ctx.timestamp()
  • 查询目前的Watermark:ctx.currentWatermark()
  • 目前的处理时间(processing time):ctx.currentProcessingTime()
  • 产生旁路输出:ctx.output(OutputTag outputTag, X value)

在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同。

这两个方法的区别在于对 broadcast state 的访问权限不同。在处理广播流元素这端,是具有读写权限的,而对于处理非广播流元素这端是只读的。 这样做的原因是,Flink 中是不存在跨 task 通讯的。所以为了保证 broadcast state 在所有的并发实例中是一致的,我们在处理广播流元素的时候给予写权限,在所有的 task 中均可以看到这些元素,并且要求对这些元素处理是一致的, 那么最终所有 task 得到的 broadcast state 是一致的。

processBroadcastElement() 的实现必须在所有的并发实例中具有确定性的结果。

同时,KeyedBroadcastProcessFunction 在 Keyed Stream 上工作,所以它提供了一些 BroadcastProcessFunction 没有的功能:
1、processElement() 的参数 ReadOnlyContext 提供了方法能够访问 Flink 的定时器服务,可以注册事件定时器(event-time timer)或者处理时间的定时器(processing-time timer)。当定时器触发时,会调用 onTimer() 方法, 提供了 OnTimerContext,它具有 ReadOnlyContext 的全部功能,并且提供:

  • 查询当前触发的是一个事件还是处理时间的定时器
  • 查询定时器关联的key

2、processBroadcastElement() 方法中的参数 Context 会提供方法 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)。 这个方法使用一个 KeyedStateFunction 能够对 stateDescriptor 对应的 state 中所有 key 的存储状态进行某些操作。目前 PyFlink 不支持 apply_to_keyed_state。

注册一个定时器只能在 KeyedBroadcastProcessFunction 的 processElement() 方法中进行。 在 processBroadcastElement() 方法中不能注册定时器,因为广播的元素中并没有关联的 key。

回到我们当前的例子中,KeyedBroadcastProcessFunction 应该实现如下:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {// 存储部分匹配的结果,即匹配了一个元素,正在等待第二个元素// 我们用一个数组来存储,因为同时可能有很多第一个元素正在等待private final MapStateDescriptor<String, List<Item>> mapStateDesc =new MapStateDescriptor<>("items",BasicTypeInfo.STRING_TYPE_INFO,new ListTypeInfo<>(Item.class));// 与之前的 ruleStateDescriptor 相同private final MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<Rule>() {}));@Overridepublic void processBroadcastElement(Rule value,Context ctx,Collector<String> out) throws Exception {ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);}@Overridepublic void processElement(Item value,ReadOnlyContext ctx,Collector<String> out) throws Exception {final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);final Shape shape = value.getShape();for (Map.Entry<String, Rule> entry :ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {final String ruleName = entry.getKey();final Rule rule = entry.getValue();List<Item> stored = state.get(ruleName);if (stored == null) {stored = new ArrayList<>();}if (shape == rule.second && !stored.isEmpty()) {for (Item i : stored) {out.collect("MATCH: " + i + " - " + value);}stored.clear();}// 不需要额外的 else{} 段来考虑 rule.first == rule.second 的情况if (shape.equals(rule.first)) {stored.add(value);}if (stored.isEmpty()) {state.remove(ruleName);} else {state.put(ruleName, stored);}}}
}

3、重要注意事项

这里有一些 broadcast state 的重要注意事项,在使用它时需要时刻清楚:

  • 没有跨 task 通讯:如上所述,这就是为什么只有在 (Keyed)-BroadcastProcessFunction 中处理广播流元素的方法里可以更改 broadcast state 的内容。 同时,用户需要保证所有 task 对于 broadcast state 的处理方式是一致的,否则会造成不同 task 读取 broadcast state 时内容不一致的情况,最终导致结果不一致。

  • broadcast state 在不同的 task 的事件顺序可能是不同的:虽然广播流中元素的过程能够保证所有的下游 task 全部能够收到,但在不同 task 中元素的到达顺序可能不同。 所以 broadcast state 的更新不能依赖于流中元素到达的顺序。

  • 所有的 task 均会对 broadcast state 进行 checkpoint:虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint。 这个设计是为了防止在作业恢复后读文件造成的文件热点。当然这种方式会造成 checkpoint 一定程度的写放大,放大倍数为 p(=并行度)。Flink 会保证在恢复状态/改变并发的时候数据没有重复且没有缺失。 在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state。在增大并发的情况下,task 会读取本身的 state,多出来的并发(p_new - p_old)会使用轮询调度算法读取之前 task 的 state。

  • 不使用 RocksDB state backend: broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State。

二、示例:按照分组规则进行图形匹配

本示例是上文中的内容具体实现,即实现:
1、按照相同颜色进行分组,在相同颜色组中按照规则进行匹配。
2、相同颜色的规则1:长方形后是三角形
3、相同颜色的规则2:正方形后是长方形

如匹配上述规则1或规则2则输出匹配成功。

1、maven依赖

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><!-- <scope>provided</scope> --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.24.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><!-- <scope>provided</scope> --></dependency></dependencies>

2、实现

package org.tablesql.join;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* * @LastEditors: alanchan* * @Description: 按照相同颜色进行分组,在相同颜色组中按照规则进行匹配。相同颜色的规则1:长方形后是三角形;规则2:正方形后是长方形*/
public class TestJoinDimKeyedBroadcastProcessFunctionDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class Shape {private String name;private String desc;}@Data@NoArgsConstructor@AllArgsConstructorstatic class Colour {private String name;private Long blue;private Long red;private Long green;}@Data@NoArgsConstructor@AllArgsConstructorstatic class Item {private Shape shape;private Colour color;}@Data@NoArgsConstructor@AllArgsConstructorstatic class Rule {private String name;private Shape first;private Shape second;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// item 实时流DataStream<Item> itemStream = env.socketTextStream("192.168.10.42", 9999).map(o -> {// 解析item流// 数据结构:Item[shape(name,desc);color(name,blue,red,green)]String[] lines = o.split(";");String[] shapeString = lines[0].split(",");String[] colorString = lines[1].split(",");Shape shape = new Shape(shapeString[0],shapeString[1]);Colour color = new Colour(colorString[0],Long.valueOf(colorString[1]),Long.valueOf(colorString[2]),Long.valueOf(colorString[3]));return new Item(shape,color);});// rule 实时流DataStream<Rule> ruleStream = env.socketTextStream("192.168.10.42", 8888).map(o -> {// 解析rule流// 数据结构:Rule[name;shape(name,desc);shape(name,desc)]String[] lines = o.split(";");String name = lines[0];String[] firstShapeString = lines[1].split(",");String[] secondShapeString = lines[2].split(",");Shape firstShape = new Shape(firstShapeString[0],firstShapeString[1]);Shape secondShape = new Shape(secondShapeString[0],secondShapeString[1]);return new Rule(name,firstShape,secondShape);}).setParallelism(1);// 将图形使用颜色进行划分KeyedStream<Item, Colour> colorPartitionedStream = itemStream.keyBy(new KeySelector<Item, Colour>() {@Overridepublic Colour getKey(Item value) throws Exception {return value.getColor();// 实现分组}});colorPartitionedStream.print("colorPartitionedStream:---->");// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<Rule>() {}));// 将rule定义为广播流,广播规则并且创建 broadcast stateBroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);// 连接,输出流,connect() 方法需要由非广播流来进行调用,BroadcastStream 作为参数传入。DataStream<String> output = colorPartitionedStream.connect(ruleBroadcastStream).process(// KeyedBroadcastProcessFunction 中的类型参数表示:// 1. key stream 中的 key 类型// 2. 非广播流中的元素类型// 3. 广播流中的元素类型// 4. 结果的类型,在这里是 stringnew KeyedBroadcastProcessFunction<Colour, Item, Rule, String>() {// 存储部分匹配的结果,即匹配了一个元素,正在等待第二个元素// 用一个数组来存储,因为同时可能有很多第一个元素正在等待private final MapStateDescriptor<String, List<Item>> itemMapStateDesc = new MapStateDescriptor<>("items",BasicTypeInfo.STRING_TYPE_INFO,new ListTypeInfo<>(Item.class));// 与之前的 ruleStateDescriptor 相同,用于存储规则名称与规则本身的 map 存储结构private final MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<Rule>() {}));// 负责处理广播流的元素        @Overridepublic void processBroadcastElement(Rule ruleValue,KeyedBroadcastProcessFunction<Colour, Item, Rule, String>.Context ctx,Collector<String> out) throws Exception {// 得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)// 查询元素的时间戳:ctx.timestamp()// 查询目前的Watermark:ctx.currentWatermark()// 目前的处理时间(processing time):ctx.currentProcessingTime()// 产生旁路输出:ctx.output(OutputTag<X> outputTag, X value)    // 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同ctx.getBroadcastState(ruleStateDescriptor).put(ruleValue.getName(), ruleValue);}// 负责处理另一个流的元素@Overridepublic void processElement(Item itemValue,KeyedBroadcastProcessFunction<Colour, Item, Rule, String>.ReadOnlyContext ctx,Collector<String> out) throws Exception {final MapState<String, List<Item>> itemMapState = getRuntimeContext().getMapState(itemMapStateDesc);final Shape shape = itemValue.getShape();System.out.println("shape:"+shape);// 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同ReadOnlyBroadcastState<String, Rule> readOnlyBroadcastState = ctx.getBroadcastState(ruleStateDescriptor);Iterable<Entry<String, Rule>> iterableRule = readOnlyBroadcastState.immutableEntries();for (Entry<String, Rule> entry : iterableRule) {final String ruleName = entry.getKey();final Rule rule = entry.getValue();// 初始化List<Item> itemStoredList = itemMapState.get(ruleName);if (itemStoredList == null) {itemStoredList = new ArrayList<>();}// 比较 shape if (shape.getName().equals(rule.second.getName()) && !itemStoredList.isEmpty()) {for (Item item : itemStoredList) {// 符合规则,收集匹配结果out.collect("匹配成功: " + item + " - " + itemValue);}itemStoredList.clear();}// 规则连续性设置if (shape.getName().equals(rule.first.getName())) {itemStoredList.add(itemValue);}// if (itemStoredList.isEmpty()) {itemMapState.remove(ruleName);} else {itemMapState.put(ruleName, itemStoredList);}}}});output.print("output:------->");env.execute();}}

3、验证

在netcat中启动两个端口,分别是8888和9999,8888输入规则,9999输入item,然后关键控制台输出。

1)、规则输入

red;rectangle,is a rectangle;tripe,is a tripe
green;square,is a square;rectangle,is a rectangle

2)、item输入

# 匹配成功
rectangle,is a rectangle;red,100,100,100
tripe,is a tripe;red,100,100,100# 匹配成功
square,is square;green,150,150,150
rectangle,is a rectangle;green,150,150,150# 匹配不成功
tripe,is tripe;blue,200,200,200# 匹配成功
rectangle,is a rectangle;blue,100,100,100
tripe,is a tripe;blue,100,100,100# 匹配不成功
tripe,is a tripe;blue,100,100,100
rectangle,is a rectangle;blue,100,100,100

3)、控制台输出

colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle)
colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe)
output:------->:9> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))
colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150))
output:------->:9> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=square, desc=is square), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150))
colorPartitionedStream:---->:3> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=200, red=200, green=200))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
output:------->:1> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))

三、示例:BroadcastProcessFunction将维表数据广播给其他流

本示例是将用户信息作为维表通过流进行广播,在事实表订单流中进行连接匹配输出。

1、maven依赖

参照上文中的依赖。

2、实现

实现方式可以使用匿名内部类或内部类实现,本示例为了清楚其中的逻辑关系,特意以一个具体class来实现。

1)、BroadcastProcessFunction实现

/** @Author: alanchan* @LastEditors: alanchan* @Description: */
package org.tablesql.join;import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;// final BroadcastProcessFunction<IN1, IN2, OUT> function)
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {// 用于存储规则名称与规则本身的 map 存储结构 MapStateDescriptor<Integer, User> broadcastDesc;JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {this.broadcastDesc = broadcastDesc;}// 负责处理广播流的元素@Overridepublic void processBroadcastElement(User value,BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,Collector<Tuple2<Order, String>> out) throws Exception {System.out.println("收到广播数据:" + value);// 得到广播流的存储状态ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);}// 处理非广播流,关联维度@Overridepublic void processElement(Order value,BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,Collector<Tuple2<Order, String>> out) throws Exception {// 得到广播流的存储状态ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));}
}

2)、连接实现

/** @Author: alanchan* @LastEditors: alanchan* @Description: */
package org.tablesql.join;import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;public class TestJoinDimFromBroadcastDataStreamDemo {// 维表@Data@NoArgsConstructor@AllArgsConstructorstatic class User {private Integer id;private String name;private Double balance;private Integer age;private String email;}// 事实表@Data@NoArgsConstructor@AllArgsConstructorstatic class Order {private Integer id;private Integer uId;private Double total;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// order 实时流DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999).map(o -> {String[] lines = o.split(",");return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));});// user 实时流DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888).map(o -> {String[] lines = o.split(",");return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);}).setParallelism(1);// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构// MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(//         "RulesBroadcastState",//         BasicTypeInfo.STRING_TYPE_INFO,//         TypeInformation.of(new TypeHint<Rule>() {//         }));// 广播流,广播规则并且创建 broadcast state// BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);// 将user流(维表)定义为广播流final MapStateDescriptor<Integer, User> broadcastDesc = new MapStateDescriptor("Alan_RulesBroadcastState",Integer.class,User.class);BroadcastStream<User> broadcastStream = userDs.broadcast(broadcastDesc);// 需要由非广播流来进行调用DataStream result = orderDs.connect(broadcastStream).process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));result.print();env.execute();}// final BroadcastProcessFunction<IN1, IN2, OUT> function)
//     static class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
//         // 用于存储规则名称与规则本身的 map 存储结构 
//         MapStateDescriptor<Integer, User> broadcastDesc;//         JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
//             this.broadcastDesc = broadcastDesc;
//         }//         // 负责处理广播流的元素
//         @Override
//         public void processBroadcastElement(User value,
//                 BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
//                 Collector<Tuple2<Order, String>> out) throws Exception {
//             System.out.println("收到广播数据:" + value);
//             // 得到广播流的存储状态
//             ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
//         }//         // 处理非广播流,关联维度
//         @Override
//         public void processElement(Order value,
//                 BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
//                 Collector<Tuple2<Order, String>> out) throws Exception {
//             // 得到广播流的存储状态
//             ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);//             out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
//         }
//     }}

3、验证

本示例使用的是两个socket数据源,通过netcat进行模拟。

1)、输入user数据

“192.168.10.42”, 8888


// user 流数据(维度表),由于未做容错处理,需要先广播维度数据,否则会出现空指针异常
// 1001,alan,18,20,alan.chan.chn@163.com
// 1002,alanchan,19,25,alan.chan.chn@163.com
// 1003,alanchanchn,20,30,alan.chan.chn@163.com
// 1004,alan_chan,27,20,alan.chan.chn@163.com
// 1005,alan_chan_chn,36,10,alan.chan.chn@163.com

2)、输入事实流订单数据

“192.168.10.42”, 9999


// order 流数据
// 16,1002,211
// 17,1004,234
// 18,1005,175

3)、观察程序控制台输出


// 控制台输出
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
// 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
// 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)

以上,本文详细的介绍了broadcast state的具体使用,并以两个例子分别介绍了BroadcastProcessFunction和KeyedBroadcastProcessFunction的具体实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/610734.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

世邦通信 SPON IP网络对讲广播系统exportrecord.php 任意文件读取漏洞

产品介绍 世邦通信SPON IP网络对讲广播系统采用领先的IPAudio™技术,将音频信号以数据包形式在局域网和广域网上进行传送,是一套纯数字传输系统。 漏洞描述 spon IP网络对讲广播系统exportrecord.php存在任意文件读取漏洞&#xff0c;攻击者可通过该漏洞在服务器端读取任意敏…

提升跨境电商独立站用户体验的实用指南!

在竞争激烈的市场中&#xff0c;提供卓越的用户体验是吸引并留住顾客的关键。本文将为你分享一些实用的方法&#xff0c;帮助你提高跨境电商独立站用户体验&#xff0c;打造让用户流连忘返的购物环境。 1. 优化网站导航 确保你的网站导航清晰简单&#xff0c;用户能够轻松找到…

Phoenix基本使用

1、Phoenix简介 1.1 Phoenix定义 Phoenix是HBase的开源SQL皮肤。可以使用标准JDBC API代替HBase客户端API来创建表&#xff0c;插入数据和查询HBase数据。 1.2 Phoenix特点 容易集成&#xff1a;如Spark&#xff0c;Hive&#xff0c;Pig&#xff0c;Flume和Map Reduce。性能…

元宇宙电商带你走进数字商城

在当今这个数字化、互联网高速发展的时代&#xff0c;传统的购物方式已逐渐被新兴的电商模式所替代。而在这股变革的浪潮中&#xff0c;一个全新的概念——元宇宙电商&#xff0c;正逐步走入我们的视野&#xff0c;元宇宙概念的兴起成为了热门话题。元宇宙是一个虚拟的世界&…

遥感影像大气校正一:6S模型使用

6S介绍 1986年&#xff0c;法国里尔科技大学大气光学实验室Tanr等人为了简化大气辐射传输方程&#xff0c;开发了太阳光谱波段卫星信号模拟程序5S&#xff08;SIMULATION OF THE SATELLITE SIGNAL IN THE SOLAR SPECTRUM&#xff09;&#xff0c;用来模拟地气系统中太阳…

Hive之set参数大全-3

D 是否启用本地任务调试模式 hive.debug.localtask 是 Apache Hive 中的一个配置参数&#xff0c;用于控制是否启用本地任务调试模式。在调试模式下&#xff0c;Hive 将尝试在本地模式下运行一些任务&#xff0c;以便更容易调试和分析问题。 具体来说&#xff0c;当 hive.de…

spring cloud之集成sentinel

写在前面 源码 。 本文一起看下spring cloud的sentinel组件的使用。 1&#xff1a;准备 1.1&#xff1a;理论 对于一个系统来说&#xff0c;最重要的就是高可用&#xff0c;那么如何实现高可用呢&#xff1f;你可能会说&#xff0c;集群部署不就可以了&#xff0c;但事实并…

【C#】用CefSharp.ChromiumWebBrowser做winform开发过程中遇到的一些坑

文章目录 使用键盘F5刷新、F12打开控制台浏览器默认下载弹窗禁用GPU配置 使用键盘F5刷新、F12打开控制台 public class CEFKeyBoardHander : IKeyboardHandler {public bool OnKeyEvent(IWebBrowser browserControl, IBrowser browser, KeyType type, int windowsKeyCode, int…

【3D动画】Animcraft 基础运用

骨骼包地址&#xff1a;G:\Animcraft相关资料\motion_2024 入库&#xff1a;FBX文件 Mixamo 下载一个模型。格式&#xff1a;FBX 导入到animcraft软件中。

论文封面表格制作

原文参考&#xff1a;【【论文排版】论文封面完美对齐 强迫症重度患者的经验分享】https://www.bilibili.com/video/BV18f4y1p7hc?vd_source046491acdcff4b39fed20406b36a93e2 视频里up主介绍很详细。我自己也记录一下。 介绍一下如何完成论文封面信息的填写。 创建一个3列…

MATLAB对数据隔位抽取和插值的几种方法

对于串行的数据&#xff0c;有时我们需要转成多路并行的数据进行处理&#xff0c;抽取&#xff1b;或者是需要对数据进行隔点抽取&#xff0c;或对数据进行插值处理。此处以4倍抽取或插值为例&#xff0c;MATLAB代码实现。 文章目录 抽取方法一&#xff1a;downsample函数方法…

vue:如果.vue文件内容过多,我们可以这样拆分

一、标签内容太多&#xff0c;我们可以用组件的方式拆分 二、那如果JS的内容太多呢&#xff1f;因为耦合性太高&#xff0c;拆成组件后父子组件传值不方便&#xff0c;我们可以这样&#xff1a; 子组件: export default {data() {return {};},methods: {a(){alert(1)}} };父组…

算法练习:找出数组大于 n/2 的元素

题目&#xff1a; 给定一个大小为 n 的数组nums &#xff0c;返回其中的多数元素。多数元素是指在数组中出现次数 大于 n/2 的元素。你可以假设数组是非空的&#xff0c;并且给定的数组总是存在多数元素。 示例&#xff1a; 输入&#xff1a;nums [3,2,3] 输出&#xff1a;…

[足式机器人]Part2 Dr. CAN学习笔记 - Ch02动态系统建模与分析

本文仅供学习使用 本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记 - Ch02动态系统建模与分析 1. 课程介绍2. 电路系统建模、基尔霍夫定律3. 流体系统建模4. 拉普拉斯变换&#xff08;Laplace&#xff09;传递函数、微分方程4.1 Laplace Transform 拉式变换4.2 收…

Exception sending a multicast message:Network is unreachable故障

出现这个故障就是没有连接到网络&#xff0c;如果虚拟机没有连接到本机&#xff0c;那么就会出现这个情况&#xff0c;当虚拟机连接到本机就会自动消失&#xff0c;同时如果是用电脑直接安装Ubuntu运行也会出现这个情况&#xff0c;应该是要连接到一个路由器里面这个情况才会消…

Python 借助装饰器,实现父对象使用子对象的方法!

文章目录 1. 引言2. 实践2.1 装饰器说明2.2 定义一个父类2.3 定义子类 3. 整体来看4. 参考 1. 引言 这个需求的背景是这样的&#xff0c;我们在写复杂代码时候&#xff0c;可能会用到 Python 的类继承的方法&#xff0c;即子对象继承父对象的一些属性方法。 在这个过程中&…

游戏后端如何实现服务器之间的负载均衡?

网络游戏已成为人们休闲娱乐的重要方式之一。而在游戏开发中&#xff0c;如何实现服务器之间的负载均衡是一个非常关键的问题。负载均衡不仅可以提高服务器的处理能力&#xff0c;还能保证游戏的稳定性和流畅性。本文将探讨游戏后端如何实现服务器之间的负载均衡。 一、负载均…

Eigen 中的传播计算

Eigen: Reductions, visitors and broadcasting

太实用了!关于ControlNet,这篇你一定要看

一篇文章教会你&#xff0c;从入门到使用。 这里是行者AI&#xff0c;我们专注于人工智能在游戏领域的研究和应用&#xff0c;凭借自研算法&#xff0c;推出游戏AI、智能内容审核、数据平台等产品服务。 controlNet控制生成 anypaint 看到这篇文章的你&#xff0c;一定也是AI绘…

SpringBoot整合人大金仓数据库KingBase

1 去KingBase官网下载驱动jar包 2 将解压得到的所有jar包放置在libs目录下&#xff08;没有就新建一个目录&#xff09; 3 在pom文件添加相关依赖 <!--添加KingBase所需要的依赖--> <dependency><groupId>com.kingbase</groupId><artifactId>kin…