Flink多流处理之Broadcast(广播变量)

写过Spark批处理的应该都知道,有一个广播变量broadcast这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有broadcast,简单来说和Spark中的类似,但是有所区别,首先Spark中的broadcast是静态的数据,而Flink中的broadcast是动态的,也就是源源不断的数据流.在Flink中会将广播的数据存到state中.
在这里插入图片描述
在Flink中主流数据可以获取state中的所有状态数据,使用过window的应该都清楚,当两个streamData中的数据到达窗口的时间刚好错过时就会发生关联不上的情况,如window2S,sreamData1到达窗口的时间刚好卡在这个2S窗口的尾端,而streamData到达窗口时,这个窗口已经结束了,这种情况就算这两条数据有相同id也无法进行关联了.
但是broadcast会将到达的数据都存储在state中,这样主流到达的每一条数据都可以和state中的广播流数据进行关联比较.
在这里插入图片描述
流程图内容可能不够准确,只是为了看起来方便理解.

  • 数据源
    # 主流数据
    ➜  ~ nc -lk 1234
    101,浏览商品,2023-08-02
    102,浏览商品,2023-08-02
    103,查看商品价格,2023-08-04
    101,商品加入购物车,2023-08-03
    101,从购物车删除商品,2023-08-03
    102,下单,2023-08-02
    102,申请延期发货,2023-08-03
    103,点击商品详情页,2023-08-04
    104,点击收藏,2023-08-05
    104,下单,2023-08-05
    104,付款,2023-08-06
    105,浏览商品,2023-08-07
    106,浏览商品,2023-08-07
    106,加入购物车,2023-08-08
    107,浏览商品,2023-08-10
    
    # 广播流数据
    ➜  ~ nc -lk 5678
    101,小明
    102,张丽
    103,公孙飞天
    104,王二虎
    106,李四
    108,赵屋面
    
  • 代码
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/11* @Description: 多流操作-广播流**/
    public class FlinkBroadcast {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 数据集源1作为主流数据(用户行为日志[id,behavior,date])DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 1234);// 将字符串切割处理SingleOutputStreamOperator<Tuple3<String, String, String>> mainSourceStream = sourceStream1.map(str -> Tuple3.of(str.split(",")[0], str.split(",")[1], str.split(",")[2])).returns(new TypeHint<Tuple3<String, String, String>>() {});// 数据源2作为广播流数据(用户信息(id,name))DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 5678);// 将字符串切割处理SingleOutputStreamOperator<Tuple2<String, String>> mapStream2 = sourceStream2.map(str -> Tuple2.of(str.split(",")[0], str.split(",")[1])).returns(new TypeHint<Tuple2<String, String>>() {});// 将广播流数据源进行广播/***参数说明* 这里需要我们传入一个MapStateDescriptor,其实就是一个Map结构的数据<k,v>* <String, Tuple2<String, String>>,第一个String类型就是广播流和主流连接的字段,在这个代码中就是id,由实际业务决定* <String, Tuple2<String, String>>,第二个Tuple2<String, String>就是实际广播数据流的数据,由实际业务决定* "userInfo"就是给一个名字,这个自定义无强制要求**/// 先构建一个状态,后面也会使用MapStateDescriptor<String, Tuple2<String, String>> userInfoState = new MapStateDescriptor<>("userInfo", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));BroadcastStream<Tuple2<String, String>> userInfoBroadStream = mapStream2.broadcast(userInfoState);// 将主流数据和广播流数据使用connect连接/*** 我们将数据转变成广播流之后,在Flink中也不知哪个数据流需要使用这个广播流(userInfoBroadStream),* 这个时候就需要我们自己将主流数据和该广播流数据进行连接**/BroadcastConnectedStream<Tuple3<String, String, String>, Tuple2<String, String>> connectedStream = mainSourceStream.connect(userInfoBroadStream);/*** 在process()中有两类函数供我们选择,KeyedBroadcastProcessFunction和BroadcastProcessFunction,* 这里要注意当"connectedStream"是KeyedStream时选择KeyedBroadcastProcessFunction* 当"connectedStream"不是KeyedStream时选择BroadcastProcessFunction就可以.* 使用keyBy算子返回的就是KeyedStream**/SingleOutputStreamOperator<String> resultStream = connectedStream.process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>() {// 这个方法写主流数据处理逻辑@Overridepublic void processElement(Tuple3<String, String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {/*** 要注意,这里我们最好从ReadOnlyContext来获取广播状态数据,因为获取只读的状态数据可以保证数据的安全性,* 如果是通过成员变量的方式获取可修改的状态数据,就会存在数据不安全的问题,如在代码逻辑中出现了对状态数据* 修改的代码,那么共享此状态的并行算子可能看到的状态数据不一致,就会导致数据错误或者代码报错.* 而使用ReadOnlyContext就可以保证processElement这个方法中我们只对状态数据进行读取.**/ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);if (broadcastState != null) {// 通过主流中的ID作为key获取广播变量中的用户信息Tuple2<String, String> userInfo = broadcastState.get(value.f0);// 输出数据的形式(id,behavior,date,name)if (userInfo == null) {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + userInfo.f1);}} else {out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");}}// 这个方法写广播流数据处理逻辑@Overridepublic void processBroadcastElement(Tuple2<String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.Context ctx, Collector<String> out) throws Exception {// 使用Context获取状态BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);// 将数据存入到状态中broadcastState.put(value.f0, value);}});// 打印结果resultStream.print();env.execute("Flink broadcast");}
    }
    
  • 结果
    3> 101,浏览商品,2023-08-02,小明
    3> 101,商品加入购物车,2023-08-03,小明
    3> 102,申请延期发货,2023-08-03,张丽
    3> 104,下单,2023-08-05,王二虎
    3> 106,浏览商品,2023-08-07,李四
    1> 102,浏览商品,2023-08-02,张丽
    1> 101,从购物车删除商品,2023-08-03,小明
    1> 103,点击商品详情页,2023-08-04,公孙飞天
    1> 104,付款,2023-08-06,王二虎
    1> 106,加入购物车,2023-08-08,李四
    2> 103,查看商品价格,2023-08-04,公孙飞天
    2> 102,下单,2023-08-02,张丽
    2> 104,点击收藏,2023-08-05,王二虎
    2> 105,浏览商品,2023-08-07,NULL
    2> 107,浏览商品,2023-08-10,NULL
    
    代码内容就不进行详细解释了,注释基本都写清楚了,如有疑问可评论提问,共同探讨.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/36623.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是微服务?

2.微服务的优缺点 优点 单一职责原则每个服务足够内聚&#xff0c;足够小&#xff0c;代码容易理解&#xff0c;这样能聚焦一个指定的业务功能或业务需求&#xff1b;开发简单&#xff0c;开发效率提高&#xff0c;一个服务可能就是专一的只干一件事&#xff1b;微服务能够被小…

命令提示符之操作基础(Windows)

打开命令提示符 方法一 打开指定文件的文件夹&#xff0c;在路径栏里输入“cmd”&#xff0c;回车&#xff0c;就进入控制台了。默认路径就是指定文件夹的路径。 方法二 打开指定的文件夹&#xff0c;按住shift键&#xff0c;在空白处右击&#xff0c;在菜单栏中选择“在此处打…

社区团购商城拼团秒杀接龙分销团长小程序开源版开发

社区团购商城拼团秒杀接龙分销团长小程序开源版开发 功能介绍&#xff1a; 商品管理&#xff1a;增加商品-商品列表-商品分类-商品单/多规格-商品标签 订单管理&#xff1a;订单列表-订单挑选-订单导出-订单打印-批量发货-商品评价 会员管理&#xff1a;会员列表-会员挑选-会员…

【Git】—— 标签管理

目录 &#xff08;一&#xff09;理解标签 1、作用 &#xff08;二&#xff09;创建标签 &#xff08;三&#xff09;操作标签 1、删除标签 2、推送标签 3、删除远程标签 &#xff08;一&#xff09;理解标签 标签 tag &#xff0c;可以简单的理解为是对某次 commit 的…

python中的迭代器和生成器

一、迭代器 支持迭代的容器&#xff0c;如列表&#xff08;list&#xff09;、元组&#xff08;tuple&#xff09;、字典&#xff08;dict&#xff09;、集合&#xff08;set&#xff09;这些序列式容器。 自定义迭代器的类中必须实现以下2个方法&#xff1a; __next__(self)…

监控Kubernetes 控制面组件的关键指标

控制面组件的监控&#xff0c;包括 APIServer、Controller-manager&#xff08;简称 CM&#xff09;、Scheduler、etcd 四个组件。 1、APIServer APIServer 的核心职能是 Kubernetes 集群的 API 总入口&#xff0c;Kube-Proxy、Kubelet、Controller-Manager、Scheduler 等都需…

TCP 协议十大相关特性总结

目录 一、TCP特性 二、报文格式 TCP十大核心特性 1. 确认应答 2. 超时重传 3. 连接管理(三次握手,四次挥手) 三次握手 四次挥手 4. 滑动窗口 情况一:接收方的ACK丢失 情况二:发送方的数据包丢失 5. 流量控制 6. 拥塞控制 7. 延迟应答 8. 捎带应答 9. 字节流粘包问题 10. TCP的…

大语言模型:LLM的概念是个啥?

一、说明 大语言模型&#xff08;维基&#xff1a;LLM- large language model&#xff09;是以大尺寸为特征的语言模型。它们的规模是由人工智能加速器实现的&#xff0c;人工智能加速器能够处理大量文本数据&#xff0c;这些数据大部分是从互联网上抓取的。 [1]所构建的人工神…

02 - git 文件重命名

查看所有文章链接&#xff1a;&#xff08;更新中&#xff09;GIT常用场景- 目录 文章目录 1. 第一种方式2. 第二种方式 1. 第一种方式 mv kongfu_person.txt kongfu.txt git add .2. 第二种方式 git mv kongfu_person.txt kongfu.txt

微服务实战项目-学成在线-项目优化(redis缓存优化)

微服务实战项目-学成在线-项目优化(redis缓存优化) 1 优化需求 视频播放页面用户未登录也可以访问&#xff0c;当用户观看试学课程时需要请求服务端查询数据&#xff0c;接口如下&#xff1a; 1、根据课程id查询课程信息。 2、根据文件id查询视频信息。 这些接口在用户未认…

MySQL表的增删查改

目录 一&#xff0c;新增 二&#xff0c;查询 2.1 全列查询 2.2 指定列查询 2.3 查询字段为表达式 2.4 别名 - as 2.5 去重 - distinct 2.6 排序 - order by 2.7 条件查询 - where 2.8 分页查询 - limit 三&#xff0c;修改 - update 四&#xff0c;删除 - delete 一…

Spring-2-透彻理解Spring 注解方式创建Bean--IOC

今日目标 学习使用XML配置第三方Bean 掌握纯注解开发定义Bean对象 掌握纯注解开发IOC模式 1. 第三方资源配置管理 说明&#xff1a;以管理DataSource连接池对象为例讲解第三方资源配置管理 1.1 XML管理Druid连接池(第三方Bean)对象【重点】 数据库准备 -- 创建数据库 create …

纯前端 -- html转pdf插件总结

一、html2canvasjsPDF&#xff08;文字会被截断&#xff09;&#xff1a; 将HTML元素呈现给添加到PDF中的画布对象&#xff0c;不能仅使用jsPDF&#xff0c;需要html2canvas或rasterizeHTML html2canvasjsPDF的具体使用链接 二、html2pdf&#xff08;内容显示不全文字会被截断…

[RoarCTF 2019Online Proxy]sql巧妙盲注

文章目录 [RoarCTF 2019Online Proxy]sql巧妙盲注解题脚本脚本解析 [RoarCTF 2019Online Proxy]sql巧妙盲注 解题 在源代码界面发现&#xff1a;Current Ip 我们会联想到&#xff1a;X-Forwarded-For来修改ip&#xff1a; 结果我们发现&#xff0c;response会讲Last Ip回显出…

请教电路高手帮忙Review一下是否可行?

想要实现STM32 3.3V GPIO 控制5V电源通断&#xff0c;默认状态为&#xff1a;接通。 使用如下电路图有无问题&#xff1f;参数是否需要调整&#xff1f;

8.14 ARM

1.练习一 .text 文本段 .global _start 声明一个_start函数入口 _start: _start标签&#xff0c;相当于C语言中函数mov r0,#0x2mov r1,#0x3cmp r0,r1beq stopsubhi r0,r0,r1subcc r1,r1,r0stop: stop标签&#xff0c;相当于C语言中函数b stop 跳转到stop标签下的第一条…

C++的IO流

C语言的输入与输出 C语言中我们用到的最频繁的输入输出方式就是scanf ()与printf()。 scanf(): 从标准输入设备(键盘)读取数据&#xff0c;并将值存放在变量中。printf(): 将指定的文字/字符串输出到标准输出设备(屏幕)。注意宽度输出和精度输出控制。C语言借助了相应的缓冲区来…

Vue [Day7]

文章目录 自定义创建项目ESlint 代码规范vuex 概述创建仓库向仓库提供数据使用仓库中的数据通过store直接访问通过辅助函数 mapState&#xff08;简化&#xff09;mutations传参语法(同步实时输入&#xff0c;实时更新辅助函数 mapMutationsaction &#xff08;异步辅助函数map…

IntelliJ IDEA 2021/2022关闭双击shift全局搜索

我这里演示的是修改&#xff0c;删除是右键的时候选择Remove就好了 IDEA左上角 File-->Settings 找到Navigate -->Search Everywhere &#xff0c;右键添加快捷键。 OK --> Apply应用

初始多线程

目录 认识线程 线程是什么&#xff1a; 线程与进程的区别 Java中的线程和操作系统线程的关系 创建线程 继承Thread类 实现Runnable接口 其他变形 Thread类及其常见方法 Thread的常见构造方法 Thread类的几个常见属性 Thread类常用的方法 启动一个线程-start() 中断…