文章目录
- 服务配置
- Nacos Config入门
- Nacos服务端配置发布源码
- Nacos 服务端监控源码
服务配置
服务配置中心介绍
首先我们来看一下,微服务架构下关于配置文件的一些问题:
- 配置文件相对分散。在一个微服务架构下,配置文件会随着微服务的增多变得越来越多,而且分散在各个微服务中,不好统一配置和管理。
- 配置文件无法区分环境。微服务项目可能会有多个环境,例如:测试环境、预发布环境、生产环境。每一个环境所使用的配置理论上都是不同的,一旦需要修改,就需要我们去各个微服务下手动维护,这比较困难。
- 配置文件无法实时更新。我们修改了配置文件之后,必须重新启动微服务才能使配置生效,这对一个正在运行的项目来说是非常不友好的。
基于上面这些问题,我们就需要配置中心的加入来解决这些问题。
配置中心的思路是:
首先把项目中各种配置全部都放到一个集中的地方进行统一管理,并提供一套标准的接口。当各个服务需要获取配置的时候,就来配置中心的接口拉取自己的配置。当配置中心中的各种参数有更新的时候,也能通知到各个服务实时地过来同步最新的信息,使之动态更新。
Nacos Config入门
1.导入依赖
<!-- Nacos Config starter for Spring Cloud Alibaba -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
2.配置nacos-config
注意:1)不能使用原来的application.yml作为配置文件,而是新建一个bootstrap.yml作为配置文件;2)在bootstrap和application数据项相同时,bootstrap中的配置不会被覆盖;
配置文件优先级(由高到低):
bootstrap.properties -> bootstrap.yml -> application.properties -> application.yml
spring:
  application:
    name: service-name
  cloud:
    nacos:
      config:
        server-addr: localhost:8848 # address of the Nacos server
        file-extension: yaml # format of the configuration file
  profiles:
    active: dev # environment identifier
3.自动装配
Nacos服务端配置发布源码
1.组装请求参数
/**
 * HTTP endpoint that publishes (adds or updates) a configuration.
 *
 * <p>The content is run through {@code EncryptionHandler}, the parameters are validated,
 * and the request is repackaged into a {@code ConfigForm} and a {@code ConfigRequestInfo}
 * before being handed to {@code configOperationService.publishConfig}.
 *
 * @param request    current HTTP request; supplies the fallback source user, the remote IP,
 *                   the application name and the {@code betaIps} header
 * @param response   current HTTP response (not used in this method body)
 * @param dataId     configuration data id
 * @param group      configuration group
 * @param tenant     tenant, stored as the form's namespace id; defaults to the empty string
 * @param content    configuration content
 * @param tag        optional tag
 * @param appName    optional application name
 * @param srcUser    optional source user; when blank, it is derived from the request
 * @param configTags optional configuration tags
 * @param desc       optional description
 * @param use        optional usage
 * @param effect     optional effect
 * @param type       optional configuration type; replaced by the default type when not valid
 * @param schema     optional schema
 * @return the result of {@code configOperationService.publishConfig}
 * @throws NacosException if a callee reports a failure
 */
@PostMapping
@TpsControl(pointName = "ConfigPublish")
@Secured(action = ActionTypes.WRITE, signType = SignType.CONFIG)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId") String dataId,
        @RequestParam(value = "group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "content") String content,
        @RequestParam(value = "tag", required = false) String tag,
        @RequestParam(value = "appName", required = false) String appName,
        @RequestParam(value = "src_user", required = false) String srcUser,
        @RequestParam(value = "config_tags", required = false) String configTags,
        @RequestParam(value = "desc", required = false) String desc,
        @RequestParam(value = "use", required = false) String use,
        @RequestParam(value = "effect", required = false) String effect,
        @RequestParam(value = "type", required = false) String type,
        @RequestParam(value = "schema", required = false) String schema) throws NacosException {
    // Encryption: the handler yields (encrypted data key, content to store).
    Pair<String, String> encrypted = EncryptionHandler.encryptHandler(dataId, content);
    content = encrypted.getSecond();

    // Parameter validation.
    ParamUtils.checkTenant(tenant);
    ParamUtils.checkParam(dataId, group, "datumId", content);
    ParamUtils.checkParam(tag);

    // Assemble the publish form from the request parameters.
    ConfigForm configForm = new ConfigForm();
    configForm.setDataId(dataId);
    configForm.setGroup(group);
    configForm.setNamespaceId(tenant);
    configForm.setContent(content);
    configForm.setTag(tag);
    configForm.setAppName(appName);
    configForm.setConfigTags(configTags);
    configForm.setDesc(desc);
    configForm.setUse(use);
    configForm.setEffect(effect);
    configForm.setSchema(schema);
    // A blank source user falls back to the one carried by the request.
    configForm.setSrcUser(StringUtils.isBlank(srcUser) ? RequestUtil.getSrcUserName(request) : srcUser);
    // An invalid type falls back to the default configuration type.
    configForm.setType(ConfigType.isValidType(type) ? type : ConfigType.getDefaultType().getType());

    // Describe where the request came from.
    ConfigRequestInfo configRequestInfo = new ConfigRequestInfo();
    configRequestInfo.setSrcIp(RequestUtil.getRemoteIp(request));
    configRequestInfo.setRequestIpApp(RequestUtil.getAppName(request));
    configRequestInfo.setBetaIps(request.getHeader("betaIps"));

    String encryptedDataKey = encrypted.getFirst();
    return configOperationService.publishConfig(configForm, configRequestInfo, encryptedDataKey);
}
2.添加或者更新配置
*** Adds or updates non-aggregated data.** @throws NacosException NacosException.*/public Boolean publishConfig(ConfigForm configForm, ConfigRequestInfo configRequestInfo, String encryptedDataKey)throws NacosException {//配置信息转mapMap<String, Object> configAdvanceInfo = getConfigAdvanceInfo(configForm);//参数校验ParamUtils.checkParam(configAdvanceInfo);if (AggrWhitelist.isAggrDataId(configForm.getDataId())) {LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", configRequestInfo.getSrcIp(),configForm.getDataId(), configForm.getGroup());throw new NacosApiException(HttpStatus.FORBIDDEN.value(), ErrorCode.INVALID_DATA_ID,"dataId:" + configForm.getDataId() + " is aggr");}final Timestamp time = TimeUtils.getCurrentTime();// 构建ConfigInfo配置信息,发布配置最基本的五个参数: nameSpaceId、groupId、dataId、应用名称、配置内容ConfigInfo configInfo = new ConfigInfo(configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(),configForm.getAppName(), configForm.getContent());configInfo.setType(configForm.getType());configInfo.setEncryptedDataKey(encryptedDataKey);// 判断是否是beta测试版本if (StringUtils.isBlank(configRequestInfo.getBetaIps())) {// 正常发布,大部分情况下,我们都没有指定tagif (StringUtils.isBlank(configForm.getTag())) {// 1、插入 or 更新配置信息// 这里分为内置数据库(EmbeddedConfigInfoPersistServiceImpl)和外置数据库(ExternalConfigInfoPersistServiceImpl)操作,通常我们都是使用MySQL进行持久化存储configInfoPersistService.insertOrUpdate(configRequestInfo.getSrcIp(), configForm.getSrcUser(),configInfo, time, configAdvanceInfo, false);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),configForm.getNamespaceId(), time.getTime()));} else {configInfoTagPersistService.insertOrUpdateTag(configInfo, configForm.getTag(),configRequestInfo.getSrcIp(), configForm.getSrcUser(), time, false);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),configForm.getNamespaceId(), configForm.getTag(), time.getTime()));}} else {// 
数据插入或者更新configInfoBetaPersistService.insertOrUpdateBeta(configInfo, configRequestInfo.getBetaIps(),configRequestInfo.getSrcIp(), configForm.getSrcUser(), time, false);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(true, configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(),time.getTime()));}// 日志跟踪ConfigTraceService.logPersistenceEvent(configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(),configRequestInfo.getRequestIpApp(), time.getTime(), InetUtils.getSelfIP(),ConfigTraceService.PERSISTENCE_EVENT_PUB, configForm.getContent());return true;}
public void insertOrUpdateBeta(final ConfigInfo configInfo, final String betaIps, final String srcIp,final String srcUser, final Timestamp time, final boolean notify) {// 没有直接判断是新增还是更新,而且依赖数据库唯一性做检查,重复了(报主键冲突,说明已存在)就做更新try {//往数据库中添加信息addConfigInfo4Beta(configInfo, betaIps, srcIp, null, time, notify);} catch (DataIntegrityViolationException ive) { // Unique constraint conflict//报错则更新信息updateConfigInfo4Beta(configInfo, betaIps, srcIp, null, time, notify);}}
/**
 * Inserts one beta configuration row through the mapper for {@code TableConstant.CONFIG_INFO_BETA}.
 *
 * <p>Blank application name, tenant and encrypted data key are stored as the empty string.
 * The MD5 hex digest of the content is stored alongside the content.
 *
 * @param configInfo configuration to insert
 * @param betaIps    beta IP list
 * @param srcIp      source IP
 * @param srcUser    source user
 * @param time       used for both {@code gmt_create} and {@code gmt_modified}
 * @param notify     not used in this method body
 */
public void addConfigInfo4Beta(ConfigInfo configInfo, String betaIps, String srcIp, String srcUser, Timestamp time,
        boolean notify) {
    // Normalise blank values to the empty string.
    String appNameValue = configInfo.getAppName();
    if (StringUtils.isBlank(appNameValue)) {
        appNameValue = StringUtils.EMPTY;
    }
    String tenantValue = configInfo.getTenant();
    if (StringUtils.isBlank(tenantValue)) {
        tenantValue = StringUtils.EMPTY;
    }
    final String contentMd5 = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
    String dataKey = configInfo.getEncryptedDataKey();
    if (StringUtils.isBlank(dataKey)) {
        dataKey = StringUtils.EMPTY;
    }

    try {
        // Pick the mapper that matches the configured data source type.
        ConfigInfoBetaMapper betaMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
                TableConstant.CONFIG_INFO_BETA);
        final String insertSql = betaMapper.insert(
                Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", "md5", "beta_ips",
                        "src_ip", "src_user", "gmt_create", "gmt_modified", "encrypted_data_key"));
        // Argument order must match the column list above.
        jt.update(insertSql, configInfo.getDataId(), configInfo.getGroup(), tenantValue, appNameValue,
                configInfo.getContent(), contentMd5, betaIps, srcIp, srcUser, time, time, dataKey);
    } catch (CannotGetJdbcConnectionException e) {
        LogUtil.FATAL_LOG.error("[db-error] " + e, e);
        throw e;
    }
}
public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {tjt.execute(status -> {try {// jdbcTemplate操作,自动插入到数据库表(config_info)中,返回主键idlong configId = addConfigInfoAtomic(-1, srcIp, srcUser, configInfo, time, configAdvanceInfo);String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");addConfigTagsRelation(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),configInfo.getTenant());// 插入历史数据到表中(his_config_info)historyConfigInfoPersistService.insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");} catch (CannotGetJdbcConnectionException e) {LogUtil.FATAL_LOG.error("[db-error] " + e, e);throw e;}return Boolean.TRUE;});}
addConfigInfoAtomic方法
public long addConfigInfoAtomic(final long configId, final String srcIp, final String srcUser,final ConfigInfo configInfo, Map<String, Object> configAdvanceInfo) {// 取出配置信息final String appNameTmp =StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();final String tenantTmp =StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();final String desc = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("desc");final String use = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("use");final String effect = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("effect");final String type = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("type");final String schema = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("schema");final String encryptedDataKey =configInfo.getEncryptedDataKey() == null ? StringUtils.EMPTY : configInfo.getEncryptedDataKey();// 将配置内容进行MD5加密final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);KeyHolder keyHolder = new GeneratedKeyHolder();// 根据数据库表获取对应的mapper, 通过插件化的形式, 灵活应对使用不同数据库的场景ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),TableConstant.CONFIG_INFO);// 将参数转换成对应数据库类型的sql语句,拼接insert into config_info values(....)插入语句final String sql = configInfoMapper.insert(Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", "md5", "src_ip", "src_user","gmt_create", "gmt_modified", "c_desc", "c_use", "effect", "type", "c_schema","encrypted_data_key"));// 获取主键名称,默认值为idString[] returnGeneratedKeys = configInfoMapper.getPrimaryKeyGeneratedKeys();try {jt.update(new PreparedStatementCreator() {@Overridepublic PreparedStatement createPreparedStatement(Connection connection) throws SQLException {Timestamp now = new Timestamp(System.currentTimeMillis());// 通过预编译的PreparedStatement,设置每个字段的值PreparedStatement ps = 
connection.prepareStatement(sql, returnGeneratedKeys);ps.setString(1, configInfo.getDataId());ps.setString(2, configInfo.getGroup());ps.setString(3, tenantTmp);ps.setString(4, appNameTmp);ps.setString(5, configInfo.getContent());ps.setString(6, md5Tmp);ps.setString(7, srcIp);ps.setString(8, srcUser);ps.setTimestamp(9, now);ps.setTimestamp(10, now);ps.setString(11, desc);ps.setString(12, use);ps.setString(13, effect);ps.setString(14, type);ps.setString(15, schema);ps.setString(16, encryptedDataKey);return ps;}}, keyHolder);Number nu = keyHolder.getKey();if (nu == null) {throw new IllegalArgumentException("insert config_info fail");}return nu.longValue();} catch (CannotGetJdbcConnectionException e) {LogUtil.FATAL_LOG.error("[db-error] " + e, e);throw e;}
}
Nacos 服务端监控源码
1.客户端进行长轮询其实是使用定时线程来定时调用/v1/cs/configs/listener接口实现
路径 Nacos-config服务模块下的ConfigController.java
inner.doPollingConfig执行长轮询请求
@PostMapping("/listener")@Secured(action = ActionTypes.READ, signType = SignType.CONFIG)public void listener(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);String probeModify = request.getParameter("Listening-Configs");if (StringUtils.isBlank(probeModify)) {LOGGER.warn("invalid probeModify is blank");throw new IllegalArgumentException("invalid probeModify");}//获取客户端需要监听的可能发送变化的配置,计算MD5值probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);Map<String, String> clientMd5Map;try {clientMd5Map = MD5Util.getClientMd5Map(probeModify);} catch (Throwable e) {throw new IllegalArgumentException("invalid probeModify");}// 发送长轮训inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());}
2.发送长轮询请求
/*** long polling the config.*/public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {// 判断当前请求是否为长轮询,如果是,调用LongPollingService的addLongPollingClient()方法if (LongPollingService.isSupportLongPolling(request)) {longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);return HttpServletResponse.SC_OK + "";}//如果不是长轮训,就直接返回结果// Compatible with short polling logic.List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);// Compatible with short polling result.String oldResult = MD5Util.compareMd5OldResult(changedGroups);String newResult = MD5Util.compareMd5ResultString(changedGroups);String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);if (version == null) {version = "2.0.0";}int versionNum = Protocol.getVersionNumber(version);// Before 2.0.4 version, return value is put into header.if (versionNum < START_LONG_POLLING_VERSION_NUM) {response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);} else {request.setAttribute("content", newResult);}// Disable cache.response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");response.setStatus(HttpServletResponse.SC_OK);return HttpServletResponse.SC_OK + "";}
通过scheduler.schedule启动一个定时任务,并将延时时间设置为29.5s
将ClientLongPolling实例本身添加到allSubs队列中,它主要维护一个长轮询的订阅关系。
定时任务执行后,先把ClientLongPolling实例本身从allSubs队列中移除。
通过MD5比较客户端请求的groupKeys是否发生变更,并将变更结果通过response返回给客户端
所谓长轮询就是服务端收到请求之后,不立即返回,而是延迟29.5s后才把请求结果返回给客户端,这使得客户端和服务端之间在30s之内数据没有发生变化的情况下一直处于连接状态。
/*** Add LongPollingClient.** @param req HttpServletRequest.* @param rsp HttpServletResponse.* @param clientMd5Map clientMd5Map.* @param probeRequestSize probeRequestSize.*/public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);//获取客户端请求的超时时间,减去500ms后赋值给timeout变量。int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.long timeout = -1L;//判断isFixedPolling,如果为true,定时任务将会在30s后开始执行,否则在29.5s后开始执行if (isFixedPolling()) {timeout = Math.max(10000, getFixedPollingInterval());// Do nothing but set fix polling timeout.} else {timeout = Math.max(10000, Long.parseLong(str) - delayTime);long start = System.currentTimeMillis();List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);//和服务端的数据进行MD5对比,如果发送变化则直接返回if (changedGroups.size() > 0) {generateResponse(req, rsp, changedGroups);LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,changedGroups.size());return;} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,changedGroups.size());return;}}String ip = RequestUtil.getRemoteIp(req);ConnectionCheckResponse connectionCheckResponse = checkLimit(req);if (!connectionCheckResponse.isSuccess()) {generate503Response(req, rsp, connectionCheckResponse.getMessage());return;}// Must be called by http thread, or send response.final AsyncContext asyncContext = req.startAsync();// 
AsyncContext.setTimeout() is incorrect, Control by oneselfasyncContext.setTimeout(0L);String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);String tag = req.getHeader("Vipserver-Tag");//scheduler.execute 执行ClientLongPolling线程ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}
@Override
public void run() {asyncTimeoutFuture = scheduler.schedule(new Runnable() {@Overridepublic void run() {try {//将 ClientLongPolling 实例本身添加到 allSubs 队列中,它主要维护一个长轮询的订阅关系getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());//定时任务执行后,先把 ClientLongPolling 实例本身从 allSubs 队列中移除allSubs.remove(ClientLongPolling.this);//判断是否为固定轮询if (isFixedPolling()) {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime),"fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),"polling",clientMd5Map.size(), probeRequestSize);//比较数据的 MD5 值,判断是否发生变更List<String> changedGroups = MD5Util.compareMd5((HttpServletRequest)asyncContext.getRequest(),(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);//并将变更的结果通过response返回给客户端if (changedGroups.size() > 0) {sendResponse(changedGroups);} else {sendResponse(null);}} else {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime),"timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),"polling",clientMd5Map.size(), probeRequestSize);sendResponse(null);}} catch (Throwable t) {LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());}}}, timeoutTime, TimeUnit.MILLISECONDS);allSubs.add(this);
}