一、引言
上个章节我们简单学习和使用了下Nacos服务自动注册,本文就来分析下Nacos客户端自动注册服务是怎么实现的~
二、目录
目录
一、引言
三、Nacos 源码编译
1.1 拉取代码
1.2 运行起来
四、客户端使用版本选择
五、Nacos客户端项目启动为什么会自动注册服务?
六、this.serviceRegistry 属性怎么被赋值的 ?
七、Nacos 客户端通过什么方式注册服务 ?
八、Nacos 客户端怎么发送服务心跳 ?
总结:
三、Nacos 源码编译
1.1 拉取代码
git地址:GitHub - alibaba/nacos at 1.4.1
a.这里我们选择1.4.1版本的Nacos进行下载,后面也是根据这个版本来进行学习的
b.直接git拉取或者下载zip的压缩包都可以
1.2 运行起来
a.拉下来的项目结构大概就是这个样子
b.我们试着运行起来~
Nacos的启动类在console模块当中,这里我们要给启动类配置成单机启动,方便我们学习测试·
/*** Nacos starter.** @author nacos*/
@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {public static void main(String[] args) {SpringApplication.run(Nacos.class, args);}
}
单机启动命令:
-Dnacos.standalone=true
配置启动类:
成功运行:
1.3 测试一下
a.注册地址的ip改为本地的,然后尝试启动起来,看看有没有成功注册到本地Nacos当中~
b.可以看到,order-service服务已经注册进去了
四、客户端使用版本选择
Nacos 服务端使用什么版本,其他组件也有对应使用版本要求。在实际项目使用过程中,遇到版本不一致这种问题太搞事情了,所以一开始在组件选型的时候,我们得先了解一下版本的选择。如下图表:
这里我们 Nacos 使用的版本是 1.4.1,那么对应Spring Cloud Alibaba的版本至少是2.2.4 Release,如下:
<properties><java.version>1.8</java.version><spring-cloud-alibaba.version>2.2.5.RELEASE</spring-cloud-alibaba.version>
</properties>
五、Nacos客户端项目启动为什么会自动注册服务?
首先我们查看源码前先确定我们的主线任务是:Nacos客户端项目启动为什么会自动注册服务
我们知道项目是引入了spring-cloud-starter-alibaba-nacos-discovery依赖,项目才有了注册服务的功能,那我们就从spring-cloud-starter-alibaba-nacos-discovery依赖的jar包分析,可以看到依赖中有个spring.factories文件,里面都是一些Configuration类。
说明:spring.factories 这个文件是在 Spring Boot 启动时,会扫描到这个文件,从而去创建里面的配置类。
我们看spring.factories文件里都是一些nacos 客户端相关的注册配置类,具体哪个是注册的类,这个时候我们只能去靠猜,然后去验证。NacosServiceRegistryAutoConfiguration 这个起的名字看着就很像,service服务,registry是注册的意思,auto自动,那连起来不就是Nacos 自动注册服务类。当然呀,这也只是我的猜测,接着跟我往下看就知道我猜的对不对了。
NacosServiceRegistryAutoConfiguration 类创建了三个bean对象。
- NacosServiceRegistry
- NacosRegistration
- NacosAutoServiceRegistration
我们先来看第一个bean对象,NacosServiceRegistry:
@Bean
public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {return new NacosServiceRegistry(nacosDiscoveryProperties);
}
我们先来看一下这个 nacosServiceRegistry 方法的入参,NacosDiscoveryProperties,这一看不就是注册服务的yml配置类。
说明:ConfigurationProperties 映射配置文件中相对应的属性
我们再来看一下 NacosServiceRegistry 类的内容,可以看到有个register 方法,起的名字和内容看着是有点像 真正的注册方法 的。这个register 方法内容大概看了一下,获取一些参数,比如:namingService、serviceId、group、instance,最后调用了namingService.registerInstance()方法。看到这我们就不用往下细看了,别忘了我们这次的主线任务:Nacos客户端为什么启动项目能发起自动注册。
紧接着我们看第二个bean对象,NacosRegistration不正是第刚才register方法的入参嘛。看这个方法的名字感觉跟我们这次主线任务关系不大,直接跳过。
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,NacosDiscoveryProperties nacosDiscoveryProperties,ApplicationContext context) {return new NacosRegistration(registrationCustomizers.getIfAvailable(),nacosDiscoveryProperties, context);
}
然后我们看第三个bean对象,NacosAutoServiceRegistration,这个名字就很像Nacos自动注册服务了。我们看一下这个方法的入参包含前两个bean的对象,NacosServiceRegistry和NacosRegistration,这样的话一看这个方法就很重要了。
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry,AutoServiceRegistrationProperties autoServiceRegistrationProperties,NacosRegistration registration) {return new NacosAutoServiceRegistration(registry,autoServiceRegistrationProperties, registration);
}
我们再来看看方法的内容,大概就是通过构造方法往父类传值,这个我们下一小节再说。接着再往下面看又有一个 register(注册)的方法。
public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,AutoServiceRegistrationProperties autoServiceRegistrationProperties,NacosRegistration registration) {super(serviceRegistry, autoServiceRegistrationProperties);this.registration = registration;
} @Override
protected void register() {if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {log.debug("Registration disabled.");return;}if (this.registration.getPort() < 0) {this.registration.setPort(getPort().get());}super.register();
}
看这个方法的内容不像是真正执行注册的方法,我们看下这个 register 方法哪里调用了,一共两处,第二处是在本方法内调用的父类方法,所以直接看第一处调用。
start这个方法名就感觉像是找到关键注册方法的点了,在看下这个方法哪里调用了。
start方法第一处调用,是一个restart重启方法调用的,很明显不是。
start方法第二处调用,是 bind 方法调用的,bind方法又被onApplicationEvent方法调用的。
看到这里,大概就知道了。很明显是利用 Spirng 监听的事件,在Spring 容器启动的最后 执行 finishRefresh 方法,然后会发布一个事件。这样一说,估计有很多小伙伴不明白,下面我写了个案例演示一下:
/*** @Author WangYan* @Date 2024/3/22 09:57* @Version 1.0* 在 Spring 容器启动的时候,就会发布事件,这里就可以监听到,从而执行我们的代码**/
@Data
@Component
public class Test01 implements ApplicationListener<WebServerInitializedEvent> {@Overridepublic void onApplicationEvent(WebServerInitializedEvent event) {System.out.println("监听事件开始执行~");}
}
我在上面自定义了一个监听器代码,在Spring 容器启动的时候发布事件,这里就会监听到,从而执行我们这里的代码。
那我们的主线任务:Nacos客户端项目启动为什么会自动注册服务 ?
看到这就很明了了,就是利用了 Spring 事件监听,在监听中执行了最终执行了 register 方法,进行服务注册的。
六、this.serviceRegistry 属性怎么被赋值的 ?
首先Spring 监听事件启动,先去调用 bind 方法 ---> start 方法 ---> register 方法。最终在 register 方法当中执行了第一个bean “NacosServiceRegistry”类中对应的注册方法。
那我们来分析一下 start 方法当中register 方法里面的内容,是直接调用了serviceRegistry的注册方法,那这个属性是在什么时候进行赋值的呢 ?
protected void register() {this.serviceRegistry.register(getRegistration());
}
我们在创建第三个bean对象的时候看看需要哪些参数:
- 参数一:NacosServiceRegistry 对应创建第一个bean对象
- 参数二:AutoServiceRegistrationProperties
- 参数三:NacosRegistration 对应创建第二个bean对象
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry,AutoServiceRegistrationProperties autoServiceRegistrationProperties,NacosRegistration registration) {return new NacosAutoServiceRegistration(registry,autoServiceRegistrationProperties, registration);
}
就是第一个bean对象传进来就是serviceRegistry属性,通过父类的构造方法进行传值,最后再给ServiceRegistry serviceRegistry; 进行赋值
public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,AutoServiceRegistrationProperties autoServiceRegistrationProperties,NacosRegistration registration) {// 调用父类的构造方法进行传值super(serviceRegistry, autoServiceRegistrationProperties);this.registration = registration;
}// 父类构造方法
protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry,AutoServiceRegistrationProperties properties) {// 给ServiceRegistry<R> serviceRegistry; 进行赋值this.serviceRegistry = serviceRegistry;this.properties = properties;
}
是不是看看得有点懵逼,画个图理一理就清晰了:
整个流程图小节:
- 通过spring.factories文件,创建相关配置类,NacosServiceRegistryAutoConfiguration类里面定义了三个bean对象
- 创建第三个bean对象时,需要第一个和第二个bean对象,作为参数传入。第一个bean对象当中就包含了真正的注册方法。并且赋值给了第三个bean对象父类中的this.serviceRegistry属性。
- 第三个bean对象的父类实现了监听方法。当Spring 容器启动的时候,发布事件,这个时候就会监听到,从而执行 注册服务的逻辑。
七、Nacos 客户端通过什么方式注册服务 ?
回顾前两张小节:Spring容器启动的时候,监听事件就会去执行 register 方法,那 register方法是怎么实现的呢 ?
本节主线任务:Nacos 客户端底层通过什么方式注册服务的 ? Http ?还是Sofa ?接下来我们一起探索下
那我们就要回顾下创建第一个bean对象,”NacosServiceRegistry“,里面就包含了 register 注册方法。给大家 Debug 看下,证明我们没有分析错误,很明显,最后注册的逻辑走到这个里面来了。
接下来我们看下这个方法的大概内容:
@Override
public void register(Registration registration) {// serviceId入参校验if (StringUtils.isEmpty(registration.getServiceId())) {log.warn("No service to register for nacos client...");return;}// 获取NamingServiceNamingService namingService = namingService();// 服务idString serviceId = registration.getServiceId();// 分组名称String group = nacosDiscoveryProperties.getGroup();// 这个Instance里面包含了ip和port,比较重要的点Instance instance = getNacosInstanceFromRegistration(registration);try {// 上面的话都是分支代码,跟我们本次主线任务无关,可以先了解下就行,后面再细看// serviceId, group, instance都传进了namingService.registerInstance()这方法,很明显 // namingService.registerInstance() 就是本次的主线任务主要方法namingService.registerInstance(serviceId, group, instance);log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,instance.getIp(), instance.getPort());}catch (Exception e) {log.error("nacos registry, {} register failed...{},", serviceId,registration.toString(), e);// rethrow a RuntimeException if the registration is failed.// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132rethrowRuntimeException(e);}
}
接下来调用了这个方法NamingService类中 registerInstance接口,并且只有NacosNamingService一个实现类。
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {// instance参数校验NamingUtils.checkInstanceIsLegal(instance);// serviceName和groupName拼装成指定的字符串,比如:groupName@@serviceNameString groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 判断为 ephemeral 为true才执行,默认就为 trueif (instance.isEphemeral()) {// 构建心跳信息,拼装参数BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);// 添加心跳信息beatReactor.addBeatInfo(groupedServiceName, beatInfo);}// 上面都是分支代码,有个印象就行,跟本次主线任务无关// 很明显 serverProxy.registerService() 这个方法里面有注册服务的逻辑serverProxy.registerService(groupedServiceName, groupName, instance);
}
紧接着我们来到了第三个调用 serverProxy.registerService() ,分析下方法内容:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,instance);// 拼装Map组成参数final Map<String, String> params = new HashMap<String, String>(16);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put(CommonParams.GROUP_NAME, groupName);params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());// ip、port就是instance里传过来的params.put("ip", instance.getIp());params.put("port", String.valueOf(instance.getPort()));params.put("weight", String.valueOf(instance.getWeight()));params.put("enable", String.valueOf(instance.isEnabled()));params.put("healthy", String.valueOf(instance.isHealthy()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));// 请求地址、参数、请求方式 ,这里很明显就是HTPP的请求方式了// UtilAndComs.nacosUrlInstance 拼装结果: /nacos/v1/ns/instancereqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
当然上面也只是我们的猜测,接下来我们验证下对不对:
官方 Open API 文档截图如下:
可以看到注册实例的请求类型和请求路径,跟我们上面的分析出来的方法一样。就是HTTP请求
看到这本节的主线任务完成了:
- 主线任务:Nacos 客户端底层通过什么方式注册服务的 ?
- 结果:HTTP的请求方式
八、Nacos 客户端怎么发送服务心跳 ?
主线任务:Nacos 客户端怎么发送服务心跳?
现在我们来看下服务心跳的代码块,总块两个部分,一个构建心跳信息,一个去添加发送心跳
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 判断为 ephemeral 为true才执行,默认就为 trueif (instance.isEphemeral()) {// 构建心跳信息,拼装参数BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);// 主线任务:添加心跳信息beatReactor.addBeatInfo(groupedServiceName, beatInfo);}serverProxy.registerService(groupedServiceName, groupName, instance);
}
第一部分:我们先来看下构建心跳信息的代码,很简单主要是拼装BeatInfo对象参数。
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {// 拼装BeatInfo对象参数BeatInfo beatInfo = new BeatInfo();beatInfo.setServiceName(groupedServiceName);beatInfo.setIp(instance.getIp());beatInfo.setPort(instance.getPort());beatInfo.setCluster(instance.getClusterName());beatInfo.setWeight(instance.getWeight());beatInfo.setMetadata(instance.getMetadata());beatInfo.setScheduled(false);// 这个还是有点重要的,period 字段:发送健康检查的间隔秒数beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());return beatInfo;
}
这里来重点说明一下period这个属性的取值方法:getInstanceHeartBeatInterval()
public long getInstanceHeartBeatInterval() {// 先去获取 PreservedMetadataKeys.HEART_BEAT_INTERVAL 常量的值,为空的话默认返回 5000return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL,Constants.DEFAULT_HEART_BEAT_INTERVAL);
}private long getMetaDataByKeyWithDefault(final String key, final long defaultValue) {if (getMetadata() == null || getMetadata().isEmpty()) {return defaultValue;}final String value = getMetadata().get(key);if (!StringUtils.isEmpty(value) && value.matches(NUMBER_PATTERN)) {return Long.parseLong(value);}return defaultValue;
}
第二部分:
构建心跳信息的过完了,接着看下addBeatInfo() 发送健康心跳检查的代码内容。这里executorService是个线程池执行的任务,那么逻辑一定在run方法当中。
注意:executorService线程池上面代码设置成了守护线程,只要不退出当前jvm,用于发送心跳检查的线程会一直执行
public BeatReactor(NamingProxy serverProxy, int threadCount) {this.serverProxy = serverProxy;this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);// 设置成守护线程thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.beat.sender");return thread;}});
}public void addBeatInfo(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;//fix #1733if ((existBeat = dom2Beat.remove(key)) != null) {existBeat.setStopped(true);}dom2Beat.put(key, beatInfo);// 上面都是分支代码,主线任务:executorService.schedule() 发送健康心跳检查executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
接下来我们看下BeatReactor类的代码:
class BeatTask implements Runnable {BeatInfo beatInfo;public BeatTask(BeatInfo beatInfo) {this.beatInfo = beatInfo;}@Overridepublic void run() {// 判断是否停止if (beatInfo.isStopped()) {return;}// 获取下一次执行的时间,同样还是5slong nextTime = beatInfo.getPeriod();try {// 主线任务:发送健康心跳检查JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.get("clientBeatInterval").asLong();boolean lightBeatEnabled = false;if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0) {nextTime = interval;}// 获取服务端返回的code状态码int code = NamingResponseCode.OK;if (result.has(CommonParams.CODE)) {code = result.get(CommonParams.CODE).asInt();}// 如果code = RESOURCE_NOT_FOUND 没有找到// 很可能之前注册的信息,已经被 Nacos 服务端移除了,所以返回这个错误信息 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {// 重新拼装参数,发起服务注册Instance instance = new Instance();instance.setPort(beatInfo.getPort());instance.setIp(beatInfo.getIp());instance.setWeight(beatInfo.getWeight());instance.setMetadata(beatInfo.getMetadata());instance.setClusterName(beatInfo.getCluster());instance.setServiceName(beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try {// 重新注册serverProxy.registerService(beatInfo.getServiceName(),NamingUtils.getGroupName(beatInfo.getServiceName()), instance);} catch (Exception ignore) {}}} catch (NacosException ex) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());}// 循环执行发送健康心跳检查executorService.schedule(new BeatReactor.BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);}
}
接下来我们看下具体发送实例心跳的代码,serverProxy.sendBeat() 方法。
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());}// 拼装params参数Map<String, String> params = new HashMap<String, String>(8);Map<String, String> bodyMap = new HashMap<String, String>(2);if (!lightBeatEnabled) {bodyMap.put("beat", JacksonUtils.toJson(beatInfo));}params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());params.put("ip", beatInfo.getIp());params.put("port", String.valueOf(beatInfo.getPort()));// 通过Http请求发送实例心跳// 通过 /nacos/v1/ns/instance/beat 地址发送 put 请求String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);return JacksonUtils.toObj(result);
}
在官方文档上看下跟我们分析的代码是否一致,请求地址的、请求类型还有参数看样子都是能对的上的
官方地址:Open API 指南
本节小节:
- 主线任务:Nacos 客户端怎么发送服务心跳 ?
- 结果:Nacos客户端发起服务注册的时候,会执行一个 心跳健康检查的延时任务 ,这个任务每5秒会去发送一次心跳,告诉Nacos服务端我还活着,当前服务还是可用状态。
看完源码,把Nacos客户端流程图补充下:
总结:
本文主要围绕Nacos客户端发起自动注册的流程、自动注册的方式、发送服务心跳三个方面来分析源码的。