Nacos2.X 配置中心源码分析:客户端如何拉取配置、服务端配置发布客户端监听机制

文章目录

  • Nacos配置中心源码
    • 总流程图
    • NacosClient源码分析
      • 获取配置
      • 注册监听器
    • NacosServer源码分析
      • 配置dump
      • 配置发布

Nacos配置中心源码

总流程图

Nacos2.1.0源码分析在线流程图

在这里插入图片描述

源码的版本为2.1.0 ,并在配置了下面两个启动参数,一个表示单机启动,一个是指定nacos的工作目录,其中会存放各种运行文件方便查看

-Dnacos.standalone=true
-Dnacos.home=D:\nacos-cluster\nacos2.1.0standalone



NacosClient源码分析

在NacosClient端服务注册中心核心的接口是NamingService,而配置中心核心的接口是ConfigService

我们可以添加一个配置,然后查看这里的实例代码

在这里插入图片描述

/*
* Demo for Nacos
* pom.xml<dependency><groupId>com.alibaba.nacos</groupId><artifactId>nacos-client</artifactId><version>${version}</version></dependency>
*/
package com.alibaba.nacos.example;import java.util.Properties;
import java.util.concurrent.Executor;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;/*** Config service example** @author Nacos**/
public class ConfigExample {public static void main(String[] args) throws NacosException, InterruptedException {String serverAddr = "localhost";String dataId = "nacos-config-demo.yaml";String group = "DEFAULT_GROUP";Properties properties = new Properties();properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);//获取配置服务ConfigService configService = NacosFactory.createConfigService(properties);//获取配置String content = configService.getConfig(dataId, group, 5000);System.out.println(content);//注册监听器configService.addListener(dataId, group, new Listener() {@Overridepublic void receiveConfigInfo(String configInfo) {System.out.println("recieve:" + configInfo);}@Overridepublic Executor getExecutor() {return null;}});//发布配置//boolean isPublishOk = configService.publishConfig(dataId, group, "content");//System.out.println(isPublishOk);//发送properties格式configService.publishConfig(dataId,group,"common.age=30", ConfigType.PROPERTIES.getType());Thread.sleep(3000);content = configService.getConfig(dataId, group, 5000);System.out.println(content);/*		boolean isRemoveOk = configService.removeConfig(dataId, group);System.out.println(isRemoveOk);Thread.sleep(3000);content = configService.getConfig(dataId, group, 5000);System.out.println(content);Thread.sleep(300000);*/}
}



获取配置

总结:

获取配置的主要方法是 NacosConfigService 类的 getConfig 方法,通常情况下该方法直接从本地文件中取得配置的值,如果本地文件不存在或者内容为空,则再通过grpc从远端拉取配置,并保存到本地快照中。

NacosServer端的处理是从磁盘读取配置文件./nacosHome/data/config-data/DEFAULT_GROUP/dataId,然后将读取到的content返回

在这里插入图片描述



接下来的源码就是这里一块的流程,它是如何调用到NacosConfigService 类的 getConfig ()方法

public interface ConfigService {String getConfig(String dataId, String group, long timeoutMs) throws NacosException;......
}

在这里插入图片描述


还是一样,从spring.factiries文件中起步

在这里插入图片描述



进入到NacosConfigBootstrapConfiguration自动配置类的,这其中会创建一个NacosPropertySourceLocatorbean对象

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = {"spring.cloud.nacos.config.enabled"},matchIfMissing = true
)
public class NacosConfigBootstrapConfiguration {public NacosConfigBootstrapConfiguration() {}@Bean@ConditionalOnMissingBeanpublic NacosConfigProperties nacosConfigProperties() {return new NacosConfigProperties();}@Bean@ConditionalOnMissingBeanpublic NacosConfigManager nacosConfigManager(NacosConfigProperties nacosConfigProperties) {return new NacosConfigManager(nacosConfigProperties);}// 核心bean@Beanpublic NacosPropertySourceLocator nacosPropertySourceLocator(NacosConfigManager nacosConfigManager) {return new NacosPropertySourceLocator(nacosConfigManager);}@Bean@ConditionalOnMissingBean(search = SearchStrategy.CURRENT)@ConditionalOnNonDefaultBehaviorpublic ConfigurationPropertiesRebinder smartConfigurationPropertiesRebinder(ConfigurationPropertiesBeans beans) {return new SmartConfigurationPropertiesRebinder(beans);}
}




NacosPropertySourceLocator这个bean中,它的接口中的默认方法会调用locate()方法:

  • 加载共享配置文件,也就是shared-configs配置项指定的数组

  • 加载加载扩展的配置文件,也就是extension-configs配置项指定的数组

  • 加载和应用名相关的几个默认配置文件,比如order-service-dev.yml

  • 上面三个方法中都会各自调用到loadNacosDataIfPresent() --> loadNacosPropertySource(...) --> NacosPropertySourceBuilder.build()

public class NacosPropertySourceLocator implements PropertySourceLocator {public PropertySource<?> locate(Environment env) {this.nacosConfigProperties.setEnvironment(env);// 获取配置中心服务ConfigServiceConfigService configService = this.nacosConfigManager.getConfigService();if (null == configService) {log.warn("no instance of config service found, can't load config from nacos");return null;} else {long timeout = (long)this.nacosConfigProperties.getTimeout();this.nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);String name = this.nacosConfigProperties.getName();String dataIdPrefix = this.nacosConfigProperties.getPrefix();if (StringUtils.isEmpty(dataIdPrefix)) {dataIdPrefix = name;}if (StringUtils.isEmpty(dataIdPrefix)) {dataIdPrefix = env.getProperty("spring.application.name");}CompositePropertySource composite = new CompositePropertySource("NACOS");// 加载共享配置文件this.loadSharedConfiguration(composite);// 加载扩展的配置文件this.loadExtConfiguration(composite);// 加载当前应用配置文件// 在该方法中会进行下面三行的逻辑/*dataIdPrefixdataIdPrefix + "." + fileExtensiondataIdPrefix + "-" + profile + "." + fileExtension*/this.loadApplicationConfiguration(composite, dataIdPrefix, this.nacosConfigProperties, env);return composite;}}// 可以详细看一下,NacosClient启动时,是怎么根据微服务名去取配置文件的private void loadApplicationConfiguration() {String fileExtension = properties.getFileExtension();String nacosGroup = properties.getGroup();// 最先使用微服务名 调用下面的loadNacosDataIfPresent()方法this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup, fileExtension, true);// 接下来是使用微服务名+文件后缀名的方式 调用下面的loadNacosDataIfPresent()方法this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix + "." + fileExtension, nacosGroup, fileExtension, true);String[] var7 = environment.getActiveProfiles();int var8 = var7.length;for(int var9 = 0; var9 < var8; ++var9) {String profile = var7[var9];// 第三次使用 微服务名+profile + 文件后缀名 调用下面的loadNacosDataIfPresent()方法String dataId = dataIdPrefix + "-" + profile + "." + fileExtension;this.loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup, fileExtension, true);}}// 在上面三个加载共享、扩展、当前应用名方法中,最终都会调用到下面的loadNacosDataIfPresent(...) 方法中private void loadNacosDataIfPresent(...) {if (null != dataId && dataId.trim().length() >= 1) {if (null != group && group.trim().length() >= 1) {// 调用loadNacosPropertySource()方法NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group, fileExtension, isRefreshable);this.addFirstPropertySource(composite, propertySource, false);}}}// loadNacosDataIfPresent(...) ---> loadNacosPropertySource(...)private NacosPropertySource loadNacosPropertySource(...) {return NacosContextRefresher.getRefreshCount() != 0L && !isRefreshable ? 	NacosPropertySourceRepository.getNacosPropertySource(dataId, group) : // 这里会进入到NacosPropertySourceBuilder类的build方法this.nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);}}




NacosPropertySourceBuilder类的代码调用流程:

  • 这里就会调用到核心接口configService接口实现类的getConfig()方法
public class NacosPropertySourceBuilder {......NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) {// 这里会先调用loadNacosData()方法List<PropertySource<?>> propertySources = this.loadNacosData(dataId, group, fileExtension);NacosPropertySource nacosPropertySource = new NacosPropertySource(propertySources, group, dataId, new Date(), isRefreshable);NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource);return nacosPropertySource;}// build(...) ---> loadNacosData(...)private List<PropertySource<?>> loadNacosData(String dataId, String group, String fileExtension) {String data = null;try {// 这里进入到了configService接口实现类的getConfig()方法data = this.configService.getConfig(dataId, group, this.timeout);if (StringUtils.isEmpty(data)) {log.warn("Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]", dataId, group);return Collections.emptyList();}if (log.isDebugEnabled()) {log.debug(String.format("Loading nacos data, dataId: '%s', group: '%s', data: %s", dataId, group, data));}return NacosDataParserHandler.getInstance().parseNacosData(dataId, data, fileExtension);} catch (NacosException var6) {log.error("get data from Nacos error,dataId:{} ", dataId, var6);} catch (Exception var7) {log.error("parse data from Nacos error,dataId:{},data:{}", new Object[]{dataId, data, var7});}return Collections.emptyList();}}



核心方法,NacosClient向NacosServer发送请求,拉取配置的方法。

前面的调用栈如果不会,可以直接在下面getConfig()方法出打一个断点,然后从debug中看调用栈。方法具体的实现:

  • NacosClient端,这里首先会读取本地文件,本地是有一个缓存的
  • 如果本地缓存中没有我们需要的配置,那么就需要从NacosServer端拉取配置了
  • 发送请求,获取响应数据
  • 将数据在本地文件中缓存一份
// 只是方法调用
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {return getConfigInner(namespace, dataId, group, timeoutMs);
}private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {group = blank2defaultGroup(group);ParamUtils.checkKeyParam(dataId, group);ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setTenant(tenant);cr.setGroup(group);// use local config first// NacosClient端,这里首先会读取本地文件,本地是有一个缓存的String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);if (content != null) {LOGGER.warn(..);cr.setContent(content);String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;}// 如果本地缓存中没有我们需要的配置,那么就需要从NacosServer端拉取配置了try {// 从服务端获取配置ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);cr.setContent(response.getContent());cr.setEncryptedDataKey(response.getEncryptedDataKey());configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;} catch (NacosException ioe) {if (NacosException.NO_RIGHT == ioe.getErrCode()) {throw ioe;}LOGGER.warn(..);}LOGGER.warn(..);// 再从本地文件缓存中找content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);cr.setContent(content);String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;
}//--------------------------
public ConfigResponse getServerConfig(String dataId, String group, String tenant, long readTimeout, boolean notify)throws NacosException {// 默认组名if (StringUtils.isBlank(group)) {group = Constants.DEFAULT_GROUP;}// 从服务端查询配置return this.agent.queryConfig(dataId, group, tenant, readTimeout, notify);
}// 向NacosServer发送请求,拉取配置数据,并在本地文件中缓存一份
public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)throws NacosException {ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);request.putHeader(NOTIFY_HEADER, String.valueOf(notify));RpcClient rpcClient = getOneRunningClient();if (notify) {CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));if (cacheData != null) {rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId()));}}// 发送请求,获取响应数据ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);ConfigResponse configResponse = new ConfigResponse();if (response.isSuccess()) {// 将数据在本地文件中缓存一份LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());configResponse.setContent(response.getContent());String configType;if (StringUtils.isNotBlank(response.getContentType())) {configType = response.getContentType();} else {configType = ConfigType.TEXT.getType();}configResponse.setConfigType(configType);String encryptedDataKey = response.getEncryptedDataKey();LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);configResponse.setEncryptedDataKey(encryptedDataKey);return configResponse;} else if (response.getErrorCode() == ConfigQueryResponse.CONFIG_NOT_FOUND) {LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, null);LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, null);return configResponse;} else if (response.getErrorCode() == ConfigQueryResponse.CONFIG_QUERY_CONFLICT) {...}
}



注册监听器

结论:

  • 从spring.facotries文件中开始,其中一个bean会监听spring容器启动完成的事件
  • 然后它会为当前应用添加监听器:遍历每个dataId,添加监听器。
  • 当nacosServer端更改了配置,这里监听器中的方法就会运行,这里都会发布一个RefreshEvent事件
  • 处理RefreshEvent事件的方法中会
    • 刷新环境变量
    • 销毁@RefreshScope注解修改的bean实例




NacosServer端如果修改了配置,就会发布一个事件,而在NacosClient端这边就会有一个EventListener去监听该事件并进行相应的处理。

ConfigService接口中,有三个和监听器相关的方法

public interface ConfigService {String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener)throws NacosException;void addListener(String dataId, String group, Listener listener) throws NacosException;void removeListener(String dataId, String group, Listener listener);}

在这里插入图片描述



接下来进入源码中,入口是NacosConfigAutoConfiguration自动配置的NacosContextRefresherbean 对象

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = {"spring.cloud.nacos.config.enabled"},matchIfMissing = true
)
public class NacosConfigAutoConfiguration {...@Beanpublic NacosContextRefresher nacosContextRefresher(NacosConfigManager nacosConfigManager, NacosRefreshHistory nacosRefreshHistory) {return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);}...
}

该类它监听了ApplicationReadyEvent事件,

  • 在spring容器启动完成后就会调用该类的onApplicationEvent()方法

  • 给当前应用注册nacos监听器

  • 为每个 dataId注册监听器

  • 当某个dataId发生了更改,这里都会发布一个RefreshEvent事件

public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent> , ApplicationContextAware {// 在spring容器启动完成后就会调用该类的onApplicationEvent()方法public void onApplicationEvent(ApplicationReadyEvent event) {if (this.ready.compareAndSet(false, true)) {// 给当前应用注册nacos监听器this.registerNacosListenersForApplications();}}// 给当前应用注册nacos监听器private void registerNacosListenersForApplications() {// 是否刷新配置,默认为trueif (this.isRefreshEnabled()) {Iterator var1 = NacosPropertySourceRepository.getAll().iterator();// 遍历每个dataIdwhile(var1.hasNext()) {NacosPropertySource propertySource = (NacosPropertySource)var1.next();if (propertySource.isRefreshable()) {String dataId = propertySource.getDataId();// 为每个 dataId注册监听器this.registerNacosListener(propertySource.getGroup(), dataId);}}}}// 为每个 dataId注册监听器private void registerNacosListener(final String groupKey, final String dataKey) {String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);// 定义一个监听器Listener listener = (Listener)this.listenerMap.computeIfAbsent(key, (lst) -> {return new AbstractSharedListener() {public void innerReceive(String dataId, String group, String configInfo) {NacosContextRefresher.refreshCountIncrement();// 配置的历史记录NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);// 发布一个RefreshEvent事件,会在处理该事件的位置真正进行刷新配置项NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "..."));}};});try {// 调用configService接口的addListener()添加监听器this.configService.addListener(dataKey, groupKey, listener);} catch (NacosException var6) {log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), var6);}}
}



当NacosServer端某个配置文件改动后,就会回调上面监听器的innerReceive()方法,在该方法中就会发布RefreshEvent事件,处理该事件的是RefreshEventListener类中的onApplicationEvent()方法:

  • 直接调用refresh()方法
public class RefreshEventListener implements SmartApplicationListener {...public void onApplicationEvent(ApplicationEvent event) {if (event instanceof ApplicationReadyEvent) {this.handle((ApplicationReadyEvent)event);} else if (event instanceof RefreshEvent) {// 处理RefreshEvent事件,调用handler()方法this.handle((RefreshEvent)event);}}public void handle(ApplicationReadyEvent event) {this.ready.compareAndSet(false, true);}public void handle(RefreshEvent event) {if (this.ready.get()) {// 这里就会调用refresh()方法进行刷新Set<String> keys = this.refresh.refresh();}}
}




接下来就进入到了ContextRefresher类的refresh()方法:

  • 刷新环境变量
  • 销毁@RefreshScope注解修改的bean实例
public synchronized Set<String> refresh() {// 刷新环境变量Set<String> keys = this.refreshEnvironment();// 销毁@RefreshScope注解修改的bean实例this.scope.refreshAll();return keys;
}



NacosServer源码分析

配置dump

服务端启动时就会依赖 DumpService 的 init 方法,从数据库中 load 配置存储在本地磁盘上,并将一些重要的元信息例如 MD5 值缓存在内存中。服务端会根据心跳文件中保存的最后一次心跳时间,来判断到底是从数据库 dump 全量配置数据还是部分增量配置数据(如果机器上次心跳间隔是 6h 以内的话)。

全量 dump 当然先清空磁盘缓存,然后根据主键 ID 每次捞取一千条配置刷进磁盘和内存。增量 dump 就是捞取最近六小时的新增配置(包括更新的和删除的),先按照这批数据刷新一遍内存和文件,再根据内存里所有的数据全量去比对一遍数据库,如果有改变的再同步一次,相比于全量 dump 的话会减少一定的数据库 IO 和磁盘 IO 次数。



配置发布

在这里插入图片描述


结论:

  • 更改数据库中的数据,持久化信息到mysql

  • 触发一个ConfigDataChangeEvent事件。至此请求结束。

  • 接下来就处理上面的事件:

    • 遍历Nacos集群下的所有节点,包括自己

    • 生成一个http/rpc的任务对象去执行,这里就直接看rpc任务对象的处理

    • 判断是不是当前节点,如果是就调用dump()方法去处理

      • 将更改的数据保存至本地磁盘中

      • 生成md5,并通过一个key将md5存入cache中,再发布一个LocalDataChangeEvent事件,该事件存了key

        处理上方事件的方法中会开启一个任务,在任务的run()方法中会真正调用客户端发送grpc请求,发送一个ConfigChangeNotifyRequest请求对象

    • 如果不是当前节点就发送grpc请求为其他节点同步修改配置项



NacosClient端的处理

  • 接收到ConfigChangeNotifyRequest请求对象,然后就放入了一个阻塞队列中。
  • 客户端while死循环,队列中有任务了/每隔5s 从队列中获取任务/null,去执行配置监听器方法
  • 根据CacheData对象远程获取配置内容,进行md5的比较
  • 如果有变化就通知监听器去处理,这就回到了nacosClient端获取配置中的流程了



我们接下来分析,在NacosServer端修改了配置,点击发布配置,NacosClient怎么就能接收到是哪一个dataId修改了嘞

发布配置官方接口文档

这里实际上是调用的NacosServer的/nacos/v2/cs/config接口,处理该请求的是ConfigController.publishConfig()方法

在这一次请求中其实就是做了两件事:将更新写入数据库中,然后发布一个事件,将事件添加进队列中,此时请求就结束了。

在controller方法中有两行核心的方法

// 进入service层,核心方法
// 持久化配置信息到数据库
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);// 触发ConfigDataChangeEvent事件,这是客户端能感知配置更新的根本原因
ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));




持久化配置信息到数据库就没必须继续看下去了,我们接下来看看notifyConfigChange()方法的实现:

  • 该方法就是单纯的一层一层方法调用
public class ConfigChangePublisher {public static void notifyConfigChange(ConfigDataChangeEvent event) {if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {return;}// 该方法继续调用NotifyCenter.publishEvent(event);}
}public static boolean publishEvent(final Event event) {try {// 该方法继续调用return publishEvent(event.getClass(), event);} catch (Throwable ex) {LOGGER.error("There was an exception to the message publishing : ", ex);return false;}
}private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}final String topic = ClassUtils.getCanonicalName(eventType);EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {// 该方法继续调用return publisher.publish(event);}return false;
}



这里就会进入到DefaultPublisher类的publish(event)方法中。该类非常重要,Nacos很多功能都用的这统一的一套事件发布与订阅。

public boolean publish(Event event) {checkIsStart();// 如果队列中写满了,那么就返回false,下面就直接处理了// 该类的run()方法中会死循环从队列中取任务执行boolean success = this.queue.offer(event);if (!success) {LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);// 处理事件receiveEvent(event);return true;}return true;
}




此时,本次http请求就已经结束了,这里将事件放入队列中后就会有其他的订阅者来异步处理事件。

这样的设计也实现了发布任务与处理任务之间的解耦

此时队列中有了任务,在NacosServer中任务订阅者此时还需要做两件事:

  • 通知集群其他Nacos节点进行更新
  • 通知NacosClient端配置发生了更改
public void notifySubscriber(final Subscriber subscriber, final Event event) {LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);// 订阅者需要去处理事件// 主要做两件事 通知集群其他Nacos节点进行更新、通知NacosClient端配置发生了更改final Runnable job = () -> subscriber.onEvent(event);final Executor executor = subscriber.executor();if (executor != null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error("Event callback exception: ", e);}}
}




这里会进入到AsyncNotifyService的构造方法中:

  • 遍历集群环境下的所有节点
  • 创建任务添加进http/grpc的队列中
  • 从http/grpc的队列中取任务执行
public AsyncNotifyService(ServerMemberManager memberManager) {...// Register A Subscriber to subscribe ConfigDataChangeEvent.NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {// Generate ConfigDataChangeEvent concurrentlyif (event instanceof ConfigDataChangeEvent) {ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;long dumpTs = evt.lastModifiedTs;String dataId = evt.dataId;String group = evt.group;String tenant = evt.tenant;String tag = evt.tag;// 获取nacos集群下的各个节点Collection<Member> ipList = memberManager.allMembers();// In fact, any type of queue here can beQueue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();for (Member member : ipList) {// 使用http/rpc的方式通知各节点,具体的dataId被修改了// 这里先添加进队列,下面的if中处理if (!MemberUtil.isSupportedLongCon(member)) {httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),evt.isBeta));} else {rpcQueue.add(new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));}}// 处理队列中的任务if (!httpQueue.isEmpty()) {ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));}if (!rpcQueue.isEmpty()) {// 直接看AsyncRpcTask类中的run()方法ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));}}}......});}




我们这里是nacos2.X的版本,所以我这里就自己看AsyncRpcTask类的run()方法:

  • 调用dump()方法
  • 发送请求,同步其他节点数据变化
public void run() {while (!queue.isEmpty()) {NotifySingleRpcTask task = queue.poll();ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();syncRequest.setDataId(task.getDataId());syncRequest.setGroup(task.getGroup());syncRequest.setBeta(task.isBeta);syncRequest.setLastModified(task.getLastModified());syncRequest.setTag(task.tag);syncRequest.setTenant(task.getTenant());Member member = task.member;// 判断member是不是当前节点if (memberManager.getSelf().equals(member)) {// 如果是当前节点就直接调用dump()方法// 这里又会经过服务注册那边当服务更改后为订阅者进行推送的流程中,这里最终是会到DumpProcessor.process()方法if (syncRequest.isBeta()) {dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getLastModified(), NetUtils.localIP(), true);} else {dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());}continue;}// 其他节点if (memberManager.hasMember(member.getAddress())) {boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());if (unHealthNeedDelay) {...} else {if (!MemberUtil.isSupportedLongCon(member)) {asyncTaskExecute(new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,task.getLastModified(), member.getAddress(), task.isBeta));} else {try {// 为nacos集群中的其他节点进行同步配置变化configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));} catch (Exception e) {MetricsMonitor.getConfigNotifyException().increment();asyncTaskExecute(task);}}}} else {//No nothig if  member has offline.}}
}



dump()这里又会经过服务注册那边当服务更改后为订阅者进行推送的流程中:先将task存入一个队列中 --> 去队列中的任务 --> 各自的任务处理类去处理。这里最终是会到DumpProcessor.process()方法:

  • 方法调用process() --> configDump() —>dump()
  • 将配置保存在磁盘文件中
  • 配置发生变化,更新md5
  • 发布LocalDataChangeEvent事件
    目的告诉NacosClient端,配置发生了改变。处理该事件的RpcConfigChangeNotifier.onEvent()
public boolean process(NacosTask task) {...// 直接看这里最后的configDump()方法return DumpConfigHandler.configDump(build.build());
}public static boolean configDump(ConfigDumpEvent event){......if (StringUtils.isBlank(event.getTag())) {......boolean result;if (!event.isRemove()) {// 核心方法,进入到这里// 写入磁盘result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type, encryptedDataKey);if (result) {ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,content.length());}} else {......}return result;} else {.......}public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,String type, String encryptedDataKey) {......try {// 根据content配置内容生成一个md5。content中的内容有变化那么生成的md5也肯定是不一样的final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);// 配置的最后一次更新时间if (lastModifiedTs < ConfigCacheService.getLastModifiedTs(groupKey)) {DUMP_LOG.warn(...);return true;}if (md5.equals(ConfigCacheService.getContentMd5(groupKey)) && DiskUtil.targetFile(dataId, group, tenant).exists()) {DUMP_LOG.warn(...);} else if (!PropertyUtil.isDirectRead()) {// 将配置保存在磁盘文件中DiskUtil.saveToDisk(dataId, group, tenant, content);}// 配置发生变化,更新md5// 继续跟入该方法updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);return true;} catch (IOException ioe) {......return false;} finally {releaseWriteLock(groupKey);}}public static void updateMd5(String groupKey, String md5, long lastModifiedTs, String encryptedDataKey) {// 根据groupKey,将md5数据保存在缓存中CacheItem cache = makeSure(groupKey, encryptedDataKey, false);if (cache.md5 == null || !cache.md5.equals(md5)) {cache.md5 = md5;cache.lastModifiedTs = lastModifiedTs;// 发布LocalDataChangeEvent事件,包含groupKey// 目的告诉NacosClient端,配置发生了改变。处理该事件的RpcConfigChangeNotifier.onEvent()NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));}}



接下来看看RpcConfigChangeNotifier.onEvent()的方法逻辑:

  • 遍历各个客户端
  • 发送grpc请求
@Override
public void onEvent(LocalDataChangeEvent event) {...configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);}public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,List<String> betaIps, String tag) {// 其实就代表着Client集合Set<String> listeners = configChangeListenContext.getListeners(groupKey);if (CollectionUtils.isEmpty(listeners)) {return;}int notifyClientCount = 0;// 遍历各个客户端for (final String client : listeners) {Connection connection = connectionManager.getConnection(client);if (connection == null) {continue;}ConnectionMeta metaInfo = connection.getMetaInfo();//一些检查校验String clientIp = metaInfo.getClientIp();String clientTag = metaInfo.getTag();if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {continue;}if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {continue;}// 封装一个请求对象ConfigChangeNotifyRequestConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);// 创建rpc推送任务,在RpcPushTask.run()方法中推送客户端RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName());push(rpcPushRetryTask);notifyClientCount++;}}public void run() {tryTimes++;if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) {push(this);} else {// 这里推送客户端,客户端再进行refresh操作rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {@Overridepublic void onSuccess() {tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp);}@Overridepublic void onFail(Throwable e) {tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp);Loggers.REMOTE_PUSH.warn("Push fail", e);push(RpcPushTask.this);}}, ConfigExecutor.getClientConfigNotifierServiceExecutor());}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/869249.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java面试八股之MySQL主从复制机制简述

MySQL主从复制机制简述 MySQL的主从复制机制是一种数据复制方案&#xff0c;用于在多个服务器之间同步数据。此机制允许从一个服务器&#xff08;主服务器&#xff09;到一个或多个其他服务器&#xff08;从服务器&#xff09;进行数据的复制&#xff0c;从而增强数据冗余、提…

Qt 音频编程实战项目

一Qt 音频基础知识 QT multimediaQMediaPlayer 类&#xff1a;媒体播放器&#xff0c;主要用于播放歌曲、网络收音 机等功能。QMediaPlaylist 类&#xff1a;专用于播放媒体内容的列表。 二 音频项目实战程序 //版本5.12.8 .proQT core gui QT multimedia greate…

@Slf4j idea标红Cannot resolve symbol ‘log‘

一、背景 时间久了没有应用idea,打开工程后项目 log 提示报红&#xff0c;未能解析&#xff0c;Cannot resolve symbol log &#xff0c;Slf4j 注解正常&#xff0c;应用的lombok插件。 检查lombok插件安装情况&#xff0c;发现未安装&#xff0c;重新安装重启idea后正常。 二…

Cesium自定义着色器构件三角面片【闪烁】问题,但是一移动视角就闪烁

问题&#xff1a;已知各个顶点的坐标信息、颜色和索引信息&#xff0c;并自定义绘制三角面片。 但是绘制的三角面片随着视角稍微改动就会出现闪烁现象&#xff01;&#xff01;&#xff01;why? Cesium数据类型的精度问题&#xff0c;例如下面为了获取能接收到高精度坐标信息…

系统架构师考点--信息系统基础知识

大家好。今天我们来总结一下信息系统基础知识的相关考点&#xff0c;每年都会考&#xff0c;一般是在上午场选择题中&#xff0c;占3分左右&#xff0c;其次下午场论文也有可能会出相关的考题。 一、信息系统概述 信息系统&#xff1a; 是由计算机硬件、网络和通信设备、计算…

selenium,在元素块下查找条件元素

def get_norms_ele_text(self):elementsself.get_norms_elements()locBy.CSS_SELECTOR,"div.sku-select-row-label"by loc[0] # 获取By类型&#xff0c;例如By.CSS_SELECTORvalue loc[1] # 获取具体的CSS选择器字符串&#xff0c;例如"div.sku-select-row-l…

Java虚拟机面试题汇总

目录 1. JVM的主要组成部分及其作用&#xff1f; 1.1 运行时数据区划分&#xff1f; 1.2 哪些区域可能会发生OOM&#xff1f; 1.3 堆和栈的区别&#xff1f; 1.4 内存模型中的happen-before是什么&#xff1f; 2. HotSpot虚拟机对象创建流程&#xff1f; 2.1 类加载过程…

数据库之SQL(二)

目录 一、简述SQL中如何将“行”转换为“列” 二、简述SQL注入 三、如何将一张表的部分数据更新到另一张表 四、WHERE和HAVING的区别 一、简述SQL中如何将“行”转换为“列” 我们以MySQL数据库为例&#xff0c;来说明行转列的实现方式。 首先&#xff0c;假设我们有一张分…

Echarts桑基图

关于Echarts的使用方法参考&#xff1a;vue2中echarts的使用_vue2中使用echarts-CSDN博客 实现效果&#xff1a; 代码&#xff1a; var sysT {"用采": #2D9BFF,"营销系统": #39BFFF,"ERP": #76C2FF,"财务管控": #5F57FC,"PMS&…

Java面试八股之描述一下MySQL使用索引查询数据的过程

描述一下MySQL使用索引查询数据的过程 1.解析查询语句与查询优化 用户提交一个 SQL 查询语句&#xff0c;MySQL 的查询解析器对其进行词法分析和语法分析&#xff0c;生成解析树。 查询优化器根据解析树、表结构信息、统计信息以及索引信息&#xff0c;决定是否使用 B树索引…

昇思MindSpore学习总结十二 —— ShuffleNet图像分类

当前案例不支持在GPU设备上静态图模式运行&#xff0c;其他模式运行皆支持。 1、ShuffleNet网络介绍 ShuffleNetV1是旷视科技提出的一种计算高效的CNN模型&#xff0c;和MobileNet, SqueezeNet等一样主要应用在移动端&#xff0c;所以模型的设计目标就是利用有限的计算资源来达…

mybatis-plus参数绑定异常

前言 最近要搞个发票保存的需求&#xff0c;当发票数据有id时说明是发票已经保存只需更新发票数据即可&#xff0c;没有id时说明没有发票数据需要新增发票&#xff1b;于是将原有的发票提交接口改造了下&#xff0c;将调用mybatis-plus的save方法改为saveOrUpdate方法&#xff…

opencv读取视频文件夹内视频的名字_时长_帧率_分辨率写入excel-cnblog

看视频的时候有的视频文件名贼长。想要翻看&#xff0c;在文件夹里根本显示不出来&#xff0c;缩短又会丢失一些信息&#xff0c;所以我写了一份Python代码&#xff0c;直接获取视频的名字&#xff0c;时长&#xff0c;帧率&#xff0c;还有分辨率写到excel里。 实际效果如下图…

imx6ull/linux应用编程学习(15) 移植MQTT客户端库

1. 准备开发环境 确保你的Ubuntu系统已经安装了必要的工具和依赖项。打开终端并运行以下命令&#xff1a; sudo apt update sudo apt install build-essential cmake git2. 获取MQTT库 git clone https://github.com/eclipse/paho.mqtt.c.git cd paho.mqtt.c3. 编译MQTT库 mk…

教程系列1 | 趋动云『社区项目』极速部署 SD WebUI

在上周&#xff0c;趋动云新推出的『社区项目』功能&#xff0c;以“一键克隆”的极致便捷与“省时省力”的高效体验&#xff0c;赢得了广大用户的关注。 随后&#xff0c;启动趋动云『社区项目』教程系列&#xff0c;旨在从零开始&#xff0c;全方位、手把手地引领您深入探索…

实现双向循环链表的 创建、判空、尾插、遍历、尾删、销毁

#include "link.h"//create DoubleLink head node DoubleLink_p DoubleLink_create() {DoubleLink_p H (DoubleLink_p)malloc(sizeof(DoubleLink));if(NULL H){printf("失败");return NULL;}H -> len 0;H -> next H;H -> prior H;printf(&qu…

044基于SSM+Jsp的个性化影片推荐系统

开发语言&#xff1a;Java框架&#xff1a;ssm技术&#xff1a;JSPJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包…

安装nodejs | npm报错

nodejs安装步骤: 官网&#xff1a;https://nodejs.org/en/ 在官网下载nodejs: 双击下载下来的msi安装包&#xff0c;一直点next&#xff0c;我选的安装目录是默认的: 测试是否安装成功&#xff1a; 输入cmd打开命令提示符&#xff0c;输入node -v可以看到版本&#xff0c;说…

JVM原理(二四):JVM虚拟机锁优化

高效并发是从JDK 5升级到JDK 6后一项重要的改进项&#xff0c;HotSpot虛 拟机开发团队在这个版本上花费了大量的资源去实现各种锁优化技术&#xff0c;如适应性自旋( Adaptive Spinning)、锁消除( Lock Elimination)、锁膨胀(Lock Coarsening)、轻量级锁(Lightweight Locking)、…

代码随想录打卡第十八天

代码随想录–二叉树部分 day 17 休息日 day 18 二叉树第五天 文章目录 代码随想录--二叉树部分一、力扣654--最大二叉树二、力扣617--合并二叉树三、力扣700--二乘树中的搜素四、力扣98--验证二叉搜索树 一、力扣654–最大二叉树 代码随想录题目链接&#xff1a;代码随想录 给…