flink中使用外部定时器实现定时刷新

背景:

我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?

外部定时器定时加载实现

1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示

package wikiedits.schedule;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {// 定时任务执行器private transient ScheduledExecutorService scheduledExecutorService;// 本地变量private int threshold;@Overridepublic void open(Configuration parameters) throws Exception {// 1.从db查询数据初始化本地变量
//        threshold = DBManager.SELECTSQL.getConfig("threshold");// 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
//            for(ConfigEntity entity : configList){ConfigEntityLocalCache.getInstance().update("key", "value");
//            }// 2.2 更新本地变量threshold的值
//            threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {}@Overridepublic void close() throws Exception {ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);}}//本地缓存实现
package wikiedits.schedule;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 保存Config信息的本地缓存 ---定时同步DB配置表的数据*/
public class ConfigEntityLocalCache {private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();/*** 获取本地缓存实例*/public static ConfigEntityLocalCache getInstance() {return instance;}/** 缓存内存配置项 */private static Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();/*** 更新本地缓存数据*/public boolean update(String key, String value){configCache.put(key, value);return true;}/*** 更新本地缓存数据*/public  String getByKey(String key){return configCache.getIfPresent(key);}}

2.在静态类中通过static语句块创建定时器并定时加载,代码如下

package wikiedits.schedule;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 静态类定时加载DB配置表到本地内存中*/
public class StaticLoadUtil {// 定时任务执行器private static transient ScheduledExecutorService scheduledExecutorService;public static final Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();// 通过定时执行器定时同步本地缓存和DB配置表static {scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();// for(ConfigEntity entity : configList){configCache.put("key", "value");// }// 2.2 更新本地变量threshold的值// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}/*** 获取本地缓存*/public static Cache<String, String> getConfigCache() {return configCache;}}

总结:

1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行

2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/101773.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【RealTek sdk-3.4.14b】RTL8197FH-VG+RTL8367+RTL8812F WiFi to LAN 和WiFi to WAN吞吐量

LAN <----------> 2.4G WiFi Throughput 天线频宽模式协议连接速率TX(Mbps)RX(Mbps)TX&RX(Mbps)2X240MHz802.11nTCP300Mbps2051922112X240MHz802.11nUDP300Mbps224234231 LAN <----------> 5G WiFi Throughput 天线频宽模式协议连接速率TX(Mbps)RX(Mbps)TX&…

IntelliJ IDEA Maven 项目的依赖分析

在一个 maven 的项目中&#xff0c;我们需要知道我们的项目中使用的包可能有哪些冲突。 这个在 IntelliJ IDEA 中提供了贴心的查看。 选择 Maven 项目中的分析依赖。 随后&#xff0c;IntelliJ IDEA 将会打开一个依赖分析的标签页。 在这个标签页中&#xff0c;我们可以看到…

用例图 UML从入门到放弃系列之三

1.说明 关于用例图&#xff0c;这篇文章我将直接照搬罗伯特.C.马丁老爷子在《敏捷开发》一书种的第17章&#xff0c;并配上自己的理解&#xff0c;因为这一章写的实在是太精彩了&#xff0c;希望能够分享给大家&#xff0c;共勉。以下是老爷子的原文中文翻译以及豆芽的个人解读…

Ultra-Fast-Lane-Detection 车道线学习资料整理

目录 官方版本 两个优化 数据标注,降低参数量 1 数据标注 2降低参数量

代码随想录算法训练营Day50|动态规划9

代码随想录算法训练营Day50|动态规划9 文章目录 代码随想录算法训练营Day50|动态规划9一、198.打家劫舍二、213.打家劫舍II三、337.打家劫舍 III 一、198.打家劫舍 class Solution {public int rob(int[] nums) {if (nums null || nums.length 0) return 0;if (nums.length …

C++程序员入门怎么学?

1 背景 经常听到不少朋友在吐槽C太难学了&#xff0c;说什么从入门到放弃等等&#xff1b;做为一名C老兵&#xff0c;从2003年开始入坑C到今年2023年&#xff0c;整整20年的C开发从业者的我&#xff0c;今天尝试给有兴趣从事C开发的程序员提些建议&#xff0c;希望可以帮到大家…

C++ (Chapter 1)

C (一) 1.C的命名空间 先来看一个C语言的例子: 下面这段代码是可以正常运行的. #include<stdio.h> int rand 0; int main() {printf("%d \n", rand);return 0; }但是,包含了一个头文件之后,甚至无法通过编译. #include<stdio.h> #include<stdli…

力扣每日一题35:搜索插入的位置

题目描述&#xff1a; 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 示例 1: 输入: nums [1,3,5,6], target 5…

大数据Doris(八):启动FE步骤

文章目录 启动FE步骤 一、配置环境变量 二、​​​​​​​创建doris-mate

排序算法-插入排序法(InsertSort)

排序算法-插入排序法&#xff08;InsertSort&#xff09; 1、说明 插入排序法是将数组中的元素逐一与已排序好的数据进行比较&#xff0c;先将前两个元素排序好&#xff0c;再将第三个元素插入适当的位置&#xff0c;也就是说这三个元素仍然是已排序好的&#xff0c;接着将第…

剖析伦敦银最新价格走势图

国际金融市场瞬息万变&#xff0c;伦敦银的价格走势会受到诸多因素的影响&#xff0c;比如重要经济数据的公布&#xff0c;国际间的政治博弈&#xff0c;突发的政经大事&#xff0c;都可以令白银价格的走势&#xff0c;在短时间内暴涨暴跌的情况。 要在伦敦银市场实现良好的收益…

Linux文件目录总结

众所周知&#xff0c;Linux系统文件目录是树状结构&#xff0c;如下图所示&#xff1a; 英文缩写的目录下到底存放的是什么文件&#xff0c;善于做归纳总结的逍遥哥哥来解释一下&#xff1a; /bin&#xff1a;bin是Binary的缩写&#xff0c;这个目录存放着最经常使用的命令。 …

oracle 数据库删除序列

oracle 数据库删除序列 要删除 Oracle 数据库中的序列&#xff0c;你可以使用以下的 SQL 命令&#xff1a; DROP SEQUENCE sequence_name;其中&#xff0c;sequence_name 是你想删除的序列的名称。你需要确保当前用户对序列拥有适当的权限。 请注意&#xff0c;删除序列将永…

NEFU离散数学实验1-排列组合

相关概念 在离散数学中&#xff0c;组合数是一种用于计算从n个不同元素中选取m个元素的方式。以下是一些与组合数相关的概念&#xff1a; 排列&#xff1a;从n个不同元素中选取m个元素进行排列&#xff0c;排列数用P(n, m)表示&#xff0c;计算公式为P(n, m) n! / (n - m)! …

nodejs+vue家教管理系统

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1nodejs简介 4 2.2 express框架介绍 6 2.3 B/S结构 4 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性…

基于YOLO算法的单目相机2D测量(工件尺寸和物体尺寸)

1.简介 1.1 2D测量技术 基于单目相机的2D测量技术在许多领域中具有重要的背景和意义。 工业制造&#xff1a;在工业制造过程中&#xff0c;精确测量是确保产品质量和一致性的关键。基于单目相机的2D测量技术可以用于检测和测量零件尺寸、位置、形状等参数&#xff0c;进而实…

【网络安全】如何保护IP地址?

使用防火墙是保护IP地址的一个重要手段。防火墙可以监控和过滤网络流量&#xff0c;并阻止未经授权的访问。一家网络安全公司的研究显示&#xff0c;超过80%的企业已经部署了防火墙来保护他们的网络和IP地址。 除了防火墙&#xff0c;定期更新操作系统和应用程序也是保护IP地址…

学信息系统项目管理师第4版系列24_整合管理

1. PMBOK 1.1. 自1987年以来&#xff0c;PMBOK-直是基于过程的项目管理标准的重要代表 1.1.1. 基于过程的方法是项目管理的基石 1.2. 从2021年开始&#xff0c;第7版PMBOK采用了基于原则的标准&#xff0c;其中包含了 12个项目管理基本原则&#xff0c;这些基本原则为有效的…

Python数据攻略-递归方式实现json多层级数据展平

之前介绍过使用pandas如何展平json的多层数据,如果有兴趣可以参考文章 Python数据攻略-Pandas的json_normalize方法 Python数据攻略-递归方式实现json多层级数据展平 今天再介绍一个非常好用的使用递归的方式展平json数据。 文章目录 数据处理目标操作示例核心函数方法操作…

硬件基本功--过流、过压保护电路

1.简介 过流保护(OCP)&#xff1a;当电路电流超过预定最大值时&#xff0c;使保护装置动作的一种保护方式。不允许超过预定最大值电流&#xff0c;不然会烧坏电路的器件。过压保护(OVP)&#xff1a;被保护电路电压超过预定的最大值时&#xff0c;使电源断开或使受控设备电压降低…