flink以增量+全量的方式更新广播状态

背景

flink在实现本地内存和db同步配置表信息时,想要做到类似于增量(保证实时性) + 全量(保证和DB数据一致)的效果,那么我们如何通过flink的广播状态+外部定时器定时全量同步的方式来实现呢?

实现增量+全量的效果

package wikiedits.schedule;import java.util.List;
import java.util.Map;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;//处理函数
public class BroadcastStatePlusSchedulerFunction extends KeyedBroadcastProcessFunction<String, String, String, String> {// 键值分区状态private final MapStateDescriptor<String, List<String>> mapStateDesc =new MapStateDescriptor<>("items", BasicTypeInfo.STRING_TYPE_INFO, new ListTypeInfo<>(String.class));// 广播状态private final MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {// 1.增量消息更新广播状态BroadcastState<String, String> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);broadcastState.put(value, value);// 2.全量更新,判断广播状态和DB配置表在本地缓存的配置项是否一致,比如如果广播状态记录少了,使用本地缓存中的记录来更新下广播状态for (Map.Entry<String, String> entry : StaticLoadUtil.getConfigCache().asMap().entrySet()) {String broadcastValue = broadcastState.get(entry.getKey());if(!StringUtils.equals(entry.getValue(), broadcastValue)){//如果不相等,那么以DB缓存中的为准}}// 3.自此,广播状态和DB配置表的状态几乎一致,不过由于他们的比较只发生于收到广播元素,所以我们可以在凌晨的时候故意从db中找出几条记录发送kafka消息到这个广播状态来进行触发比较,当然这里也可以当收到某个元素时覆盖掉flink的广播状态}@Overridepublic void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 键值分区状态final MapState<String, List<String>> state = getRuntimeContext().getMapState(mapStateDesc);// 广播状态for (Map.Entry<String, String> entry : ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {}}}// 外部定时器实现
package wikiedits.schedule;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 静态类定时加载DB配置表到本地内存中*/
public class StaticLoadUtil {// 定时任务执行器private static transient ScheduledExecutorService scheduledExecutorService;public static final Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();// 通过定时执行器定时同步本地缓存和DB配置表static {scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();// for(ConfigEntity entity : configList){configCache.put("key", "value");// }// 2.2 更新本地变量threshold的值// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}/*** 获取本地缓存*/public static Cache<String, String> getConfigCache() {return configCache;}}

总结:

1.在处理广播元素的时候,除了更新广播状态之外,还要对比下广播状态和DB配置表在flink的本地缓存的数据,如果不一致,需要打印告警日志或者采取更新等措施

2.由于全量广播状态和DB配置表在flink的本地缓存的数据对比是在接收到某个广播元素的时候才进行,所以我们可以多余多发送一些相同的广播元素来触发对比

3.通过这种方式,广播状态就可以实现增量(实时性) + 全量(准确性) 的结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/100268.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

matlab高斯消元法求解线性方程组

高斯消元法的基本原理是通过一系列行变换将线性方程组的增广矩阵转化为简化行阶梯形式&#xff0c;从而得到方程组的解。其核心思想是利用矩阵的行变换操作&#xff0c;逐步消除未知数的系数&#xff0c;使得方程组的求解变得更加简单。 首先&#xff0c;给定系数矩阵A和常数向…

Python实现RNN算法对MFCC特征的简单语音识别

Python实现RNN算法对MFCC特征的简单语音识别 1、实现步骤 借助深度学习库 TensorFlow/Keras 来构建模型 1.对标签进行编码,将文本标签转换为整数标签。 2.对 MFCC 特征数据进行填充或截断,使其长度一致,以便于输入到 RNN 模型中 3.如果是二维数据需要转成三维: Simpl…

Python 图形化界面基础篇:创建自定义主题

Python 图形化界面基础篇&#xff1a;创建自定义主题 引言 Tkinter 库简介步骤1&#xff1a;导入 Tkinter 模块步骤2&#xff1a;创建 Tkinter 窗口步骤3&#xff1a;创建自定义主题步骤4&#xff1a;创建使用自定义主题的部件 完整示例代码代码解释结论 引言 在图形用户界面&…

XPS常见问题与解答-科学指南针

在做X 射线光电子能谱(XPS)测试时&#xff0c;科学指南针检测平台工作人员在与很多同学沟通中了解到&#xff0c;好多同学仅仅是通过文献或者师兄师姐的推荐对XPS测试有了解&#xff0c;但是对于其原理还属于小白阶段&#xff0c;针对此&#xff0c;科学指南针检测平台团队组织…

数据库查找、增加等基本操作

1、查询 SELECT column_name(s) FROM table_name WHERE condition; //如 SELECT * FROM USE database_name.user2; 2、增加 优点是可以将数据添加到表中&#xff0c;而不会影响现有数据。缺点是如果您需要将大量数据添加到表中&#xff0c;这可能会导致性能下降。 INSERT …

如何使用jest

最近在研究单元测试&#xff0c;虽说前端如果不是大且的项目不必要加&#xff0c;但至少得会&#xff0c;因此花了些时间研究&#xff0c;以下是我总结jest的使用。 jest是什么&#xff1f; Jest是 Facebook 的一套开源的 JavaScript 测试框架&#xff0c; 它自动集成了断言、…

【重拾C语言】六、批量数据组织(一)数组(数组类型、声明与操作、多维数组;典例:杨辉三角、矩阵乘积、消去法)

目录 前言 六、批量数据组织——数组 6.1 成绩统计——数组类型 6.1.1 数组类型 6.1.2 数组声明与操作 6.1.3 成绩统计 6.2 统计多科成绩——多维数组 6.3 程序设计实例 6.3.1 杨辉三角形 6.3.2 矩阵乘积 6.3.3 消去法 6.4 线性表——分类与检索 前言 ChatGPT C语…

Hadoop作业篇(一)

一、选择题 1. 以下哪一项不属于Hadoop可以运行的模式__C____。 A. 单机&#xff08;本地&#xff09;模式 B. 伪分布式模式 C. 互联模式 D. 分布式模式 C. 互联模式 不属于Hadoop可以运行的模式。 Hadoop主要有四种运行模式&#xff1a; A. 单机&#xff08;本地&#xf…

基于SpringBoot的桂林旅游景点导游平台

目录 前言 一、技术栈 二、系统功能介绍 用户信息管理 景点类型管理 景点信息管理 线路推荐管理 用户注册 线路推荐 论坛交流 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实…

FreeRTOS学习笔记(一)

一、基础知识思维导图 vtaskdelay函数会开启中断&#xff0c;所以在临界区不能用vtaskdelay 二、任务的创建与删除 2.1、任务的动态创建与删除 ........#define START_TASK_PRIO 1 #define START_TASK_STACK_SIZE 128 TaskHandle_t start_task_handler; void …

UNIQUE VISION Programming Contest 2023 Autumn(AtCoder Beginner Contest 323)

A - Weak Beats 链接 : A - Weak Beats 思路 : 模拟即可,如果在偶数位上出现了非0得元素&#xff0c;直接输出"No"后返回即可&#xff0c;循环顺利结束的话&#xff0c;就直接输出"Yes"; 代码 : #include<bits/stdc.h> #define IOS ios::sy…

WebSocket介绍及部署

WebSocket是一种在单个TCP连接上进行全双工通信的协议&#xff0c;其设计的目的是在Web浏览器和Web服务器之间进行实时通信&#xff08;实时Web&#xff09;。 WebSocket协议的优点包括&#xff1a; 1. 更高效的网络利用率&#xff1a;与HTTP相比&#xff0c;WebSocket的握手…

Flutter AI五子棋

前言 在上一篇文章中&#xff0c;讲解了如何实现双人在本地对战的五子棋&#xff0c;但是只有一个人的时候就不太好玩&#xff0c;同时博主也没有把五子棋相关的文章写过瘾。那么这篇文章&#xff0c;我们来实现一个功能更加丰富的五子棋吧&#xff01;在设计五子棋的算法方面&…

docker搭建Jenkins及基本使用

1. 搭建 查询镜像 docker search jenkins下载镜像 docker pull jenkins/jenkins启动容器 #创建文件夹 mkdir -p /home/jenkins_home #权限 chmod 777 /home/jenkins_home #启动Jenkins docker run -d -uroot -p 9095:8080 -p 50000:50000 --name jenkins -v /home/jenkins_home…

【Qt】三种方式实现抽奖小游戏

简介 本文章是基本Qt与C实现一个抽奖小游戏&#xff0c;用到的知识点在此前发布的几篇文章。 下面是跳转链接&#xff1a; 【Qt控件之QLabel】用法及技巧链接&#xff1a; https://blog.csdn.net/MrHHHHHH/article/details/133691441?spm1001.2014.3001.5501 【Qt控件之QPus…

Matlab论文插图绘制模板第118期—进阶气泡图

之前的文章中&#xff0c;分享过Matlab气泡图的绘制模板&#xff1a; 图虽说好看&#xff0c;但有一个缺点&#xff1a;需要手动调节两个图例的位置。 为了解决这一问题&#xff0c;我们不妨结合前段时间分享的紧凑排列多子图的绘制模板&#xff1a; 从而达到自动对齐排列的效…

【版本控制工具一】Git 安装注册及使用

文章目录 一、Git 、Github、Gitee1.1 概述1.2 码云 相对于 github 的优势 二、Github 或 Gitee注册2.1 注册2.2 创建仓库 三、Git下载与安装四、创建本地仓库 一、Git 、Github、Gitee 1.1 概述 Git 是一个开源的分布式版本控制系统&#xff0c;用于敏捷高效地处理任何或小或…

Multi-Grade Deep Learning for Partial Differential Equations

论文阅读&#xff1a;Multi-Grade Deep Learning for Partial Differential Equations with Applications to the Burgers Equation Multi-Grade Deep Learning for Partial Differential Equations with Applications to the Burgers Equation符号定义偏微分方程定义FNN定义PI…

网络流量安全分析-工作组异常

在网络中&#xff0c;工作组异常分析具有重要意义。以下是网络中工作组异常分析的几个关键点&#xff1a; 检测网络攻击&#xff1a;网络中的工作组异常可能是由恶意活动引起的&#xff0c;如网络攻击、病毒感染、黑客入侵等。通过对工作组异常的监控和分析&#xff0c;可以快…

(Qt5Gui.dll)处(位于 xxx.exe 中)引发的异常: 0xC0000005: 读取位置 XXXXXXXX 时发生访问冲突

最新在处理opencv的时候遇到(Qt5Gui.dll)处(位于 xxx.exe 中)引发的异常: 0xC0000005: 读取位置 XXXXXXXX 时发生访问冲突,导致上位机崩溃严重影响开发的效率。 简要代码: void show() {QImage img = QImage(data,width,height,bytePerLine,QImage::Format_RGB888); emit im…