Flink广播流 BroadcastStream

文章目录

  • 前言
  • BroadcastStream代码示例
  • Broadcast 使用注意事项


前言

Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一致且实时更新。

广播流的使用通常涉及以下步骤:

  1. 定义MapStateDescriptor:首先需要定义一个MapStateDescriptor来描述要广播的数据的格式。这个描述器指定了数据的键值对类型。

  2. 创建广播流:然后,需要将一个普通的流转换为广播流。这通常通过调用流的broadcast()方法实现,并将MapStateDescriptor作为参数传入。

  3. 连接广播流与非广播流:一旦有了广播流,就可以将其与一个或多个非广播流(无论是Keyed流还是Non-Keyed流)连接起来。这通过调用非广播流的connect()方法完成,并将广播流作为参数传入。连接后的流是一个BroadcastConnectedStream,它提供了process()方法用于处理数据。

  4. 处理数据:在process()方法中,可以编写逻辑来处理非广播流和广播流的数据。根据非广播流的类型(Keyed或Non-Keyed),需要传入相应的KeyedBroadcastProcessFunctionBroadcastProcessFunction类型的处理函数。

广播流的一个典型使用场景是在处理数据时需要实时动态改变配置。例如,当需要从MySQL数据库中实时查询和更新某些关键字过滤规则时,如果直接在计算函数中进行查询,可能会阻塞整个计算过程甚至导致任务停止。通过使用广播流,可以将这些配置信息广播到所有相关任务的实例中,然后在计算过程中直接使用这些配置信息,从而提高计算效率和实时性。

总的来说,Flink的广播流提供了一种有效的方式来实现不同任务间的数据共享和实时更新,适用于各种需要全局数据或配置的场景。


BroadcastStream代码示例

功能:将用户信息进行广播,从Kafka中读取用户访问记录,判断访问用户是否存在


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;import flink.demo.data.UserVo;
/*** 多流connect,并进行join**/
public class BroadcastTest{public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties proterties = new Properties();proterties.setProperty("bootstrap.servers", "10.168.88.88:9092");proterties.setProperty("group.id", "test");proterties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");proterties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        proterties.setProperty("auto.offset.reset", "latest");FlinkKafkaConsumer<ObjectNode> consumerVisit= new FlinkKafkaConsumer<>("test",new JSONKeyValueDeserializationSchema(false), proterties);DataStreamSource<ObjectNode> streamSource = env.addSource(consumerVisit);DataStreamSource<Tuple2<String, List<UserVo>>> userStreamSource = env.addSource(new UserListSource());MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));BroadcastStream<Tuple2<String, List<UserVo>>> broadcastStream = userStreamSource.broadcast(descriptor);// 将数据流和控制流进行连接,利用控制流中的数据来控制字符串的输出BroadcastConnectedStream<ObjectNode, Tuple2<String, List<UserVo>>> tmp=streamSource.connect(broadcastStream);tmp.process(new UserPvProcessor()).print();env.execute("kafkaTest");}private static class UserPvProcessorextends BroadcastProcessFunction<ObjectNode, Tuple2<String, List<UserVo>>, String> {private static final long serialVersionUID = 1L;MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));@Override//用户信息处理public void processBroadcastElement(Tuple2<String, List<UserVo>> value, Context ctx, Collector<String> out)throws Exception {// 将接收到的控制数据放到 broadcast state 中  ctx.getBroadcastState(descriptor).put(value.f0, value.f1);// 打印控制信息System.out.println(Thread.currentThread().getName() + " 接收到用户信息 : "+value.f0+"   " + value.f1);}@Override//数据流public void processElement(ObjectNode element, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从 broadcast state 中拿到用户列表信息List<UserVo> userList = ctx.getBroadcastState(descriptor).get("userList");String time=LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));if(userList!=null&&userList.size()>0) {Map<String,String> userMap=new HashMap<>();for(UserVo vo:userList) {userMap.put(vo.getUserid(), vo.getUserName());}
//				System.out.println(userMap);JsonNode value = element.get("value");String userid=value.get("user").asText();String userName=userMap.get(userid);if (StringUtils.isNotBlank(userName)) {out.collect(Thread.currentThread().getName()+"存在用户"+userid+"  "+userName +" "+time);}else {out.collect(Thread.currentThread().getName()+"不存在用户"+userid+" "+time );}}else {out.collect(Thread.currentThread().getName()+"不存在用户"+element.get("value")+" "+time );}}}
}

Broadcast 使用注意事项

  • 同一个 operator 的各个 task 之间没有通信,广播流侧(processBroadcastElement)可以能修改 broadcast state,而数据流侧(processElement)只能读 broadcast state.;
  • 需要保证所有 Operator task 对 broadcast state 的修改逻辑是相同的,否则会导致非预期的结果;
  • Operator tasks 之间收到的广播流元素的顺序可能不同:虽然所有元素最终都会下发给下游tasks,但是元素到达的顺序可能不同,所以更新state时不能依赖元素到达的顺序;
  • 每个 task 对各自的 Broadcast state 都会做快照,防止热点问题;
  • 目前不支持 RocksDB 保存 Broadcast state:Broadcast state 目前只保存在内存中,需要为其预留合适的内存

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/745745.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Yolo系列各代网络结构分析(一)

Yolo系列 Yolo系列算是目标检测领域的常青树了&#xff0c;从v1到最近的v9&#xff0c;一直都在不断迭代&#xff0c;不断改进&#xff0c;但是细看其各代网络的发展&#xff0c;其实还是有很多一脉相承之处以及算法设计偏好的&#xff0c;总结主要为以下几个方面&#xff1a;…

【计算机视觉】二、图像形成:1、向量和矩阵的基本运算:线性变换与齐次坐标

文章目录 一、向量和矩阵的基本运算1、简单变换1. 平移变换2. 缩放变换3. 旋转变换4. 一般线性变换 2、齐次坐标0. 齐次坐标表示1. 2D点的齐次坐标变换2. 投影空间 ( x , y , w ) (x, y, w) (x,y,w)3. 2D直线的齐次坐标表示a. 直线的参数方程表示b. 直线的法向量和原点距离表示…

深度解析Elasticsearch索引数据量过大的优化与部署策略

✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天开心哦&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; 目录 引言 1. 分片和副本策略 1.1分片策略 1.1.1 数据量 1.1.…

sqllab第二十一关通关笔记

知识点&#xff1a; 错误注入 最大长度为32超过需要利用截取函数分段读取cookie注入base64加密会保留符号的原始属性 通过admin admin进行登录发现和第二十关显示的内容一样&#xff0c;猜测应该还是cookie注入&#xff1b; 直接截取带有cookie的数据包&#xff0c;发现uname…

【计算机网络】概述

文章目录 一、Internet 因特网1.1 网络、互联网、因特网1.2 因特网的组成 二、三种交换方式2.1 电路交换 &#xff08;Circuit Switching&#xff09;2.2 *分组交换 &#xff08;Packet Switching&#xff09;2.3 报文交换 &#xff08;Message Switching&#xff09; 三、计算…

100W-150W电阻器-TO-247模压厚膜电阻(1)

EAK封装的TO-247功率电阻器为设计工程师提供稳定的晶体管式封装的大功率电阻器件&#xff0c;功率为100W-150W。这些电阻器专为需要精度和稳定性的应用而设计。该电阻器采用氧化铝陶瓷层设计&#xff0c;可将电阻元件和安装片分开。 EAK模压TO-247厚膜功率电阻器 这种结构提供了…

Redis基本使用

Redis基本使用 1.通用命令2.基本数据类型2.1 String2.2 Hash2.3 List2.4 Set2.5 SortedSet 3. SpringDataRedis3.1 简介3.2 快速代码示例3.3 序列化 1.通用命令 针对所有数据类型的操作可以在Redis官方文档查看。以下是通用的命令。 KEYS&#xff1a;查看符合模板的所有key D…

React——react 的基本使用

前提&#xff1a;安装全局的脚手架&#xff0c;通过create-creat-app 项目名&#xff0c;我们创建好一个新项目&#xff0c;cd进去&#xff0c;通过npm start去运行该项目 注意&#xff1a;简单看下demo的配置&#xff0c;在根目录我们可以看到&#xff0c;没有任何webpack的…

SpringCloudGateway之统一鉴权篇

SpringCloudGateway之统一鉴权篇 SpringCloudGateway实现统一鉴权的方式 基于JWT&#xff08;JSON Web Token&#xff09; 在客户端登录成功后&#xff0c;服务端生成一个包含用户信息和过期时间等数据的JWT令牌返回给客户端。 客户端在后续请求中将此令牌放在请求头&#xf…

rviz上不显示机器人模型(模型只有白色)

文档中的是base_footprint&#xff0c;需要根据自己所设的坐标系更改&#xff0c;我的改为base_link 如何查看自己设的坐标系&#xff1a; 这些parent父坐标系就是 同时打开rviz后需要更改成base_link

Linux——使用Keepalived实现DHCP服务的高可用

前言 Keepalived是一个用于实现高可用性的开源工具&#xff0c;主要用于实现基于VRRP协议的负载均衡和故障转移功能。它可以通过检测节点的健康状况&#xff0c;并自动切换到备份节点来确保服务的高可用性。 Keepalived支持多种检测方式&#xff0c;如ping、TCP连接等&#x…

20232831 2023-2024-2 《网络攻防实践》第2次作业

目录 20232831 2023-2024-2 《网络攻防实践》第2次作业1.实验内容2.实验过程3.学习中遇到的问题及解决4.学习感悟、思考等参考资料 20232831 2023-2024-2 《网络攻防实践》第2次作业 1.实验内容 &#xff08;1&#xff09;从www.csdn.net、www.163.com等中选择一个DNS域名进行…

结构设计模式 - 组合设计模式 - JAVA

组合设计模式 一. 介绍二.代码示例2.1 定义Component2.2 定义Leaf2.3 定义Composite 三. 参考案例 前言 这是我在这个网站整理的笔记,有错误的地方请指出&#xff0c;关注我&#xff0c;接下来还会持续更新。 作者&#xff1a;神的孩子都在歌唱 一. 介绍 由不同的对象组合成一个…

[嵌入式系统-39]:龙芯1B 开发学习套件 -9-PMON的文件结构

目录 前言&#xff1a; 一、PMON-V1.1 目录结构 二、Targets目录的组成 前言&#xff1a; 参考&#xff1a;​​​​​​龙芯相关 - 心映真的空间 一、PMON-V1.1 目录结构 PMON-V1.1 目录结构 pmon的目录结构大致如下&#xff08;由linux工具tree生成&#xff09; |-- Tar…

OSI(Open Systems Interconnection)模型和TCP/IP模型

OSI模型 OSI模型是一个概念模型&#xff0c;由国际标准化组织&#xff08;ISO&#xff09;在1984年提出&#xff0c;用于促进不同系统间的通信互联。OSI模型将网络通信的过程分为七层&#xff0c;每一层都有其特定的功能&#xff0c;从下至上依次是&#xff1a; 物理层&#x…

【机器学习智能硬件开发全解】(四)—— 政安晨:嵌入式系统基本素养【后摩尔时代】

随着物联网、大数据、人工智能时代的到来&#xff0c;海量的数据分析、大量复杂的运算对CPU的算力要求越来越高。 CPU内部的大部分资源用于缓存和逻辑控制&#xff0c;适合运行具有分支跳转、逻辑复杂、数据结构不规则、递归等特点的串行程序。 在集成电路工艺制程将要达到极…

CMake 脚本命令(Scripting Commands)之find_package

使用find_package引入外部依赖包 本章节通过示例演示Cmake中find_package的用法。 注&#xff1a;所有教程均在linux系统下测试通过&#xff0c;如果是windows和mac系统&#xff0c;可能会出现错误&#xff0c;需要自行调试修改 通过Cmake内置模块引入依赖包 为了方便我们在…

Todesk与向日葵:哪款远程工具更胜一筹?

在数字化时代&#xff0c;远程工具已成为许多个人和企业不可或缺的一部分。其中&#xff0c;Todesk和向日葵是两款备受瞩目的远程桌面软件。它们各自拥有独特的功能和优势&#xff0c;但究竟哪一款更适合您的需求呢&#xff1f;本文将从稳定性、易用性、价格和安全性等方面对这…

一个H5页面中直接使用React的示例与说明

示例 如题&#xff0c;下面的个简单代码示例—在H5页面中直接使用React <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0&q…

Pytorch从零开始实战21

Pytorch从零开始实战——Pix2Pix理论与实战 本系列来源于365天深度学习训练营 原作者K同学 文章目录 Pytorch从零开始实战——Pix2Pix理论与实战内容介绍数据集加载模型实现开始训练总结 内容介绍 Pix2Pix是一种用于用于图像翻译的通用框架&#xff0c;即图像到图像的转换。…