上面章节我们讲了服务启动的时候从远程 Nacos 服务端拉到配置,以及服务启动后对需要支持热更新的配置都注册了一个监听器,这个章节我们来说下配置变动后具体是怎么处理的。
回到前面文章说过的 NacosPropertySourceLocator 的 locate()方法看看,该方法首先会通过NacosConfigManager获取一个 ConfigService,我们看下ConfigService是如何创建的:
public ConfigService getConfigService() {if (Objects.isNull(service)) {createConfigService(this.nacosConfigProperties);}return service;
}static ConfigService createConfigService(NacosConfigProperties nacosConfigProperties) {if (Objects.isNull(service)) {synchronized (NacosConfigManager.class) {try {if (Objects.isNull(service)) {service = NacosFactory.createConfigService(nacosConfigProperties.assembleConfigServiceProperties());}}catch (NacosException e) {log.error(e.getMessage());throw new NacosConnectionFailureException(nacosConfigProperties.getServerAddr(), e.getMessage(), e);}}}return service;
}// com.alibaba.nacos.api.config.ConfigFactory#createConfigService(java.util.Properties)
public static ConfigService createConfigService(Properties properties) throws NacosException {try {Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");Constructor constructor = driverImplClass.getConstructor(Properties.class);ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);return vendorImpl;} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}
}
从源码可以看到,NacosConfigManager 中会进行一个 ConfigService 单例对象的创建,创建流程最终会委托给 ConfigFactory,使用反射方式创建一个 NacosConfigService 的实例对象,NacosConfigService 是一个很核心的类,配置的获取,监听器的注册都需要经此。
我们看下 NacosConfigService 的构造函数,会去创建一个 ClientWorker 类的对象,这个类是实现配置热更新的核心类。
/*** 长轮训*/
private final ClientWorker worker;public NacosConfigService(Properties properties) throws NacosException {final NacosClientProperties clientProperties = NacosClientProperties.PROTOTYPE.derive(properties);ValidatorUtils.checkInitParam(clientProperties);// 初始化NamespaceinitNamespace(clientProperties);// 创建了一个配置过滤器链,可以采用SPI扩展机制加载对应的过滤器实现类this.configFilterChainManager = new ConfigFilterChainManager(clientProperties.asProperties());// 创建了一个服务管理器,内含一个定时轮询线程池,每隔30s拉取一次服务ServerListManager serverListManager = new ServerListManager(clientProperties);serverListManager.start();// 创建了一个客户端工作者,包含了一个代理对象this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, clientProperties);// will be deleted in 2.0 later versionsagent = new ServerHttpAgent(serverListManager);}
再来看下ClientWorker的构造方法:
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,final NacosClientProperties properties) throws NacosException {this.configFilterChainManager = configFilterChainManager;// 初始化timeout、taskPenaltyTime、enableRemoteSyncConfig属性init(properties);// 创建一个用于配置服务端的Rpc通信客户端agent = new ConfigRpcTransportClient(properties, serverListManager);// 计算合适的核心线程数int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);// 初始化一个线程池ScheduledExecutorService executorService = Executors.newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM),r -> {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker");t.setDaemon(true);return t;});// 配置了一个异步处理线程池agent.setExecutor(executorService);// 调用start方法agent.start();}
首先创建一个用于配置服务端的Rpc通信客户端,然后初始化了一个线程池,配置到rpc客户端中,调用ConfigRpcTransportClient#start()启动:
public void start() throws NacosException {// 认证服务,主要是通过Secret Key和Access Key做认证用securityProxy.login(this.properties);// 每隔5s执行一次认证this.executor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0,this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);// 内部启动startInternal();
}
然后启动worker:
/*** 阻塞队列*/
private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);public void startInternal() {// 线程池在阻塞等到信号的到来executor.schedule(() -> {while (!executor.isShutdown() && !executor.isTerminated()) {try {// 获取到listenExecutebell.offer(bellItem)的信号// 如果没有监听器的变动,则等待5s处理一次listenExecutebell.poll(5L, TimeUnit.SECONDS);if (executor.isShutdown() || executor.isTerminated()) {continue;}// 执行配置监听executeConfigListen();} catch (Throwable e) {LOGGER.error("[rpc listen execute] [rpc listen] exception", e);try {Thread.sleep(50L);} catch (InterruptedException interruptedException) {//ignore}notifyListenConfig();}}}, 0L, TimeUnit.MILLISECONDS);}
核心代码就是executeConfigListen()执行配置监听:
public void executeConfigListen() {// 存放含有listen的cacheDataMap<String, List<CacheData>> listenCachesMap = new HashMap<>(16);// 存放不含有listen的cacheDataMap<String, List<CacheData>> removeListenCachesMap = new HashMap<>(16);long now = System.currentTimeMillis();// 当前时间 减去 上一次全量同步的时间,如果大于3分钟,表示到了全量同步的时间了boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;for (CacheData cache : cacheMap.get().values()) {synchronized (cache) {// 是否和服务端一致if (cache.isConsistentWithServer()) {// 一致则检查md5值,若md5值和上一个不一样,则说明变动了,需要通知监听器cache.checkListenerMd5();// 是否到全量同步时间了,未到则直接跳过if (!needAllSync) {continue;}}if (!cache.isDiscard()) {// 非丢弃型,即新增,放入listenCachesMap//get listen configif (!cache.isUseLocalConfigInfo()) {List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));if (cacheDatas == null) {cacheDatas = new LinkedList<>();listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);}cacheDatas.add(cache);}} else if (cache.isDiscard() && CollectionUtils.isEmpty(cache.getListeners())) {// 丢弃型,即删除, 放入removeListenCachesMapif (!cache.isUseLocalConfigInfo()) {List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));if (cacheDatas == null) {cacheDatas = new LinkedList<>();removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);}cacheDatas.add(cache);}}}}// 如果需要和服务端数据同步,则listenCachesMap和removeListenCachesMap存放了本地数据,需要和服务端对比boolean hasChangedKeys = checkListenCache(listenCachesMap);//execute check remove listen.checkRemoveListenCache(removeListenCachesMap);if (needAllSync) {// 更新同步时间lastAllSyncTime = now;}//If has changed keys,notify re sync md5.if (hasChangedKeys) {// 服务端告知了有数据变动,则需要再同步一次notifyListenConfig();}}
主要做了两件事情:
- 1)、首先遍历之前缓存到cacheMap的所有CacheData,判断是否和服务端保持一致,如果一致,则检查配置的md5值,如果md5值和上一个不一样,则说明变动了,需要通知监听器。
// cacheData的校验方法
void checkListenerMd5() {// 遍历这个配置所有的监听者for (ManagerListenerWrap wrap : listeners) {if (!md5.equals(wrap.lastCallMd5)) {// 如果内容变动了,直接通知监听器处理,并且更新 listenerWrap 中的 content、Md5safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);}}
}
遍历这个配置对应的所有的监听者,判断内容是否变动,如果变动了,调用safeNotifyListener()直接通知监听器处理。
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {// 获取到监听器final Listener listener = listenerWrap.listener;if (listenerWrap.inNotifying) {LOGGER.warn("[{}] [notify-currentSkip] dataId={}, group={},tenant={}, md5={}, listener={}, listener is not finish yet,will try next time.",envName, dataId, group, tenant, md5, listener);return;}// 定义一个通知任务NotifyTask job = new NotifyTask() {@Overridepublic void run() {long start = System.currentTimeMillis();ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();ClassLoader appClassLoader = listener.getClass().getClassLoader();ScheduledFuture<?> timeSchedule = null;try {if (listener instanceof AbstractSharedListener) {AbstractSharedListener adapter = (AbstractSharedListener) listener;adapter.fillContext(dataId, group);LOGGER.info("[{}] [notify-context] dataId={}, group={},tenant={}, md5={}", envName, dataId,group, tenant, md5);}// Before executing the callback, set the thread classloader to the classloader of// the specific webapp to avoid exceptions or misuses when calling the spi interface in// the callback method (this problem occurs only in multi-application deployment).Thread.currentThread().setContextClassLoader(appClassLoader);ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setGroup(group);cr.setContent(content);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);String contentTmp = cr.getContent();timeSchedule = getNotifyBlockMonitor().schedule(new LongNotifyHandler(listener.getClass().getSimpleName(), dataId, group, tenant, md5,notifyWarnTimeout, Thread.currentThread()), notifyWarnTimeout,TimeUnit.MILLISECONDS);listenerWrap.inNotifying = true;// 回调通知,也就是通知变动的内容// 这里就是执行前面说到的注册监听时的一个回调函数,里面其实最主要的就是发布了一个RefreshEvent事件,springcloud会处理这个事件listener.receiveConfigInfo(contentTmp);// compare lastContent and contentif (listener instanceof AbstractConfigChangeListener) {// 扩展点,告知配置内容的变动Map<String, ConfigChangeItem> data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, contentTmp, type);ConfigChangeEvent event = new ConfigChangeEvent(data);((AbstractConfigChangeListener) listener).receiveConfigChange(event);listenerWrap.lastContent = contentTmp;}// 赋值最新的md5listenerWrap.lastCallMd5 = md5;LOGGER.info("[{}] [notify-ok] dataId={}, group={},tenant={}, md5={}, listener={} ,job run cost={} millis.",envName, dataId, group, tenant, md5, listener, (System.currentTimeMillis() - start));} catch (NacosException ex) {LOGGER.error("[{}] [notify-error] dataId={}, group={},tenant={},md5={}, listener={} errCode={} errMsg={},stackTrace :{}",envName, dataId, group, tenant, md5, listener, ex.getErrCode(), ex.getErrMsg(),getTrace(ex.getStackTrace(), 3));} catch (Throwable t) {LOGGER.error("[{}] [notify-error] dataId={}, group={},tenant={}, md5={}, listener={} tx={}",envName, dataId, group, tenant, md5, listener, getTrace(t.getStackTrace(), 3));} finally {listenerWrap.inNotifying = false;Thread.currentThread().setContextClassLoader(myClassLoader);if (timeSchedule != null) {timeSchedule.cancel(true);}}}};try {// 监听器配置了异步执行器,就异步执行if (null != listener.getExecutor()) {LOGGER.info("[{}] [notify-listener] task submitted to user executor, dataId={}, group={},tenant={}, md5={}, listener={} ",envName, dataId, group, tenant, md5, listener);job.async = true;listener.getExecutor().execute(job);} else {// 同步执行LOGGER.info("[{}] [notify-listener] task execute in nacos thread, dataId={}, group={},tenant={}, md5={}, listener={} ",envName, dataId, group, tenant, md5, listener);job.run();}} catch (Throwable t) {LOGGER.error("[{}] [notify-listener-error] dataId={}, group={},tenant={}, md5={}, listener={} throwable={}",envName, dataId, group, tenant, md5, listener, t.getCause());}
}
在safeNotifyListener方法中,创建了一个通知任务NotifyTask,NotifyTask实现了runnable接口,需要关注其run方法的实现。然后看监听器是否配置了异步执行线程池,如果配置了,就异步执行;否则就是同步执行。
我们来看看NotifyTask的run方法做了哪些事情。
先是创建了一个ConfigResponse对象,封装了namespaceId、groupId、dataId。接着,也是最重要的,就是回调监听器的receiveConfigInfo()方法,然后更新监听器包装器中的 lastContent、lastCallMd5字段。
那么receiveConfigInfo()方法到底做了什么事情呢?我们不妨来看看之前注册监听器的代码:
// com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListener
private void registerNacosListener(final String groupKey, final String dataKey) {// key = {dataId,group}String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);Listener listener = listenerMap.computeIfAbsent(key,lst -> new AbstractSharedListener() {@Overridepublic void innerReceive(String dataId, String group,String configInfo) {// 累加配置刷新次数refreshCountIncrement();// 添加一条刷新记录nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);// 通过Spring上下文发布一个RefreshEvent刷新事件applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));if (log.isDebugEnabled()) {log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s",group, dataId, configInfo));}}});
try {// 注册配置监听器,以 dataId + groupId + namespace 为维度进行注册的configService.addListener(dataKey, groupKey, listener);
}
catch (NacosException e) {log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,groupKey), e);
}
}
所以,receiveConfigInfo()方法最终执行的就是AbstractSharedListener#innerReceive()方法,主要是发布了一个RefreshEvent事件,RefreshEvent 事件主要由 SpringCloud 相关类来处理。
- 2)、 服务端告知了有数据变动,则需要再同步一次
其实就是往阻塞队列中存放内容,再来一遍前面的同步流程。
public void notifyListenConfig() {// listenExecutebell是一个阻塞队列,放入bellItem,即一个触发条件,相当于生产者listenExecutebell.offer(bellItem);
}