十一、Nacos源码系列:Nacos配置中心原理(三)- 配置热更新

上面章节我们讲了服务启动的时候从远程 Nacos 服务端拉到配置,以及服务启动后对需要支持热更新的配置都注册了一个监听器,这个章节我们来说下配置变动后具体是怎么处理的。

回到前面文章说过的 NacosPropertySourceLocator 的 locate()方法看看,该方法首先会通过NacosConfigManager获取一个 ConfigService,我们看下ConfigService是如何创建的:

public ConfigService getConfigService() {if (Objects.isNull(service)) {createConfigService(this.nacosConfigProperties);}return service;
}static ConfigService createConfigService(NacosConfigProperties nacosConfigProperties) {if (Objects.isNull(service)) {synchronized (NacosConfigManager.class) {try {if (Objects.isNull(service)) {service = NacosFactory.createConfigService(nacosConfigProperties.assembleConfigServiceProperties());}}catch (NacosException e) {log.error(e.getMessage());throw new NacosConnectionFailureException(nacosConfigProperties.getServerAddr(), e.getMessage(), e);}}}return service;
}// com.alibaba.nacos.api.config.ConfigFactory#createConfigService(java.util.Properties)
public static ConfigService createConfigService(Properties properties) throws NacosException {try {Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");Constructor constructor = driverImplClass.getConstructor(Properties.class);ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);return vendorImpl;} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}
}

从源码可以看到,NacosConfigManager 中会进行一个 ConfigService 单例对象的创建,创建流程最终会委托给 ConfigFactory,使用反射方式创建一个 NacosConfigService 的实例对象,NacosConfigService 是一个很核心的类,配置的获取,监听器的注册都需要经此。

我们看下 NacosConfigService 的构造函数,会去创建一个 ClientWorker 类的对象,这个类是实现配置热更新的核心类。

/*** 长轮训*/
private final ClientWorker worker;public NacosConfigService(Properties properties) throws NacosException {final NacosClientProperties clientProperties = NacosClientProperties.PROTOTYPE.derive(properties);ValidatorUtils.checkInitParam(clientProperties);// 初始化NamespaceinitNamespace(clientProperties);// 创建了一个配置过滤器链,可以采用SPI扩展机制加载对应的过滤器实现类this.configFilterChainManager = new ConfigFilterChainManager(clientProperties.asProperties());// 创建了一个服务管理器,内含一个定时轮询线程池,每隔30s拉取一次服务ServerListManager serverListManager = new ServerListManager(clientProperties);serverListManager.start();// 创建了一个客户端工作者,包含了一个代理对象this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, clientProperties);// will be deleted in 2.0 later versionsagent = new ServerHttpAgent(serverListManager);}

再来看下ClientWorker的构造方法:

public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,final NacosClientProperties properties) throws NacosException {this.configFilterChainManager = configFilterChainManager;// 初始化timeout、taskPenaltyTime、enableRemoteSyncConfig属性init(properties);// 创建一个用于配置服务端的Rpc通信客户端agent = new ConfigRpcTransportClient(properties, serverListManager);// 计算合适的核心线程数int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);// 初始化一个线程池ScheduledExecutorService executorService = Executors.newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM),r -> {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker");t.setDaemon(true);return t;});// 配置了一个异步处理线程池agent.setExecutor(executorService);// 调用start方法agent.start();}

首先创建一个用于配置服务端的Rpc通信客户端,然后初始化了一个线程池,配置到rpc客户端中,调用ConfigRpcTransportClient#start()启动:

public void start() throws NacosException {// 认证服务,主要是通过Secret Key和Access Key做认证用securityProxy.login(this.properties);// 每隔5s执行一次认证this.executor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0,this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);// 内部启动startInternal();
}

 然后启动worker:

/*** 阻塞队列*/
private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);public void startInternal() {// 线程池在阻塞等到信号的到来executor.schedule(() -> {while (!executor.isShutdown() && !executor.isTerminated()) {try {// 获取到listenExecutebell.offer(bellItem)的信号// 如果没有监听器的变动,则等待5s处理一次listenExecutebell.poll(5L, TimeUnit.SECONDS);if (executor.isShutdown() || executor.isTerminated()) {continue;}// 执行配置监听executeConfigListen();} catch (Throwable e) {LOGGER.error("[rpc listen execute] [rpc listen] exception", e);try {Thread.sleep(50L);} catch (InterruptedException interruptedException) {//ignore}notifyListenConfig();}}}, 0L, TimeUnit.MILLISECONDS);}

 核心代码就是executeConfigListen()执行配置监听:

public void executeConfigListen() {// 存放含有listen的cacheDataMap<String, List<CacheData>> listenCachesMap = new HashMap<>(16);// 存放不含有listen的cacheDataMap<String, List<CacheData>> removeListenCachesMap = new HashMap<>(16);long now = System.currentTimeMillis();// 当前时间 减去 上一次全量同步的时间,如果大于3分钟,表示到了全量同步的时间了boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;for (CacheData cache : cacheMap.get().values()) {synchronized (cache) {// 是否和服务端一致if (cache.isConsistentWithServer()) {// 一致则检查md5值,若md5值和上一个不一样,则说明变动了,需要通知监听器cache.checkListenerMd5();// 是否到全量同步时间了,未到则直接跳过if (!needAllSync) {continue;}}if (!cache.isDiscard()) {// 非丢弃型,即新增,放入listenCachesMap//get listen  configif (!cache.isUseLocalConfigInfo()) {List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));if (cacheDatas == null) {cacheDatas = new LinkedList<>();listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);}cacheDatas.add(cache);}} else if (cache.isDiscard() && CollectionUtils.isEmpty(cache.getListeners())) {// 丢弃型,即删除, 放入removeListenCachesMapif (!cache.isUseLocalConfigInfo()) {List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));if (cacheDatas == null) {cacheDatas = new LinkedList<>();removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);}cacheDatas.add(cache);}}}}// 如果需要和服务端数据同步,则listenCachesMap和removeListenCachesMap存放了本地数据,需要和服务端对比boolean hasChangedKeys = checkListenCache(listenCachesMap);//execute check remove listen.checkRemoveListenCache(removeListenCachesMap);if (needAllSync) {// 更新同步时间lastAllSyncTime = now;}//If has changed keys,notify re sync md5.if (hasChangedKeys) {// 服务端告知了有数据变动,则需要再同步一次notifyListenConfig();}}

主要做了两件事情:

  • 1)、首先遍历之前缓存到cacheMap的所有CacheData,判断是否和服务端保持一致,如果一致,则检查配置的md5值,如果md5值和上一个不一样,则说明变动了,需要通知监听器。 
// cacheData的校验方法
void checkListenerMd5() {// 遍历这个配置所有的监听者for (ManagerListenerWrap wrap : listeners) {if (!md5.equals(wrap.lastCallMd5)) {// 如果内容变动了,直接通知监听器处理,并且更新 listenerWrap 中的 content、Md5safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);}}
}

遍历这个配置对应的所有的监听者,判断内容是否变动,如果变动了,调用safeNotifyListener()直接通知监听器处理。

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {// 获取到监听器final Listener listener = listenerWrap.listener;if (listenerWrap.inNotifying) {LOGGER.warn("[{}] [notify-currentSkip] dataId={}, group={},tenant={}, md5={}, listener={}, listener is not finish yet,will try next time.",envName, dataId, group, tenant, md5, listener);return;}// 定义一个通知任务NotifyTask job = new NotifyTask() {@Overridepublic void run() {long start = System.currentTimeMillis();ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();ClassLoader appClassLoader = listener.getClass().getClassLoader();ScheduledFuture<?> timeSchedule = null;try {if (listener instanceof AbstractSharedListener) {AbstractSharedListener adapter = (AbstractSharedListener) listener;adapter.fillContext(dataId, group);LOGGER.info("[{}] [notify-context] dataId={}, group={},tenant={}, md5={}", envName, dataId,group, tenant, md5);}// Before executing the callback, set the thread classloader to the classloader of// the specific webapp to avoid exceptions or misuses when calling the spi interface in// the callback method (this problem occurs only in multi-application deployment).Thread.currentThread().setContextClassLoader(appClassLoader);ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setGroup(group);cr.setContent(content);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);String contentTmp = cr.getContent();timeSchedule = getNotifyBlockMonitor().schedule(new LongNotifyHandler(listener.getClass().getSimpleName(), dataId, group, tenant, md5,notifyWarnTimeout, Thread.currentThread()), notifyWarnTimeout,TimeUnit.MILLISECONDS);listenerWrap.inNotifying = true;// 回调通知,也就是通知变动的内容// 这里就是执行前面说到的注册监听时的一个回调函数,里面其实最主要的就是发布了一个RefreshEvent事件,springcloud会处理这个事件listener.receiveConfigInfo(contentTmp);// compare lastContent and contentif (listener instanceof AbstractConfigChangeListener) {// 扩展点,告知配置内容的变动Map<String, ConfigChangeItem> data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, contentTmp, type);ConfigChangeEvent event = new ConfigChangeEvent(data);((AbstractConfigChangeListener) listener).receiveConfigChange(event);listenerWrap.lastContent = contentTmp;}// 赋值最新的md5listenerWrap.lastCallMd5 = md5;LOGGER.info("[{}] [notify-ok] dataId={}, group={},tenant={}, md5={}, listener={} ,job run cost={} millis.",envName, dataId, group, tenant, md5, listener, (System.currentTimeMillis() - start));} catch (NacosException ex) {LOGGER.error("[{}] [notify-error] dataId={}, group={},tenant={},md5={}, listener={} errCode={} errMsg={},stackTrace :{}",envName, dataId, group, tenant, md5, listener, ex.getErrCode(), ex.getErrMsg(),getTrace(ex.getStackTrace(), 3));} catch (Throwable t) {LOGGER.error("[{}] [notify-error] dataId={}, group={},tenant={}, md5={}, listener={} tx={}",envName, dataId, group, tenant, md5, listener, getTrace(t.getStackTrace(), 3));} finally {listenerWrap.inNotifying = false;Thread.currentThread().setContextClassLoader(myClassLoader);if (timeSchedule != null) {timeSchedule.cancel(true);}}}};try {// 监听器配置了异步执行器,就异步执行if (null != listener.getExecutor()) {LOGGER.info("[{}] [notify-listener] task submitted to user executor, dataId={}, group={},tenant={}, md5={}, listener={} ",envName, dataId, group, tenant, md5, listener);job.async = true;listener.getExecutor().execute(job);} else {// 同步执行LOGGER.info("[{}] [notify-listener] task execute in nacos thread, dataId={}, group={},tenant={}, md5={}, listener={} ",envName, dataId, group, tenant, md5, listener);job.run();}} catch (Throwable t) {LOGGER.error("[{}] [notify-listener-error] dataId={}, group={},tenant={}, md5={}, listener={} throwable={}",envName, dataId, group, tenant, md5, listener, t.getCause());}
}

在safeNotifyListener方法中,创建了一个通知任务NotifyTask,NotifyTask实现了runnable接口,需要关注其run方法的实现。然后看监听器是否配置了异步执行线程池,如果配置了,就异步执行;否则就是同步执行。

我们来看看NotifyTask的run方法做了哪些事情。

先是创建了一个ConfigResponse对象,封装了namespaceId、groupId、dataId。接着,也是最重要的,就是回调监听器的receiveConfigInfo()方法,然后更新监听器包装器中的 lastContent、lastCallMd5字段。

那么receiveConfigInfo()方法到底做了什么事情呢?我们不妨来看看之前注册监听器的代码:

// com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListener
private void registerNacosListener(final String groupKey, final String dataKey) {// key = {dataId,group}String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);Listener listener = listenerMap.computeIfAbsent(key,lst -> new AbstractSharedListener() {@Overridepublic void innerReceive(String dataId, String group,String configInfo) {// 累加配置刷新次数refreshCountIncrement();// 添加一条刷新记录nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);// 通过Spring上下文发布一个RefreshEvent刷新事件applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));if (log.isDebugEnabled()) {log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s",group, dataId, configInfo));}}});
try {// 注册配置监听器,以 dataId + groupId + namespace 为维度进行注册的configService.addListener(dataKey, groupKey, listener);
}
catch (NacosException e) {log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,groupKey), e);
}
}

所以,receiveConfigInfo()方法最终执行的就是AbstractSharedListener#innerReceive()方法,主要是发布了一个RefreshEvent事件,RefreshEvent 事件主要由 SpringCloud 相关类来处理。

  • 2)、 服务端告知了有数据变动,则需要再同步一次

其实就是往阻塞队列中存放内容,再来一遍前面的同步流程。

public void notifyListenConfig() {// listenExecutebell是一个阻塞队列,放入bellItem,即一个触发条件,相当于生产者listenExecutebell.offer(bellItem);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/691398.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浅谈前端性能优化的方法

前端性能优化是一个多方面的过程&#xff0c;涉及减少加载时间、提高响应速度、优化渲染等方面。以下是一些常见的前端性能优化方法&#xff1a; 减少HTTP请求&#xff1a;合并CSS和JavaScript文件&#xff0c;使用CSS Sprite技术&#xff0c;以及使用HTTP2.0等协议来减少HTTP…

51-2 万字长文,深度解读端到端自动驾驶的挑战和前沿

去年初&#xff0c;我曾打算撰写一篇关于端到端自动驾驶的文章&#xff0c;发现大模型在自动驾驶领域的尝试案例并不多。遂把议题扩散了一点&#xff0c;即从大模型开始&#xff0c;逐渐向自动驾驶垂直领域靠近&#xff0c;最后落地到端到端。这样需要阐述的内容就变成LLM基础模…

【Docker】集群容器监控和统计 Portainer基本用法

Portainer是一款轻量级的应用&#xff0c;它提供了图形化界面&#xff0c;用川于方便地管理Docker环境&#xff0c;包括单机环境和集群环境。 主要功能&#xff1a;实现集群容器的监控和统计 下载安装 官网&#xff1a;https://www.portainer.io 文档&#xff1a;https://do…

如何用java来调用FileMaker data Api 新增数据的代码

Claris Filemaker目前在国内的使用用户并不是很多&#xff0c;但是非常适合我们目前的实验室智能化研发系统&#xff0c;今天也给大家做一个小小的技术分享。&#xff08;看最后注释部分&#xff0c;有彩蛋哦&#xff09; 要使用Java调用FileMaker Data API来新增数据&#xf…

Python Selenium实现自动化测试及Chrome驱动使用

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站零基础入门的AI学习网站~。 目录 ​编辑 前言 Selenium简介 安装Selenium库 编写自动化测试脚本 1 打开浏览器并访问网页 2 查找页面元…

前端架构: 脚手架框架之commander从基础到高级应用教程

commander 1 &#xff09;概述 commander 是一个更为知名的脚手架框架进入它的npm官网: https://www.npmjs.com/package/commander目前版本: 12.0.0Weekly Downloads 133,822,797 (动态数据)最近更新&#xff1a;15 days ago (npm)说明这是一个更优质的库同时使用commander的案…

Docker Desktop 链接windos 安装的redis和mysql

1.1.先在容器安装项目 2.链接redis和mysql配置 redis和mysql是在windos安装的&#xff0c;使用的是小p管理器安装的 项目链接 DB_DRIVERmysql DB_HOSThost.docker.internal DB_PORT3306 DB_DATABASEyunxc_test DB_USERNAMEyunxc_test DB_PASSWORDtest123456... DB_CHARSETutf…

Python中*args 和**kwargs

当函数的参数不确定时&#xff0c;可以使用*args 和**kwargs&#xff0c;*args 没有key值&#xff0c;**kwargs有key值。 *args [python] def fun_var_args(farg, *args): print "arg:", farg for value in args: print "another arg:", value fun_var_a…

Day10-Linux系统打包和时间命令及案例精讲

Day10-Linux系统打包和时间命令及案例精讲 1. tar 打包压缩1.1 【打包】 为什么要打包&#xff0c;压缩&#xff1f;1.2 【查看包里内容】1.3 【解包】1.4 排除打包 --exclude 2. date 时间命令 1. tar 打包压缩 1.1 【打包】 为什么要打包&#xff0c;压缩&#xff1f; -zcv…

一周学会Django5 Python Web开发-Django5路由变量

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计22条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…

SPSSAU【文本分析】|文本聚类

SPSSAU共提供两种文本聚类方式&#xff0c;分别是按词聚类和按行聚类。按词聚类是指将需要分析的关键词进行聚类分析&#xff0c;并且进行可视化展示&#xff0c;即针对关键词进行聚类&#xff0c;此处关键词可以自由选择。按行聚类分析是指针对以‘行’为单位进行聚类分析&…

YOLOv8推理程序

YOLOv8单独推理,有时候我们自定义的模块算子无法正常转换为其他框架,而且需要做成应用,因此需要一个单独推理的程序,返回的是识别后的照片还有各个类别及其对应数量。文章最后给出Flask封装为Server以及调用的实例还有Client请求代码,支持几十路多线程并发,只需加载一次模…

闲鱼搜索API接口

闲鱼搜索API接口接口api代码对接如下&#xff1a; 1.公共参数 名称 类型 必须 描述 key String √ get请求方式拼接在url中&#xff0c;点击获取 api_name String √ api接口名称 cache String 默认否 result_type String 否 json lang String 默认cn 简…

linux部署jenkins,支持jdk1.8

无废话&#xff0c;纯干活安装指令 本文前提条件需安装jdk8&#xff0c;安装参考&#xff1a;Linux配置jdk环境 下载资源 # 创建安装目录 mkdir -p /data/jenkins && cd /data/jenkins# 下载jenkins的war包&#xff0c;v2.346.x支持jdk1.8&#xff0c;高于这个版本的…

【数据结构】图的最小生成树

最小生成树 一个图中有N个顶点&#xff0c;边的数量一定是>N-1&#xff0c;我们从中选取N-1条边&#xff0c;用来连接N个点&#xff0c;所形成的边权之和最小&#xff0c;就是最小生成树。 构成最小生成树的准则 只能使用图中的边来构造最小生成树只能使用恰好n-1条边来连…

Stable Diffusion 绘画入门教程(webui)-提示词

通过上一篇文章大家应该已经掌握了sd的使用流程&#xff0c;本篇文章重点讲一下提示词应该如何写 AI绘画本身就是通过我们写一些提示词&#xff0c;然后生成对应的画面&#xff0c;所以提示词的重要性不言而喻。 要想生成更加符合自己脑海里画面的图片&#xff0c;就尽量按照…

术业有专攻!三防加固平板助力工业起飞

在日常使用中的商业电脑比较追求时效性&#xff0c;以市场定位做标准&#xff0c;内部元件只需满足一般要求就行&#xff0c;使用寿命比较短。而三防平板电脑是主要运用在复杂、恶劣的环境下所以在需求方面较高,需要保证产品在恶劣条件下正常使用&#xff0c;满足行业领域的需求…

【CCEdit】通过扩散模型进行创意且可控的视频编辑

文章目录 CCEdit1. 核心特性1.1 三叉戟网络结构1.2 精细的外观控制1.3 高度的自适应性 2. 三叉戟结构2.1 结构分支&#xff08;ControlNet架构&#xff09;2.2 外观分支2.3 主分支 3. 数据集——BalanceCC benchmark dataset4. 训练5. 长视频编辑6. 使用场景7. 评估指标 CCEdit…

单片机01天---stm32基本信息了解

下载数据手册 以STM32F407ZG为例 网站&#xff1a;www.st.com&#xff0c;搜索芯片型号&#xff0c;下载“数据手册”使用 数据手册使用 查看芯片型号信息 芯片资源信息 时钟框图 芯片资源表格下方 GPIO口表格 一般位于下图后面的位置 ①工作电压&#xff1a;1.8V – 3.6V…

Codeforces Round 928 (Div. 4) (A-E)

比赛地址 : https://codeforces.com/contest/1926 A 遍历每一个字符串&#xff0c;比较1和0的数量即可&#xff0c;那个大输出那个; #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0); #define endl \n #define lowbit(x) (x&am…