flink job同时使用BroadcastProcessFunction和KeyedBroadcastProcessFunction例子

背景:

广播状态可以用于规则表或者配置表的实时更新,本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用

BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用

1.首先看主流程,主流程中使用了两个Broadcast广播的状态,这两个Broadcast广播的状态是独立的

 // 这里面包含规则广播状态的两次使用方法,分别在DynamicKeyFunction处理函数和DynamicAlertFunction处理函数,注意这两个处理函数中的广播状态是独立的,也就是需要分别维度,不能共享// Processing pipeline setupDataStream<Alert> alerts =transactions.connect(rulesStream).process(new DynamicKeyFunction()).uid("DynamicKeyFunction").name("Dynamic Partitioning Function").keyBy((keyed) -> keyed.getKey()).connect(rulesStream).process(new DynamicAlertFunction()).uid("DynamicAlertFunction").name("Dynamic Rule Evaluation Function");

2.BroadcastProcessFunction的处理,这里面会维护这个算子本身的广播状态,并把所有的事件扩散发送到下一个算子

public class DynamicKeyFunctionextends BroadcastProcessFunction<Transaction, Rule, Keyed<Transaction, String, Integer>> {@Overridepublic void open(Configuration parameters) {}// 这里会把每个事件结合上广播状态中的每个规则生成N条记录,流转到下一个算子@Overridepublic void processElement(Transaction event, ReadOnlyContext ctx, Collector<Keyed<Transaction, String, Integer>> out)throws Exception {ReadOnlyBroadcastState<Integer, Rule> rulesState =ctx.getBroadcastState(Descriptors.rulesDescriptor);forkEventForEachGroupingKey(event, rulesState, out);}// 独立维护广播状态,可以在广播状态中新增删除或者清空广播状态@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Keyed<Transaction, String, Integer>> out) throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule.getControlType(), broadcastState);}}}static void handleRuleBroadcast(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {switch (rule.getRuleState()) {case ACTIVE:case PAUSE:broadcastState.put(rule.getRuleId(), rule);break;case DELETE:broadcastState.remove(rule.getRuleId());break;}}

3.KeyedBroadcastProcessFunction的处理,这里面也是会维护这个算子本身的广播状态,此外还有键值分区状态,特别注意的是在处理广播元素时,可以用applyToKeyedState方法对所有的键值分区状态应用某个方法,对于ontimer方法,依然可以访问键值分区状态和广播状态

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.ververica.field.dynamicrules.functions;import static com.ververica.field.dynamicrules.functions.ProcessingUtils.addToStateValuesSet;
import static com.ververica.field.dynamicrules.functions.ProcessingUtils.handleRuleBroadcast;import com.ververica.field.dynamicrules.Alert;
import com.ververica.field.dynamicrules.FieldsExtractor;
import com.ververica.field.dynamicrules.Keyed;
import com.ververica.field.dynamicrules.Rule;
import com.ververica.field.dynamicrules.Rule.ControlType;
import com.ververica.field.dynamicrules.Rule.RuleState;
import com.ververica.field.dynamicrules.RuleHelper;
import com.ververica.field.dynamicrules.RulesEvaluator.Descriptors;
import com.ververica.field.dynamicrules.Transaction;
import java.math.BigDecimal;
import java.util.*;
import java.util.Map.Entry;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;/** Implements main rule evaluation and alerting logic. */
@Slf4j
public class DynamicAlertFunctionextends KeyedBroadcastProcessFunction<String, Keyed<Transaction, String, Integer>, Rule, Alert> {private static final String COUNT = "COUNT_FLINK";private static final String COUNT_WITH_RESET = "COUNT_WITH_RESET_FLINK";private static int WIDEST_RULE_KEY = Integer.MIN_VALUE;private static int CLEAR_STATE_COMMAND_KEY = Integer.MIN_VALUE + 1;private transient MapState<Long, Set<Transaction>> windowState;private Meter alertMeter;private MapStateDescriptor<Long, Set<Transaction>> windowStateDescriptor =new MapStateDescriptor<>("windowState",BasicTypeInfo.LONG_TYPE_INFO,TypeInformation.of(new TypeHint<Set<Transaction>>() {}));@Overridepublic void open(Configuration parameters) {windowState = getRuntimeContext().getMapState(windowStateDescriptor);alertMeter = new MeterView(60);getRuntimeContext().getMetricGroup().meter("alertsPerSecond", alertMeter);}// 键值分区状态和广播状态联合处理,在这个方法中可以更新键值分区状态,然后广播状态只能读取@Overridepublic void processElement(Keyed<Transaction, String, Integer> value, ReadOnlyContext ctx, Collector<Alert> out)throws Exception {long currentEventTime = value.getWrapped().getEventTime();addToStateValuesSet(windowState, currentEventTime, value.getWrapped());long ingestionTime = value.getWrapped().getIngestionTimestamp();ctx.output(Descriptors.latencySinkTag, System.currentTimeMillis() - ingestionTime);Rule rule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());if (noRuleAvailable(rule)) {log.error("Rule with ID {} does not exist", value.getId());return;}if (rule.getRuleState() == Rule.RuleState.ACTIVE) {Long windowStartForEvent = rule.getWindowStartFor(currentEventTime);long cleanupTime = (currentEventTime / 1000) * 1000;ctx.timerService().registerEventTimeTimer(cleanupTime);SimpleAccumulator<BigDecimal> aggregator = RuleHelper.getAggregator(rule);for (Long stateEventTime : windowState.keys()) {if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {aggregateValuesInState(stateEventTime, aggregator, rule);}}BigDecimal aggregateResult = aggregator.getLocalValue();boolean ruleResult = rule.apply(aggregateResult);ctx.output(Descriptors.demoSinkTag,"Rule "+ rule.getRuleId()+ " | "+ value.getKey()+ " : "+ aggregateResult.toString()+ " -> "+ ruleResult);if (ruleResult) {if (COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {evictAllStateElements();}alertMeter.markEvent();out.collect(new Alert<>(rule.getRuleId(), rule, value.getKey(), value.getWrapped(), aggregateResult));}}}//维护广播状态,新增/删除或者整个清空,值得注意的是,处理广播元素时可以对所有的键值分区状态应用某个函数,比如这里当收到某个属于控制消息的广播消息时,使用applyToKeyedState方法把所有的键值分区状态都清空@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);updateWidestWindowRule(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule, broadcastState, ctx);}}private void handleControlCommand(Rule command, BroadcastState<Integer, Rule> rulesState, Context ctx) throws Exception {ControlType controlType = command.getControlType();switch (controlType) {case EXPORT_RULES_CURRENT:for (Map.Entry<Integer, Rule> entry : rulesState.entries()) {ctx.output(Descriptors.currentRulesSinkTag, entry.getValue());}break;case CLEAR_STATE_ALL:ctx.applyToKeyedState(windowStateDescriptor, (key, state) -> state.clear());break;case CLEAR_STATE_ALL_STOP:rulesState.remove(CLEAR_STATE_COMMAND_KEY);break;case DELETE_RULES_ALL:Iterator<Entry<Integer, Rule>> entriesIterator = rulesState.iterator();while (entriesIterator.hasNext()) {Entry<Integer, Rule> ruleEntry = entriesIterator.next();rulesState.remove(ruleEntry.getKey());log.info("Removed Rule {}", ruleEntry.getValue());}break;}}private boolean isStateValueInWindow(Long stateEventTime, Long windowStartForEvent, long currentEventTime) {return stateEventTime >= windowStartForEvent && stateEventTime <= currentEventTime;}private void aggregateValuesInState(Long stateEventTime, SimpleAccumulator<BigDecimal> aggregator, Rule rule) throws Exception {Set<Transaction> inWindow = windowState.get(stateEventTime);if (COUNT.equals(rule.getAggregateFieldName())|| COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {for (Transaction event : inWindow) {aggregator.add(BigDecimal.ONE);}} else {for (Transaction event : inWindow) {BigDecimal aggregatedValue =FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);aggregator.add(aggregatedValue);}}}private boolean noRuleAvailable(Rule rule) {// This could happen if the BroadcastState in this CoProcessFunction was updated after it was// updated and used in `DynamicKeyFunction`if (rule == null) {return true;}return false;}private void updateWidestWindowRule(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {Rule widestWindowRule = broadcastState.get(WIDEST_RULE_KEY);if (rule.getRuleState() != Rule.RuleState.ACTIVE) {return;}if (widestWindowRule == null) {broadcastState.put(WIDEST_RULE_KEY, rule);return;}if (widestWindowRule.getWindowMillis() < rule.getWindowMillis()) {broadcastState.put(WIDEST_RULE_KEY, rule);}}// ontimer方法中可以访问/更新键值分区状态,读取广播状态,此外ontimer方法和processElement方法以及processBroadcastElement方法是同步的,不需要考虑并发访问的问题@Overridepublic void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<Alert> out)throws Exception {Rule widestWindowRule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);Optional<Long> cleanupEventTimeWindow =Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);Optional<Long> cleanupEventTimeThreshold =cleanupEventTimeWindow.map(window -> timestamp - window);cleanupEventTimeThreshold.ifPresent(this::evictAgedElementsFromWindow);}private void evictAgedElementsFromWindow(Long threshold) {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {Long stateEventTime = keys.next();if (stateEventTime < threshold) {keys.remove();}}} catch (Exception ex) {throw new RuntimeException(ex);}}private void evictAllStateElements() {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {keys.next();keys.remove();}} catch (Exception ex) {throw new RuntimeException(ex);}}
}

ps: ontimer方法和processElement方法是同步访问的,没有并发的问题,所以不需要考虑同时更新键值分区状态的线程安全问题

参考文献:
https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/131067.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【华为OD题库-005】选修课-Java

题目 现有两门选修课&#xff0c;每门选修课都有一部分学生选修&#xff0c;每个学生都有选修课的成绩&#xff0c;需要你找出同时选修了两门选修课的学生,先按照班级进行划分&#xff0c;班级编号小的先输出&#xff0c;每个班级按照两门选修课成绩和的降序排序&#xff0c;成…

Linux背景介绍与环境搭建

本章内容 认识 Linux, 了解 Linux 的相关背景学会如何使用云服务器掌握使用远程终端工具 xshell 登陆 Linux 服务器 Linux 背景介绍 发展史 本门课程学习Linux系统编程&#xff0c;你可能要问Linux从哪里来&#xff1f;它是怎么发展的&#xff1f;在这里简要介绍Linux的发展…

3+单细胞+代谢+WGCNA+机器学习

今天给同学们分享一篇生信文章“Identification of new co-diagnostic genes for sepsis and metabolic syndrome using single-cell data analysis and machine learning algorithms”&#xff0c;这篇文章发表Front Genet.期刊上&#xff0c;影响因子为3.7。 结果解读&#x…

正则表达式中扩展表示法的理解

正则表达式可以拥有扩展表达式&#xff0c;大致形式是(?...) 理解&#xff1a; 以(?)的含义为例子 data a1b2ce34.5d_6fres re.findall(r[a-z](?\d), data) # [a, b, ce]# ([a-z](?\d) 表示的是匹配小写字符一个或多个&#xff0c;但是匹配之后需要满足后续有数字一个…

Elasticsearch:处理 Elasticsearch 中的字段名称不一致

在 Elasticsearch 中&#xff0c;经常会遇到类似数据的不同索引具有不同字段名称的情况。 例如&#xff0c;一个索引可能使用字段名 level 来表示日志级别&#xff0c;而另一个索引可能使用 log_level 来达到相同目的。 出现这种不一致的原因有多种&#xff0c;例如不同的团队使…

Leetcode-1 两数之和

暴力穷举 class Solution {public int[] twoSum(int[] nums, int target) {int[] num new int[2];for(int i0;i<nums.length-1;i){for(int ji1;j<nums.length;j){if(nums[i]nums[j]target){num[0]i;num[1]j;}}}return num;} }HashMap&#xff0c;记录下标和对应值&…

Python功能制作之正则表达式批量删除并重命名文件

在平时&#xff0c;我们总是会遇到一种需要批量更改&#xff0c;或者是删除文件名字的情况。 对此&#xff0c;我们可以使用正则表达式进行匹配&#xff0c;然后去匹配删除相应的字数。 比如图片序列&#xff0c;因为一些特殊情况&#xff0c;导致名字为&#xff1a; 00000-…

SpringCloud 微服务全栈体系(十)

第十章 RabbitMQ 一、初识 MQ 1. 同步和异步通讯 微服务间通讯有同步和异步两种方式&#xff1a; 同步通讯&#xff1a;就像打电话&#xff0c;需要实时响应。 异步通讯&#xff1a;就像发邮件&#xff0c;不需要马上回复。 两种方式各有优劣&#xff0c;打电话可以立即得…

CLion2022安装

1. CLion下载 地址&#xff1a;https://www.jetbrains.com.cn/clion/download/other.html 下载你需要的版本&#xff0c;这里以2022.2.4为例 之后获取到对应的安装包 2. 安装 1、双击运行安装包&#xff0c;next 2、选择安装路径&#xff0c;建议非系统盘&#xff0c;nex…

git clone 报错:fatal: unable to access ‘https://github.com/XXXXXXXXX‘

国内使用GIT工具&#xff0c;拉取github代码&#xff0c;会因为网络原因无法成功拉取。出现如下类似情形&#xff1a; 此时更改 web URL即可&#xff0c;改用镜像的github网站替换https://github.com/。即URL里的https://github.com/换成https://hub.nuaa.cf/&#xff0c;即可…

linux之按键中断

查看原理图确认引脚 可以看到按键有两个&#xff0c;分别对应GPIO5_1和GPIO4_14 配置pinctrl&#xff0c;配置成GPIO模式 1.使用官方工具&#xff0c;配置下引脚 2.将生成的代码复制到设备树里 创建设备节点 生成二进制设备树文件 在工具链表下使用 make dtbs 或者使…

求职应聘校招社招,面对在线测评有什么技巧?

网上测评&#xff0c;不要怕&#xff0c;关键是在于你要提前准备充分。要说技巧&#xff0c;真心没有&#xff0c;但是建议我有一点点。 1、网上测评&#xff0c;技巧就是老实做 老老实实做题&#xff0c;我一贯的作风&#xff0c;老实人不吃亏。越是心思灵巧的人&#xff0c…

精通Nginx(05)-http工作机制、指令和内置变量

http服务是Nginx最原始的服务,搞清楚其工作机制非常有利于弄懂nginx是如何工作的。 Nginx核心模块为ngx_http_core_module。 目录 http工作机制 配置结构 工作机制 http常用指令 http server listen server_name location 优先级 "/"的特殊用法 root/a…

【STM32】基于HAL库建立自己的低功耗模式配置库(STM32L4系列低功耗所有配置汇总)

【STM32】基于HAL库建立自己的低功耗模式配置库&#xff08;STM32L4系列低功耗所有配置汇总&#xff09; 文章目录 低功耗模式&#xff08;此章节可直接跳过&#xff09;低功耗模式简介睡眠模式停止模式待机模式 建立自己的低功耗模式配置库通过结构体的方式来进行传参RTC配置…

使用自定义函数拟合辨识HPPC工况下的电池数据(适用于一阶RC、二阶RC等电池模型)

该程序可以离线辨识HPPC工况下的电池数据&#xff0c;只需要批量导入不同SOC所对应的脉冲电流电压数据&#xff0c;就可以瞬间获得SOC为[100% 90% 80% 70% 60% 50% 40% 30% 20% 10% 0%]的所有电池参数,迅速得到参数辨识的结果并具有更高的精度&#xff0c;可以很大程度上降低参…

降低毕业论文写作压力的终极指南

亲爱的同学们&#xff0c;时光荏苒&#xff0c;转眼间你们即将踏入毕业生的行列。毕业论文作为本科和研究生阶段的重要任务&#xff0c;不仅是对所学知识的综合运用&#xff0c;更是一次对自己学术能力和专业素养的全面考验。然而&#xff0c;论文写作常常伴随着压力和焦虑&…

适合新手使用的电脑监控软件有哪些?

电脑监控软件是一种用于监控和管理电脑行为的软件工具&#xff0c;可以帮助企业或个人了解和掌握员工或家庭成员的电脑使用情况&#xff0c;保障网络安全和隐私。现在市面上的电脑监控软件越来越多&#xff0c;究竟哪些操作起来比较简单&#xff0c;适合新手使用呢&#xff1f;…

力扣 203.移除链表元素第二种解法

目录 1.解题思路2.代码实现 1.解题思路 利用双指针&#xff0c;开辟一个新的头结点并依次向头结点尾插不为val的结点如果遇到值为val的结点就跳过并释放掉 2.代码实现 struct ListNode* removeElements(struct ListNode* head, int val) { if(headNULL)return NULL;struct …

win10系统nodejs的安装npm教程

1.在官网下载nodejs&#xff0c;https://nodejs.org/en 2&#xff0c;双击nodejs的安装包 3&#xff0c;点击 next 4&#xff0c;勾选I accpet the terms in…… 5&#xff0c;第4步点击next进入配置安装路径界面 6,点击next&#xff0c;选中Add to PATH &#xff0c;旁边…

[100天算法】-有序矩阵中第K小的元素(day 58)

题目描述 给定一个 n x n 矩阵&#xff0c;其中每行和每列元素均按升序排序&#xff0c;找到矩阵中第 k 小的元素。 请注意&#xff0c;它是排序后的第 k 小元素&#xff0c;而不是第 k 个不同的元素。示例&#xff1a;matrix [[ 1, 5, 9],[10, 11, 13],[12, 13, 15] ], k …