存储配置
从整体上Nacos服务端的配置存储分为三层:
内存:Nacos每个节点都在内存里缓存了配置,但是只包含配置的md5(配置文件内容太多,不适合全部缓存在内存中),所以内存级别的配置只能用于比较配置是否发生了变更,适用于客户端长轮询配置等场景。
文件系统:文件系统配置来源于数据库写入的配置。如果是集群启动或mysql单机启动,服务端会以本地文件系统的配置响应客户端查询。
数据库:所有写数据都会先写入数据库。只有当以derby数据源(-DembeddedStorage=true)单机(-Dnacos.standalone=true)启动时,客户端的查询配置请求会实时查询derby数据库。
服务端创建修改配置
可以看到在服务端创建和修改配置的时候会调用到ConfigController的publishConfig中进行配置的发布
@PostMapping@Secured(action = ActionTypes.WRITE, signType = SignType.CONFIG)public Boolean publishConfig(HttpServletRequest request, @RequestParam(value = "dataId") String dataId,@RequestParam(value = "group") String group,@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,@RequestParam(value = "appName", required = false) String appName,@RequestParam(value = "src_user", required = false) String srcUser,@RequestParam(value = "config_tags", required = false) String configTags,@RequestParam(value = "desc", required = false) String desc,@RequestParam(value = "use", required = false) String use,@RequestParam(value = "effect", required = false) String effect,@RequestParam(value = "type", required = false) String type,@RequestParam(value = "schema", required = false) String schema) throws NacosException {......// 目标灰度机器的IP地址。String betaIps = request.getHeader("betaIps");//构建配置信息ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);configInfo.setType(type);String encryptedDataKey = pair.getFirst();configInfo.setEncryptedDataKey(encryptedDataKey);//如果没配置灰度地址if (StringUtils.isBlank(betaIps)) {//没有配置标签if (StringUtils.isBlank(tag)) {//添加配置信息persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);//发布一个配置变更事件ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));} else {persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));}} else {// 灰度发布configInfo.setEncryptedDataKey(encryptedDataKey);persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, 
time.getTime()));}//持久化日志ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content);return true; }
PersistService有两个实现类一个是EmbeddedStoragePersistServiceImpl(嵌入式数据源derby)
一个ExternalStoragePersistServiceImpl(外部数据源) 而我使用的是mysql做的配置的持久化 所以用到的是ExternalStoragePersistServiceImpl 将数据写到mysql 的config_info表里
@Overridepublic void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time,Map<String, Object> configAdvanceInfo, boolean notify) {try {//添加配置addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify);} catch (DataIntegrityViolationException ive) { // 2. 唯一约束冲突,更新updateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify);}}
将数据持久化
addConfigInfo尝试插入配置:在同一个事务里依次保存config_info、config_tags_relation、his_config_info。
@Overridepublic void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {//tjc 就是TransactionTemplate 这里其实就是做的mysql的数据写入boolean result = tjt.execute(status -> {try {// 1. 保存config_infolong configId = addConfigInfoAtomic(-1, srcIp, srcUser, configInfo, time, configAdvanceInfo);// 2. 保存config_tags_relationString configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");addConfigTagsRelation(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),configInfo.getTenant());/ 3. 记录日志his_config_infoinsertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");} catch (CannotGetJdbcConnectionException e) {LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);throw e;}return Boolean.TRUE;});}
当发生了唯一索引冲突的时候会去修改配置信息
@Overridepublic void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser,final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {boolean result = tjt.execute(status -> {try {// 1. 查询config_infoConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(),configInfo.getTenant());String appNameTmp = oldConfigInfo.getAppName();/*If the appName passed by the user is not empty, use the persistent user's appName,otherwise use db; when emptying appName, you need to pass an empty string*/if (configInfo.getAppName() == null) {configInfo.setAppName(appNameTmp);}// 2. 更新config_infoupdateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo);String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");if (configTags != null) {// 3. 删除所有老tag config_tags_relationremoveTagByIdAtomic(oldConfigInfo.getId());// 4. 新增tag config_tags_relationaddConfigTagsRelation(oldConfigInfo.getId(), configTags, configInfo.getDataId(),configInfo.getGroup(), configInfo.getTenant());}// 5. 记录日志insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U");} catch (CannotGetJdbcConnectionException e) {LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);throw e;}return Boolean.TRUE;});}
数据变更事件发布
当数据持久化之后会去发送一个配置数据变更的ConfigDataChangeEvent事件
AsyncNotifyService会监听到ConfigDataChangeEvent事件来进行处理
@Autowiredpublic AsyncNotifyService(ServerMemberManager memberManager) {this.memberManager = memberManager;// 将ConfigDataChangeEvent注册到NotifyCentre。NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);// 注册订阅服务器以订阅ConfigDataChangeEvent。NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {// 如果是配置变更事件if (event instanceof ConfigDataChangeEvent) {ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;long dumpTs = evt.lastModifiedTs;String dataId = evt.dataId;String group = evt.group;String tenant = evt.tenant;String tag = evt.tag;//获取集群虽有节点列表Collection<Member> ipList = memberManager.allMembers();// 每个节点组成一个TaskQueue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();//遍历集群节点信息 for (Member member : ipList) {//判断是否是长轮询if (!MemberUtil.isSupportedLongCon(member)) {// 添加一个长轮询的异步dump任务httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),evt.isBeta));} else {// 添加一个长连接的异步dump任务rpcQueue.add(new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));}}// 判断并执行长轮询的异步dump任务if (!httpQueue.isEmpty()) {ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));}// 判断并执行长连接的异步dump任务if (!rpcQueue.isEmpty()) {ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));}}}@Overridepublic Class<? extends Event> subscribeType() {return ConfigDataChangeEvent.class;}});}
在接收到ConfigDataChangeEvent之后,如果是Nacos2.0以上的版本,会创建一个RpcTask用以执行配置变更的通知,由内部类AsyncRpcTask执行,AsyncRpcTask具体逻辑如下所示。
class AsyncRpcTask implements Runnable {private Queue<NotifySingleRpcTask> queue;public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {this.queue = queue;}@Overridepublic void run() {while (!queue.isEmpty()) {NotifySingleRpcTask task = queue.poll();// 创建配置变更请求ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();syncRequest.setDataId(task.getDataId());syncRequest.setGroup(task.getGroup());syncRequest.setBeta(task.isBeta);syncRequest.setLastModified(task.getLastModified());syncRequest.setTag(task.tag);syncRequest.setTenant(task.getTenant());Member member = task.member;// 如果是自身的数据变更,直接执行dump操作if (memberManager.getSelf().equals(member)) {if (syncRequest.isBeta()) {// 同步Beta配置dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getLastModified(), NetUtils.localIP(), true);} else {// 像连接自己节点的client端进行配置信息的推送dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());}continue;}// 通知集群其他节点进行dumpif (memberManager.hasMember(member.getAddress())) {// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notifyboolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());if (unHealthNeedDelay) {// target ip is unhealthy, then put it in the notification listConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,0, member.getAddress());// get delay time and set fail count to the taskasyncTaskExecute(task);} else {if (!MemberUtil.isSupportedLongCon(member)) {asyncTaskExecute(new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,task.getLastModified(), member.getAddress(), task.isBeta));} else {try {configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new 
AsyncRpcNotifyCallBack(task));} catch (Exception e) {MetricsMonitor.getConfigNotifyException().increment();asyncTaskExecute(task);}}}} else {//No nothig if member has offline.}}}}
向连接自己的Client端发送配置变更通知
这里首先创建了一个ConfigChangeClusterSyncRequest,并将配置信息写入。然后根据集群成员信息,通知相应的Server处理数据同步请求。同步配置变更信息的核心逻辑由DumpService来执行。我们主要查看同步正式配置的操作,DumpService的dump方法如下所示。
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,boolean isBeta) {//dataId+分组 String groupKey = GroupKey2.getKey(dataId, group, tenant);//dataId+分组+是否灰度发布+标签String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);//添加dump任务dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);}
在该方法中,这里会根据配置变更信息,提交一个异步的DumpTask任务,后续会由DumpProcessor类的process方法进行处理
DumpProcessor执行dump任务
public class DumpProcessor implements NacosTaskProcessor {final DumpService dumpService;public DumpProcessor(DumpService dumpService) {this.dumpService = dumpService;}@Overridepublic boolean process(NacosTask task) {final PersistService persistService = dumpService.getPersistService();DumpTask dumpTask = (DumpTask) task;String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());String dataId = pair[0];String group = pair[1];String tenant = pair[2];long lastModified = dumpTask.getLastModified();String handleIp = dumpTask.getHandleIp();boolean isBeta = dumpTask.isBeta();String tag = dumpTask.getTag();//构建 ConfigDumpEventConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId).group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);//如果是灰度发布if (isBeta) {// if publish beta, then dump config, update beta cacheConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);build.remove(Objects.isNull(cf));build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());build.content(Objects.isNull(cf) ? null : cf.getContent());build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());return DumpConfigHandler.configDump(build.build());}//判断是否有标签 if (StringUtils.isBlank(tag)) {// 1. 查询当前配置ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);build.remove(Objects.isNull(cf));// 2. 设置ConfigDumpEvent的content为最新的contentbuild.content(Objects.isNull(cf) ? null : cf.getContent());build.type(Objects.isNull(cf) ? null : cf.getType());build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());} else {ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);build.remove(Objects.isNull(cf));build.content(Objects.isNull(cf) ? null : cf.getContent());}// 3. 执行ConfigDumpEvent处理return DumpConfigHandler.configDump(build.build());}
}
public static boolean configDump(ConfigDumpEvent event) {final String dataId = event.getDataId();final String group = event.getGroup();final String namespaceId = event.getNamespaceId();final String content = event.getContent();final String type = event.getType();final long lastModified = event.getLastModifiedTs();final String encryptedDataKey = event.getEncryptedDataKey();......//真正执行dump请求的地方result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type, encryptedDataKey);......}
/*** Save config file and update md5 value in cache.** @param dataId dataId string value.* @param group group string value.* @param tenant tenant string value.* @param content content string value.* @param lastModifiedTs lastModifiedTs.* @param type file type.* @return dumpChange success or not.*/public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,String type, String encryptedDataKey) {String groupKey = GroupKey2.getKey(dataId, group, tenant);CacheItem ci = makeSure(groupKey, encryptedDataKey, false);ci.setType(type);final int lockResult = tryWriteLock(groupKey);assert (lockResult != 0);if (lockResult < 0) {DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);return false;}try {//获取md5final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);if (lastModifiedTs < ConfigCacheService.getLastModifiedTs(groupKey)) {DUMP_LOG.warn("[dump-ignore] the content is old. groupKey={}, md5={}, lastModifiedOld={}, "+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),lastModifiedTs);return true;}】if (md5.equals(ConfigCacheService.getContentMd5(groupKey)) && DiskUtil.targetFile(dataId, group, tenant).exists()) {DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),lastModifiedTs);//若直接读取,则从持久层获取数据 } else if (!PropertyUtil.isDirectRead()) {//持久化数据DiskUtil.saveToDisk(dataId, group, tenant, content);}//更新md5值updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);return true;} catch (IOException ioe) {DUMP_LOG.error("[dump-exception] save disk error. 
" + groupKey + ", " + ioe);if (ioe.getMessage() != null) {String errMsg = ioe.getMessage();if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg.contains(DISK_QUATA_EN)) {// Protect from disk full.FATAL_LOG.error("磁盘满自杀退出", ioe);System.exit(0);}}return false;} finally {releaseWriteLock(groupKey);}}
public static void updateMd5(String groupKey, String md5, long lastModifiedTs, String encryptedDataKey) {CacheItem cache = makeSure(groupKey, encryptedDataKey, false);if (cache.md5 == null || !cache.md5.equals(md5)) {cache.md5 = md5;cache.lastModifiedTs = lastModifiedTs;//发送本地数据变更事件NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));}}
RpcConfigChangeNotifier
@Overridepublic void onEvent(LocalDataChangeEvent event) {String groupKey = event.groupKey;boolean isBeta = event.isBeta;List<String> betaIps = event.betaIps;String[] strings = GroupKey.parseKey(groupKey);String dataId = strings[0];String group = strings[1];String tenant = strings.length > 2 ? strings[2] : "";String tag = event.tag;//配置数据变更configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);}
/*** adaptor to config module ,when server side config change ,invoke this method.** @param groupKey groupKey*/public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,List<String> betaIps, String tag) {//获取注册的Client列表Set<String> listeners = configChangeListenContext.getListeners(groupKey);if (CollectionUtils.isEmpty(listeners)) {return;}int notifyClientCount = 0;//遍历client列表for (final String client : listeners) {//拿到grpc连接Connection connection = connectionManager.getConnection(client);if (connection == null) {continue;}ConnectionMeta metaInfo = connection.getMetaInfo();//beta ips check.String clientIp = metaInfo.getClientIp();String clientTag = metaInfo.getTag();if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {continue;}//tag checkif (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {continue;}//构建请求参数ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);//构建推送任务RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName());//推送任务 向客户端发送变更通知push(rpcPushRetryTask);notifyClientCount++;}Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey);}
这里实际上也是一个异步执行的过程,推送任务RpcPushTask会被提交到ClientConfigNotifierServiceExecutor来计划执行,第一次会立即推送配置,即调用RpcPushTask的run方法,如果失败则延迟(重试次数×2)秒后再次执行,直到超过重试次数,主动注销当前连接
private void push(RpcPushTask retryTask) {ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest;// 判断是否重试次数达到限制if (retryTask.isOverTimes()) {Loggers.REMOTE_PUSH.warn("push callback retry fail over times .dataId={},group={},tenant={},clientId={},will unregister client.",notifyRequest.getDataId(), notifyRequest.getGroup(), notifyRequest.getTenant(),retryTask.connectionId);// 主动注销连接connectionManager.unregister(retryTask.connectionId);} else if (connectionManager.getConnection(retryTask.connectionId) != null) {// first time :delay 0s; sencond time:delay 2s ;third time :delay 4s// 尝试执行配置推送ConfigExecutor.getClientConfigNotifierServiceExecutor().schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS);} else {// client is already offline,ingnore task.}}
RpcPushTask
@Overridepublic void run() {tryTimes++;if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) {push(this);} else {//推送任务rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {@Overridepublic void onSuccess() {tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp);}@Overridepublic void onFail(Throwable e) {tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp);Loggers.REMOTE_PUSH.warn("Push fail", e);push(RpcPushTask.this);}}, ConfigExecutor.getClientConfigNotifierServiceExecutor());}}
客户端处理变更通知
ClientWorker
private void initRpcClientHandler(final RpcClient rpcClientInner) {rpcClientInner.registerServerRequestHandler((request) -> {if (request instanceof ConfigChangeNotifyRequest) {ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}",rpcClientInner.getName(), configChangeNotifyRequest.getDataId(),configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(),configChangeNotifyRequest.getTenant());CacheData cacheData = cacheMap.get().get(groupKey);if (cacheData != null) {synchronized (cacheData) {cacheData.getLastModifiedTs().set(System.currentTimeMillis());cacheData.setSyncWithServer(false);//向阻塞队列中添加元素 触发长连接的执行notifyListenConfig();}}//返回客户端响应return new ConfigChangeNotifyResponse();}return null;});}
后续逻辑处理看客户端的逻辑
向集群节点推送通知
AsyncRpcTask
// Send the sync request to the given cluster member; the AsyncRpcNotifyCallBack wrapping the task is passed as the request callback.
configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
/**
 * Forwards a config change sync request to the given member via
 * {@code clusterRpcClientProxy.asyncRequest}.
 *
 * @param member   target cluster member
 * @param request  the config change sync request
 * @param callBack callback passed along with the request
 * @throws NacosException propagated from {@code clusterRpcClientProxy.asyncRequest}
 */
public void syncConfigChange(Member member, ConfigChangeClusterSyncRequest request, RequestCallBack callBack)
        throws NacosException {
    // Pure delegation: the cluster RPC proxy performs the actual call.
    clusterRpcClientProxy.asyncRequest(member, request, callBack);
}
/**
 * Sends a request through the RPC client associated with the given member.
 *
 * @param member   target cluster member
 * @param request  request to send
 * @param callBack callback passed to the client's {@code asyncRequest}
 * @throws NacosException with code {@code CLIENT_INVALID_PARAM} when no RPC client exists for
 *                        the member, or propagated from the client's {@code asyncRequest}
 */
public void asyncRequest(Member member, Request request, RequestCallBack callBack) throws NacosException {
    final RpcClient memberClient = RpcClientFactory.getClient(memberClientKey(member));
    if (memberClient == null) {
        throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
    }
    memberClient.asyncRequest(request, callBack);
}
ConfigChangeClusterSyncRequestHandler 处理ConfigChangeClusterSyncRequest
@TpsControl(pointName = "ClusterConfigChangeNotify")@Overridepublic ConfigChangeClusterSyncResponse handle(ConfigChangeClusterSyncRequest configChangeSyncRequest,RequestMeta meta) throws NacosException {//是否是灰度if (configChangeSyncRequest.isBeta()) {dumpService.dump(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp(),true);} else {//向连接自己节点的 client端进行数据的同步dumpService.dump(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());}return new ConfigChangeClusterSyncResponse();}
服务端配置变更总结
1.当在服务端新增或修改配置时,会先将数据持久化到内嵌的数据源(derby)或者外部数据源(比如mysql)中
2.然后发布配置变更事件,通过GRPC协议将配置变更通知给连接自己的client端,以及连接集群中其他节点的client端