flink1.18 广播流 The Broadcast State Pattern 官方案例scala版本

对应官网

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/

测试数据

 * 广播流 官方案例 scala版本* 广播状态*    https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/* 事件流:*    red,4side*    red,5side*    red,1side*    red,4side** 规则流:*    rule1,4side,1side* 广播规则流,使用mapstate存储每种规则和对应的事件流数据 eg: {rule1 -> [4side,4side]} 遇到1side到来,则全部输出.* map可存储多个规则

完整scala版本代码

package com.yy.state.operatorStateDemoimport org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.streaming.api.datastream.KeyedStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.util.Collectorimport java.time.ZoneId
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._/*** 广播流 官方案例 scala版本* 广播状态*    https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/* 事件流:*    red,4side*    red,5side*    red,4side*    red,1side*    red,4side** 规则流:*    rule1,4side,1side* 广播规则流,使用mapstate存储每种规则和对应的事件流数据 eg: {rule1 -> [4side,4side]} 遇到1side到来,则全部输出.* map可存储多个规则*/
object BroadcastStateV1 {case class Item(color:Color,shape: Shape){def getShape()={shape}}case class Rule(name:String,first:Shape,second:Shape)case class Color(color:String)case class Shape(shape:String)def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)// 指定国内时区tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))val itemStream = env.socketTextStream("localhost", 9999).map(_.split(",")).map(arr => Item(Color(arr(0)), Shape(arr(1))))val ruleStream = env.socketTextStream("localhost", 9998).broadcast().map(s => Rule(s.split(",")(0), Shape(s.split(",")(1)), Shape(s.split(",")(2))))val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint[Rule](){}));val ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor)val colorPartitionedStream: KeyedStream[Item, Color] = itemStream.keyBy(new KeySelector[Item, Color] {override def getKey(value: Item): Color = value.color})colorPartitionedStream.connect(ruleBroadcastStream).process(// type arguments in our KeyedBroadcastProcessFunction represent://   1. the key of the keyed stream//   2. the type of elements in the non-broadcast side//   3. the type of elements in the broadcast side//   4. the type of the result, here a stringnew KeyedBroadcastProcessFunction[Color, Item, Rule, String]() {val  mapStateDesc =new MapStateDescriptor("items",BasicTypeInfo.STRING_TYPE_INFO,new ListTypeInfo(classOf[Item]))val ruleStateDescriptor =new MapStateDescriptor("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint[Rule]() {}))override def processElement(value: Item, ctx: KeyedBroadcastProcessFunction[Color, Item, Rule, String]#ReadOnlyContext, out: Collector[String]): Unit = {val state = getRuntimeContext().getMapState(mapStateDesc)val shape = value.getShape()// 遍历广播的 rulectx.getBroadcastState(ruleStateDescriptor).immutableEntries().asScala.foreach{entry =>val ruleName = entry.getKey()val rule = entry.getValue()val stored: ListBuffer[Item] = {if (state.contains(ruleName)) {state.get(ruleName).asScala.to[ListBuffer]} else {new ListBuffer[Item]()}}//if (shape == rule.second && stored.nonEmpty) {stored.foreach { i =>out.collect("MATCH: " + i + " - " + value);}stored.clear();}// there is no else{} to cover if rule.first == rule.secondif (shape.equals(rule.first)) {stored.append(value);}if (stored.isEmpty) {// 规则已经匹配输出 清理状态state.remove(ruleName)} else {// 没输出则更新状态state.put(ruleName, stored.asJava)}}}override def processBroadcastElement(value: Rule, ctx: KeyedBroadcastProcessFunction[Color, Item, Rule, String]#Context, out: Collector[String]): Unit = {ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);}}).print("sink --> ")env.execute("flink-broadcast-state")}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/641021.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

css初始化(不应该固定)

1)对边框 margin:0padding:0 2)图片去边框 这个得看浏览器了,有的浏览器是带边框的 img {border:none;} 3)清除 ul ol 前的小点 ul,ol {list-style:none; } 4)input input {padding-top:0;padding-bottom:0;boder:none; } 5) 去掉a标签的下滑线…

风二西CTF流量题大集合-刷题笔记|NSSCTF流量题(1)

2.[鹤城杯 2021]流量分析 flag{w1reshARK_ez_1sntit} 3.[CISCN 2023 初赛]被加密的生产流量 c1f_fi1g_1000 4.[GKCTF 2021]签到 flag{Welc0me_GkC4F_m1siCCCCCC!} 5.[闽盾杯 2021]Modbus的秘密 flag{HeiDun_2021_JingSai} 6.[LitCTF 2023]easy_shark 7.[CISCN 2022 初赛]ez…

opencv#29 图像噪声的产生

在上一节的图像卷积我们了解到图像卷积可以用于去除图像中的噪声,那么对于现实生活中每一张采集到的图像都会包含噪声,也就是我们通过相机无法得到不包含噪声的图像,如果我想衡量噪声去除能力的强弱,就必须在一张不含噪声的图像中…

瑞_力扣LeetCode_104. 二叉树的最大深度

文章目录 题目 104. 二叉树的最大深度题解后序遍历 递归实现后序遍历 迭代实现层序遍历 🙊 前言:本文章为瑞_系列专栏之《刷题》的力扣LeetCode系列,主要以力扣LeetCode网的题进行解析与分享。本文仅供大家交流、学习及研究使用,禁…

【NVIDIA】Jetson Orin Nano系列:Qt+Gstreamer(01)pro中配置gstreamer库和头文件路径

1、安装头文件和库 本人烧写的Ubuntu22.0版本,文件系统已有相关头文件,如果没有可以使用下面的命令安装 sudo apt install libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev libgstreamer-plugins-good1.0-dev2、pro配置头文件路径和库 QT += core gui greate…

多个coco数据标注文件合并

一、coco数据集是什么? COCO(Common Objects in Context)是一个用于目标检测和图像分割任务的标注格式。如果你有多个COCO格式的JSON文件,你可能需要将它们合并成一个文件,以便更方便地处理和管理数据。在这篇博客中&…

每日算法打卡:归并排序 day 22

文章目录 原题链接题目描述输入格式输出格式数据范围输入样例:输出样例: 题目分析示例代码 原题链接 787. 归并排序 题目难度:简单 题目描述 给定你一个长度为 n 的整数数列。 请你使用归并排序对这个数列按照从小到大进行排序。 并将排…

Armv8-M的TrustZone技术之内存属性单元

如果处理器包含Armv8-M安全扩展,则内存区域的安全状态由内部安全属性单元(SAU,Secure Attribution Unit)或外部实现定义的属性单元(IDAU,Implementation Defined Attribution Unit)的组合控制。…

机器学习实验报告——APRIORI算法

目录 一、算法介绍 1.1算法背景 1.2算法引入 1.3算法假设 1.4算法基本概念介绍 1.4.1关联规则 1.4.2支持度 1.4.3置信度 1.4.4频繁项集 1.4.5项目 1.4.6提升度 二、算法原理 2.1算法思想 2.2Apriori算法产生频繁项集 2.3Apriori算法的基本步骤 2.4关联分析 三、算法实现 3.1 Ap…

react hooks遇到setTimeout

在JavaScript中,setTimeout函数是异步执行的,它会在事件循环的下一个循环中执行。因此,即使通过箭头函数将当前状态值传递给setTimeout的回调函数,该回调函数仍然会在当前状态值更新之前执行。 为了避免状态获取到旧值的问题&…

深度学习(5)---自注意力机制

文章目录 1. 输入与输出2. Self-attention2.1 介绍2.2 运作过程2.3 矩阵相乘理解运作过程 3. 位置编码4. Truncated Self-attention4.1 概述4.2 和CNN对比4.3 和RNN对比 1. 输入与输出 1. 一般情况下在简单模型中我们输入一个向量,输出结果可能是一个数值或者一个类…

【一站解决您的问题】mac 利用命令升级nodejs、npm、安装Nodejs的多版本管理器n、nodejs下载地址

一:下载nodejs 官网地址,点击下载稳定版 https://nodejs.org/en 如果官网下载特别慢,可以点击这个地址下载 点击这里 https://nodejs.cn/download/current/ 安装完成后,就包含了nodejs 和 npm。此时您的版本就是下载安装的版本…

域名与IP地址之间的关系?

域名和IP相辅相成,两者结合使用户可以简单、轻松地访问Internet。不过对于初涉互联网的新手站长很可能误以为域名就是 IP地址,甚至将独立IP理解成只能够自己访问。 么,域名和IP地址究竟是怎么定义的,它们之间又有哪些关系呢&#…

flume自定义拦截器

要自定义 Flume 拦截器,你需要编写一个实现 org.apache.flume.interceptor.Interceptor 接口的自定义拦截器类。以下是一个简单的示例: import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interce…

数字频率合成器dds的量化性能分析matlab仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 DDS的基本原理 4.2 DDS的量化性能分析 5.完整工程文件 1.课题概述 数字频率合成器dds的量化性能分析matlab仿真,分别定义累加器位宽,截位位宽,模拟DAC位宽等&…

第11章-第1节-SQL语句(基于mysql社区版8.0.36.0)

1、先看看以前写过的几篇数据库基础文章: 基础SQL语句整理(mysql5.7下通过运行) 进阶SQL语句整理(mysql5.7下通过运行) 高级SQL语句整理(mysql5.7下通过运行) 2、SQL的基础应用感觉没有太多可以讲的东西,直接上学习笔记,可以看的很直接&a…

制冷系统吸排气管路的设计

吸气管路的设计 由于流动产生的阻力损失,导致压缩机吸气口处的压力低于蒸发器出口处的压力。当吸气压力降低时,回气比容增大,压缩机的排气量减少,机组制冷量将会有损失。同时吸气管中还要维持足够高的制冷剂流速以使冷冻油能顺利返回压缩机。…

走出大模型部署新手村!小明这样用魔搭×函数计算

作者:拓山 前文介绍了魔搭 ModelScope 社区模型服务 SwingDeploy 服务。开发者可以将模型从魔搭社区的模型库一键部署至阿里云函数计算,当选择模型并部署时,系统会选择对应的机器配置。按需使用可以在根据工作负载动态的减少资源&#xff0c…

如何在WordPress网站中添加多语言搜索(2种简单方法)

您想在WordPress网站中添加多语言搜索吗? 如果您有一个多语言 WordPress 网站,那么添加多语言搜索功能可以帮助用户通过使用自己的语言进行搜索来更快地找到信息。 在本文中,我们将向您展示如何在 WordPress 中轻松添加多语言搜索&#xff…

C#,入门教程(31)——预处理指令的基础知识与使用方法

上一篇: C#,入门教程(30)——扎好程序的笼子,错误处理 try catchhttps://blog.csdn.net/beijinghorn/article/details/124182386 Visual Studio、C#编译器以及C#语法所支持的预处理指令,绝对是天才设计。 编译程序的时候会发现&am…