【flink番外篇】13、Broadcast State 模式示例-广播维表(2)

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、示例:BroadcastProcessFunction将维表数据广播给其他流
    • 1、maven依赖
    • 2、实现
      • 1)、BroadcastProcessFunction实现
      • 2)、连接实现
    • 3、验证
      • 1)、输入user数据
      • 2)、输入事实流订单数据
      • 3)、观察程序控制台输出


本文详细的介绍了通过broadcast state的广播示例展示在维表中的应用,需要使用BroadcastProcessFunction。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

一、示例:BroadcastProcessFunction将维表数据广播给其他流

本示例是将用户信息作为维表通过流进行广播,在事实表订单流中进行连接匹配输出。

1、maven依赖

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><!-- <scope>provided</scope> --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.24.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><!-- <scope>provided</scope> --></dependency></dependencies>

2、实现

实现方式可以使用匿名内部类或内部类实现,本示例为了清楚其中的逻辑关系,特意以一个具体class来实现。

1)、BroadcastProcessFunction实现

/** @Author: alanchan* @LastEditors: alanchan* @Description: */
package org.tablesql.join;import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;// final BroadcastProcessFunction<IN1, IN2, OUT> function)
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {// 用于存储规则名称与规则本身的 map 存储结构 MapStateDescriptor<Integer, User> broadcastDesc;JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {this.broadcastDesc = broadcastDesc;}// 负责处理广播流的元素@Overridepublic void processBroadcastElement(User value,BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,Collector<Tuple2<Order, String>> out) throws Exception {System.out.println("收到广播数据:" + value);// 得到广播流的存储状态ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);}// 处理非广播流,关联维度@Overridepublic void processElement(Order value,BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,Collector<Tuple2<Order, String>> out) throws Exception {// 得到广播流的存储状态ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));}
}

2)、连接实现

/** @Author: alanchan* @LastEditors: alanchan* @Description: */
package org.tablesql.join;import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;public class TestJoinDimFromBroadcastDataStreamDemo {// 维表@Data@NoArgsConstructor@AllArgsConstructorstatic class User {private Integer id;private String name;private Double balance;private Integer age;private String email;}// 事实表@Data@NoArgsConstructor@AllArgsConstructorstatic class Order {private Integer id;private Integer uId;private Double total;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// order 实时流DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999).map(o -> {String[] lines = o.split(",");return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));});// user 实时流DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888).map(o -> {String[] lines = o.split(",");return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);}).setParallelism(1);// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构// MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(//         "RulesBroadcastState",//         BasicTypeInfo.STRING_TYPE_INFO,//         TypeInformation.of(new TypeHint<Rule>() {//         }));// 广播流,广播规则并且创建 broadcast state// BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);// 将user流(维表)定义为广播流final MapStateDescriptor<Integer, User> broadcastDesc = new MapStateDescriptor("Alan_RulesBroadcastState",Integer.class,User.class);BroadcastStream<User> broadcastStream = userDs.broadcast(broadcastDesc);// 需要由非广播流来进行调用DataStream result = orderDs.connect(broadcastStream).process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));result.print();env.execute();}// final BroadcastProcessFunction<IN1, IN2, OUT> function)
//     static class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
//         // 用于存储规则名称与规则本身的 map 存储结构 
//         MapStateDescriptor<Integer, User> broadcastDesc;//         JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
//             this.broadcastDesc = broadcastDesc;
//         }//         // 负责处理广播流的元素
//         @Override
//         public void processBroadcastElement(User value,
//                 BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
//                 Collector<Tuple2<Order, String>> out) throws Exception {
//             System.out.println("收到广播数据:" + value);
//             // 得到广播流的存储状态
//             ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
//         }//         // 处理非广播流,关联维度
//         @Override
//         public void processElement(Order value,
//                 BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
//                 Collector<Tuple2<Order, String>> out) throws Exception {
//             // 得到广播流的存储状态
//             ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);//             out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
//         }
//     }}

3、验证

本示例使用的是两个socket数据源,通过netcat进行模拟。

1)、输入user数据

“192.168.10.42”, 8888


// user 流数据(维度表),由于未做容错处理,需要先广播维度数据,否则会出现空指针异常
// 1001,alan,18,20,alan.chan.chn@163.com
// 1002,alanchan,19,25,alan.chan.chn@163.com
// 1003,alanchanchn,20,30,alan.chan.chn@163.com
// 1004,alan_chan,27,20,alan.chan.chn@163.com
// 1005,alan_chan_chn,36,10,alan.chan.chn@163.com

2)、输入事实流订单数据

“192.168.10.42”, 9999


// order 流数据
// 16,1002,211
// 17,1004,234
// 18,1005,175

3)、观察程序控制台输出


// 控制台输出
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
// 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
// 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)

以上,本文详细的介绍了通过broadcast state的广播示例展示在维表中的应用,需要使用BroadcastProcessFunction。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/616443.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自定义Flink SourceFunction定时读取数据库

文章目录 前言一、自定义Flink SourceFunction定时读取数据库二、java代码实现总结 前言 Source 是Flink获取数据输入的地方&#xff0c;可以用StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source funct…

openssl3.2 - 官方demo学习 - 索引贴

文章目录 openssl3.2 - 官方demo学习 - 索引贴概述笔记工程的搭建和调试环境BIOBIO - client-arg.cBIO - client-conf.cBIO - saccept.cBIO - sconnect.cBIO - server-arg.cBIO - server-cmod.cBIO - server-conf.cBIO - 总结certsciphercipher - aesccm.cEND openssl3.2 - 官方…

使用Java连接MongoDB (6.0.12) 报错

报错&#xff1a; Exception in thread "main" com.mongodb.MongoCommandException: Command failed with error 352: Unsupported OP_QUERY command: create. 上图中“The client driver may require an upgrade”说明了“客户端驱动需要进行升级”&#xff0c;解…

数据分析-Pandas如何转换产生新列

数据分析-Pandas如何转换产生新列 时间序列数据在数据分析建模中很常见&#xff0c;例如天气预报&#xff0c;空气状态监测&#xff0c;股票交易等金融场景。此处选择巴黎、伦敦欧洲城市空气质量监测 N O 2 NO_2 NO2​数据作为样例。 python数据分析-数据表读写到pandas 经典…

What does `rpm -ivh` do?

rpm -ivh 安装 并 显示安装进度 (–install–verbose–hash) rpm -ivh /media/cdrom/RedHat/RPMS/samba-3.0.10-1.4E.i386.rpm 安装rpm -ivh --relocate //opt/gaim gaim-1.3.0-1.fc4.i386.rpm 指定安装到 /opt/gaim[Ref] rpm -uvh和-ivh有什么区别以及zabbix 安…

android前台服务:

android前台服务&#xff1a; android-安卓如何开启前台服务&#xff1f;foregroundService的使用方法&#xff0c;什么是前台服务&#xff1f;_foregroundservicetype-CSDN博客

使用BeanShell写入内容到文件【JMeter】

一、前言 ​ 在我们日常工作中&#xff0c;可能会遇到需要将请求返回的数据写入到文件中。在我们使用JMeter进行性能测试时&#xff0c;就经常能够遇到这种情况。要想达到这种目的&#xff0c;我们一般采取BeanShell后置处理器来将内容写入到文件。 二、提取 ​ 在目前大多数的…

基于多智能体点对点转换的分布式模型预测控制

matlab2020正常运行 基于多智能体点对点转换的分布式模型预测控制资源-CSDN文库

Spring MVC 日期转换器

日期转换器 自定义日期转换器 public class DataConvert implements Converter<String, Date> {/**** 配置时间转换类* param date* return*/Overridepublic Date convert(String date) {try {SimpleDateFormat sdf new SimpleDateFormat("yyyy-MM-dd");ret…

对于软件测试的认识和了解

对软件测试的认识&#xff1a; 软件测试要求开发人员避免测试自己开发的程序。从心理学角度讲&#xff0c;这是很有道理的。特别是一个相对复杂的系统&#xff0c;开发人员在刚刚开发完成的时候&#xff0c;尚沉浸于对自己设计的回味之中。此时去测试的话往往会侧重于程序本身的…

CSS3简单运用过渡元素(transition)

CSS3过渡 概念&#xff1a;在CSS3中&#xff0c;我们可以使用transition属性将元素的某一个属性从“一个属性值”在指定的时间内平滑地过渡到“另一个属性值”&#xff0c;从而实现动画效果。 CSS3变形&#xff08;transform)呈现的仅仅是一个结果&#xff0c;而CSS过渡&…

WPS - 表格虚线变成实线解决方案(Office 同上)

1、选中表格区域&#xff0c;在表格中选中需要调整为实线的表格区域 2、点击设置单元格格式&#xff0c;鼠标进行右击并点击设置单元格格式选项 3、选择实线&#xff0c;在单元格格式下的边框&#xff0c;调整到实线 4、设置为实线&#xff0c;即可将表格的虚线设置为实线

AI系统ChatGPT网站系统源码AI绘画详细搭建部署教程,支持GPT语音对话+DALL-E3文生图+GPT-4多模态模型识图理解

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Ch…

【AI视野·今日NLP 自然语言处理论文速览 第七十四期】Wed, 10 Jan 2024

AI视野今日CS.NLP 自然语言处理论文速览 Wed, 10 Jan 2024 Totally 38 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computation and Language Papers Model Editing Can Hurt General Abilities of Large Language Models Authors Jia Chen Gu, Hao Xiang Xu, J…

Qt QGraphicsItem获取鼠标位置对应图像坐标

本次使用了QGraphicsView来加载图像&#xff0c;然后给其设置了一个QGraphicsScene场景&#xff0c;再给场景添加了一个自定义的QGraphicsItem&#xff0c;在其中重写了paint事件&#xff0c;用来重绘图像。 正常情况时&#xff0c;QGraphicsItem上图像的有效区域QRect大小和QG…

基于爬虫和Kettle的豆瓣电影的采集与预处理

一&#xff1a;爬虫 1、爬取的目标 将豆瓣电影网上的电影的基本信息&#xff0c;比如&#xff1a;电影名称、导演、电影类型、国家、上映年份、评分、评论人数爬取出来&#xff0c;并将爬取的结果放入csv文件中&#xff0c;方便存储。 2、网站结构 图1豆瓣网网站结构详…

Polars使用指南(二)

在上一篇文章中&#xff0c;我们介绍了Polars的优势和Polars.Series的常用API&#xff0c;本篇文章我们继续介绍Polars.Series的扩展API。 对于一些特殊的数据类型&#xff0c;如 pl.Array、list、str 等&#xff0c;Polars.Series 提供了基于属性的直接操作API&#xff0c;如…

Web前端 ---- 【Vue3】Proxy响应式原理

目录 前言 安装Vue3项目 安装 Proxy 语法格式 前言 从本文开始进入vue3的学习。本文介绍vue3中的响应式原理&#xff0c;相较于vue2中通过object.defineProperty&#xff08;vue2中的响应式&#xff09;来实现响应式&#xff0c;vue3中换成了Proxy来进行实现。 安装Vue3项目…

Linux---gcc编译

目录 前言 一、gcc编译 二、程序的编译过程 三、gcc查看编译过程 1.预处理阶段 2.编译 3.汇编 4.链接 动静态库链接的内容 动静态库链接的优缺点 5.总结记忆 前言 在前面我们学会使用vim对文件进行编辑&#xff0c;如果是C或者C程序&#xff0c;我们编辑好了内容…

C++多态与虚函数的使用注意

文章目录 什么情况下用多态构造和析构的顺序为什么要把析构函数声明为虚函数为什么不能在构造函数和析构函数中使用虚函数什么情况下用多态 多态是面向对象编程中的一个重要概念,可以提高代码的可扩展性和可维护性。在以下情况下,可以考虑使用多态: 当有一个基类或接口,并…