自动装配
SpringBoot 自动装配机制 加载 WEB/INF spring.factories
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration```java
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {......@Bean@ConditionalOnMissingBeanpublic NacosConfigManager nacosConfigManager(NacosConfigProperties nacosConfigProperties) {return new NacosConfigManager(nacosConfigProperties);}......}
创建 ConfigService
构建NacosConfigManagerBean的时候会在实例化的时候调用构造方法 他的构造方法中会创建ConfigService
public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {this.nacosConfigProperties = nacosConfigProperties;// Compatible with older code in NacosConfigProperties,It will be deleted in the// future.createConfigService(nacosConfigProperties);}
static ConfigService createConfigService(NacosConfigProperties nacosConfigProperties) {if (Objects.isNull(service)) {synchronized (NacosConfigManager.class) {try {if (Objects.isNull(service)) {service = NacosFactory.createConfigService(nacosConfigProperties.assembleConfigServiceProperties());}}catch (NacosException e) {log.error(e.getMessage());throw new NacosConnectionFailureException(nacosConfigProperties.getServerAddr(), e.getMessage(), e);}}}return service;}
public static ConfigService createConfigService(Properties properties) throws NacosException {return ConfigFactory.createConfigService(properties);}
public static ConfigService createConfigService(Properties properties) throws NacosException {try {//反射拿到classClass<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");// 获取带Properties参数的构造函数Constructor constructor = driverImplClass.getConstructor(Properties.class);//通过反射构建实例ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);return vendorImpl;} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}}
NacosConfigService会在构造方法中 注入Listener接受server配置变更通知。
public NacosConfigService(Properties properties) throws NacosException {ValidatorUtils.checkInitParam(properties);// 设置namespace可以通过properties.setProperty(PropertyKeyConst.NAMESPACE)initNamespace(properties);// 初始化namespace、server地址等信息ServerListManager serverListManager = new ServerListManager(properties);// 启动主要用于endpoint方式定时获取server地址,当本地传入isFixed=trueserverListManager.start();// clientWorker初始化this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties);// 将被废弃HttpAgent,先忽略// will be deleted in 2.0 later versionsagent = new ServerHttpAgent(serverListManager);
}
ClientWorker初始化
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,final Properties properties) throws NacosException {this.configFilterChainManager = configFilterChainManager;// 初始化超时时间、重试时间等init(properties);// gRPC config agent初始化agent = new ConfigRpcTransportClient(properties, serverListManager);// 调度线程池,「处理器核数」ScheduledExecutorService executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker");t.setDaemon(true);return t;}});agent.setExecutor(executorService);// 启动grpc agentagent.start();}
初始化超时时间等
private void init(Properties properties) {// 超时时间,默认30秒timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);// 重试时间,默认2秒taskPenaltyTime = ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME);// 开启配置删除同步,默认falsethis.enableRemoteSyncConfig = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG));
}
GRPCConfigAgent初始化
public ConfigTransportClient(Properties properties, ServerListManager serverListManager) {// 默认编码UTF-8String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);if (StringUtils.isBlank(encodeTmp)) {this.encode = Constants.ENCODE;} else {this.encode = encodeTmp.trim();}// namespace租户,默认空this.tenant = properties.getProperty(PropertyKeyConst.NAMESPACE);this.serverListManager = serverListManager;// 用户名和密码验证this.securityProxy = new SecurityProxy(properties,ConfigHttpClientManager.getInstance().getNacosRestTemplate());}
启动GRPC Config Agent
public void start() throws NacosException {// 简单用户名和密码验证if (securityProxy.isEnabled()) {securityProxy.login(serverListManager.getServerUrls());this.executor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {securityProxy.login(serverListManager.getServerUrls());}}, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);}startInternal();
}
这里线程会一直运行从listenExecutebell这个阻塞队列中获取元素
listenExecutebell这里阻塞队列会在服务变更之后发布变更事件最后会往这个阻塞队列中塞元素 如果队列为空等待5秒后执行,如果队列不为空立即执行
@Overridepublic void startInternal() {executor.schedule(() -> {//线程池没有管理并且所有线程没有运行完while (!executor.isShutdown() && !executor.isTerminated()) {try {// 最长等待5秒listenExecutebell.poll(5L, TimeUnit.SECONDS);//如果线程池已经关闭 或者所有线程运行完直接if (executor.isShutdown() || executor.isTerminated()) {continue;}executeConfigListen();} catch (Exception e) {LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);}}}, 0L, TimeUnit.MILLISECONDS);}
注册Listener
在Spring启动的时候会在run方法中执行
SpringApplicationRunListeners的running(context)这里面会发送一个ApplicationReadyEvent事件
NacosContextRefresher会监听到ApplicationReadyEvent事件进行nacos监听器的注册
@Overridepublic void onApplicationEvent(ApplicationReadyEvent event) {// many Spring contextif (this.ready.compareAndSet(false, true)) {this.registerNacosListenersForApplications();}}
private void registerNacosListenersForApplications() {......registerNacosListener(propertySource.getGroup(), dataId);......}
private void registerNacosListener(final String groupKey, final String dataKey) {.....//添加监听器configService.addListener(dataKey, groupKey, listener);......
}
添加监听器
构建CacheData,并缓存在cacheMap中,key是由「dataId+group+tenant」组成;每个CacheData会绑定了Listener列表,也绑定了taskId,3000个不同的CacheData对应一个taskId,对应一个gRPC通道实例
@Overridepublic void addListener(String dataId, String group, Listener listener) throws NacosException {worker.addTenantListeners(dataId, group, Arrays.asList(listener));}
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)throws NacosException {// 默认DEFAULT_GROUPgroup = blank2defaultGroup(group);//获取租户默认是空String tenant = agent.getTenant();//构建缓存数据CacheData并放入cacheMap中CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);synchronized (cache) {for (Listener listener : listeners) {cache.addListener(listener);}// cache md5 data是否来自server同步cache.setSyncWithServer(false);//往阻队列中添加数据 listenExecutebell.offer(bellItem);agent.notifyListenConfig();}}
往缓存中添加内容
构建缓存数据CacheData并放入cacheMap中,缓存的key为 「dataId+group+tenant」例如:test+DEFAULT_GROUP。每个CacheData会绑定对应的taskId,每3000个CacheData对应一个taskId。其实从后面的代码中可以看出,每个taskId会对应一个gRPC
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {// 从缓存中获取 如果不是空的直接返回CacheData cache = getCache(dataId, group, tenant);if (null != cache) {return cache;}// 构造缓存key以+连接,test+DEFAULT_GROUPString key = GroupKey.getKeyTenant(dataId, group, tenant);synchronized (cacheMap) {CacheData cacheFromMap = getCache(dataId, group, tenant);// multiple listeners on the same dataid+group and race condition,so// double check again// other listener thread beat me to set to cacheMapif (null != cacheFromMap) { // 再检查一遍cache = cacheFromMap;// reset so that server not hang this checkcache.setInitializing(true); // 缓存正在初始化} else {// 构造缓存数据对象cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);// 初始值taskId=0,注意此处每3000个CacheData共用一个taskIdint taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();cache.setTaskId(taskId);// fix issue # 1317 // 默认falseif (enableRemoteSyncConfig) {ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L, false);cache.setContent(response.getContent());}}Map<String, CacheData> copy = new HashMap<>(this.cacheMap.get());// key = test+DEFAULT_GROUPcopy.put(key, cache);// cacheMap = {test+DEFAULT_GROUP=CacheData [test, DEFAULT_GROUP]}cacheMap.set(copy);}LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());return cache;}
缓存的内容
public class CacheData {//ConfigTransportClient名称,config_rpc_clientprivate final String name;//filter拦截链条,可以执行一些列拦截器private final ConfigFilterChainManager configFilterChainManager;//dataIdpublic final String dataId;//group名称,默认为DEFAULT_GROUPpublic final String group;//租户名称public final String tenant;//添加的Listener列表,线程安全CopyOnWriteArrayListprivate final CopyOnWriteArrayList<ManagerListenerWrap> listeners;//MD5private volatile String md5;//配置内容private volatile String content; }
配置变更
public void executeConfigListen() {Map<String/*taskId*/, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);long now = System.currentTimeMillis();// 超过5分钟boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;for (CacheData cache : cacheMap.get().values()) {synchronized (cache) {// isSyncWithServer初始为false,在下文代码中校验结束后会设置为true,表示md5 cache data同步来自server。如果为true会校验Md5.if (cache.isSyncWithServer()) { cache.checkListenerMd5(); // 内容有变更通知Listener执行if (!needAllSync) { // 不超过5分钟则不再全局校验continue;}}if (!CollectionUtils.isEmpty(cache.getListeners())) { // 有添加Listeners// get listen config 默认 falseif (!cache.isUseLocalConfigInfo()) {List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));if (cacheDatas == null) {cacheDatas = new LinkedList<CacheData>();listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);}// CacheData [test, DEFAULT_GROUP]cacheDatas.add(cache);}} else if (CollectionUtils.isEmpty(cache.getListeners())) { // 没有添加Listenersif (!cache.isUseLocalConfigInfo()) {List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));if (cacheDatas == null) {cacheDatas = new LinkedList<CacheData>();removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);}cacheDatas.add(cache);}}}}boolean hasChangedKeys = false;if (!listenCachesMap.isEmpty()) { // 有Listenersfor (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {String taskId = entry.getKey();List<CacheData> listenCaches = entry.getValue();ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);configChangeListenRequest.setListen(true);try {// 每个taskId构建rpcClient,例如:taskId= config-0-c70e0314-4770-43f5-add4-f258a4083fd7;结合上下文每3000个CacheData对应一个rpcClientRpcClient rpcClient = ensureRpcClient(taskId);// 向server发起configChangeListenRequest,server端由ConfigChangeBatchListenRequestHandler处理,还是比较md5是否变更了,变更后server端返回变更的key列表。ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {Set<String> changeKeys = new HashSet<String>();// handle changed keys,notify listener// 有变化的configContextif (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {hasChangedKeys = true;for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) {String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),changeConfig.getTenant());changeKeys.add(changeKey);boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();//当server返回变更key列表时执行refreshContentAndCheck方法。然后回调ListenerrefreshContentAndCheck(changeKey, !isInitializing);}}//handler content configsfor (CacheData cacheData : listenCaches) {String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());if (!changeKeys.contains(groupKey)) {// key没有变化的,内容由server同步,设置SyncWithServer=truesynchronized (cacheData) {if (!cacheData.getListeners().isEmpty()) {cacheData.setSyncWithServer(true);continue;}}}cacheData.setInitializing(false);}}} catch (Exception e) {LOGGER.error("Async listen config change error ", e);try {Thread.sleep(50L);} catch (InterruptedException interruptedException) {//ignore}}}}if (!removeListenCachesMap.isEmpty()) {for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {String taskId = entry.getKey();List<CacheData> removeListenCaches = entry.getValue();ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);configChangeListenRequest.setListen(false);try {// 向server发送Listener取消订阅请求ConfigBatchListenRequest#listen为falseRpcClient rpcClient = ensureRpcClient(taskId);boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);if (removeSuccess) {for (CacheData cacheData : removeListenCaches) {synchronized (cacheData) {if (cacheData.getListeners().isEmpty()) {// 移除本地缓存ClientWorker.this.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);}}}}} catch (Exception e) {LOGGER.error("async remove listen config change error ", e);}try {Thread.sleep(50L);} catch (InterruptedException interruptedException) {//ignore}}}if (needAllSync) {lastAllSyncTime = now;}//If has changed keys,notify re sync md5.if (hasChangedKeys) { // key有变化触发下一轮notifyListenConfig();}
}
校验MD5
当CacheData从server同步后,会校验md5是否变更了,当变更时会回调到我们注册的Listener完成通知。通知任务被封装成Runnable任务,执行线程池可以自定义,默认为5个线程。
void checkListenerMd5() {for (ManagerListenerWrap wrap : listeners) {if (!md5.equals(wrap.lastCallMd5)) { // 配置内容有变更时,回调到]Listener中。safeNotifyListener(dataId, group, content, type, md5, wrap);}}
}
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,final String md5, final ManagerListenerWrap listenerWrap) {final Listener listener = listenerWrap.listener;if (listenerWrap.inNotifying) {// ...return;}Runnable job = new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();ClassLoader appClassLoader = listener.getClass().getClassLoader();try {if (listener instanceof AbstractSharedListener) {AbstractSharedListener adapter = (AbstractSharedListener) listener;adapter.fillContext(dataId, group);// ...}Thread.currentThread().setContextClassLoader(appClassLoader);ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setGroup(group);cr.setContent(content);// filter拦截继续过滤configFilterChainManager.doFilter(null, cr);String contentTmp = cr.getContent();listenerWrap.inNotifying = true;// 回调注册Listener的receiveConfigInfo方法或者receiveConfigChange逻辑listener.receiveConfigInfo(contentTmp);// compare lastContent and contentif (listener instanceof AbstractConfigChangeListener) {Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);ConfigChangeEvent event = new ConfigChangeEvent(data);// 回调变更事件方法((AbstractConfigChangeListener) listener).receiveConfigChange(event);listenerWrap.lastContent = content;}listenerWrap.lastCallMd5 = md5;// ..} catch (NacosException ex) {// ...} catch (Throwable t) {// ...} finally {listenerWrap.inNotifying = false;Thread.currentThread().setContextClassLoader(myClassLoader);}}};final long startNotify = System.currentTimeMillis();try {// 优先使用我们示例中注册提供的线程池执行job,如果没有设置使用默认线程池「INTERNAL_NOTIFIER」,默认5个线程if (null != listener.getExecutor()) {listener.getExecutor().execute(job);} else {try {INTERNAL_NOTIFIER.submit(job); // 默认线程池执行,为5个线程} catch (RejectedExecutionException rejectedExecutionException) {// ...job.run();} catch (Throwable throwable) {// ...job.run();}}} catch (Throwable t) {// ...}final long finishNotify = System.currentTimeMillis();// ...
}
key有变更
注册Listener后,会构建与server的RPC通道rpcClient;向server发起变更查询请求configChangeListenRequest,server端通过比较缓存的md5值,返回client变更的key列表;client通过变更的key列表向server发起配置查询请求ConfigQueryRequest,获取变更内容,并回调我们注册的Listener。
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {try {// 向server发起ConfigQueryRequest,查询配置内容String[] ct = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify);//设置最新的内容信息cacheData.setContent(ct[0]);if (null != ct[1]) {cacheData.setType(ct[1]);}if (notify) { // 记录日志// ...}// 回调注册的Listener逻辑cacheData.checkListenerMd5();} catch (Exception e) {//...}
}
总结
客户端在启动的时候会构建一个ConfigService的处理类,然后再ConfigService的构造,方法中会创建一个ClientWorker 用来处理对服务端的网络通信及后续变更处理,
当服务端有配置变更的时候会发送配置变更事件最终会往一个阻塞队列中取offer数据,然后ClientWorker启动的时候会构建一个定时线程去从这个阻塞队列中阻塞拿数据 如果队列为空等待5秒后执行,如果队列不为空立即执行 然后会将3000个CacheDate其实就是配置数据组成一个taskId 然后往服务端发送grpc请求,服务端会检查 md5比较哪些配置发生了变更 然后会返回发生变更的key列表,然后客户端根据服务端返回的key列表 去服务端拉取最新的配置信息 然后缓存到本地