一、引言
本章来讲解Nacos注册表是如何进行写入数据的~
二、目录
目录
一、引言
二、目录
三、服务注册源码内容回顾
客户端源码回顾:
服务端源码回顾:
四、Nacos 注册表结构详解
五、写时复制概念
六、Nacos服务注册写入注册表源码解析
总结
三、服务注册源码内容回顾
客户端源码回顾:
在 Spring Boot 启动时,会扫描spring-cloud-starter-alibaba-nacos-discovery依赖下的 spring.factories 文件,从而创建里面的相关配置类。 在 spring.factories 文件中,有一个 NacosServiceRegistryAutoConfiguration类。这个配置类定义了三个bean对象。
- NacosServiceRegistry
- NacosRegistration
- NacosAutoServiceRegistration
在NacosAutoServiceRegistration的父类中,自定义了Spring监听器。当Spring 容器启动时,就会发布监听 WebServerInitializedEvent 事件,从而执行 NacosServiceRegistry 类中的 register 注册方法。在注册方法中,会通过HTTP方式调用Nacos服务端的实例注册接口,完成服务注册。
在发起服务实例注册之前,客户端会先去通过 BeatTask 任务,每5s向Nacos服务端发送一次健康心跳检查。
服务端源码回顾:
放任务:
从客户端发起的请求地址我们可以得知,服务端服务注册接口: /nacos/v1/ns/instance。
首先我们看出Nacos服务端也是一个Springboot项目,通过架构图,我们能找到 注册中心模块的代码在 naming 当中,最后确定了 /nacos/v1/ns/instance 路径是在InstanceController 类中的 register 方法。
在 register 方法当中,Instance 对象包装成 Datum 对象,放入到 DataStore 类中的dataMap里,最后包装成Pair对象,放入到阻塞队列当中。
取任务:
在Nacos后台会有一个 Notifier 异步任务,在 DistroConsistencyServiceImpl 类中会有一个被 @PostConstruct 修饰的init方法,该方法会把 Notifier 提交到单线程的线程池当中。在 Notifier类中的run方法,会不断从 tasks 队列中获取任务,紧接着调用 handle 方法。在 handle方法当中,会通过key把 Instance 从 DataStore类中的dataMap 获取出来,然后调用 listener.onChange 方法把数据写入到注册表当中。
最后讲到 listener.onChange 方法,后面就没再讲了,在讲之前先要说明两个重点:
- Nacos 内存注册表结构是什么样子的?
- 写时复制概念是什么?
讲完这两个知识点,我们再来分析本章源码就会很容易很多。
四、Nacos 注册表结构详解
Nacos 注册表是什么?注册表就是用来存放我们微服务实例注册信息的地方。
在ServiceManager类中有一个serviceMap属性,它就是对应我们 Nacos 的内存注册表。
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
我怎么知道这个就是注册表的呢?
这个是我在看查询实例列表源码的时候,最后返回的实例列表就是从这个 serviceMap 里面取的。客户端在调用其他微服务的时候,会先调用 Nacos 查询实例列表接口,查询当前可用服务,从而发起微服务调用,这一块后面章节再详细讲解。
本小节我们就先来重点分析注册表结构 ~
从源码来看,注册表结构是由两层Map结构组合的,那我们先来讲解下最外层的Map结构。
最外层的 key 对应的就是命名空间,如果不创建默认就是 public。命名空间下还包含了不同的分组,分别是 DEFAULT_GROUP、DEFAULT_GROUP_2。
然后再下一层的key对应的就是 分组名 +服务名,对应的value就是Service对象。
大概的对应关系如下图:
最外面两层分析完了,我们再来看看 Service 里面有什么。在Service当中有个 clusterMap属性(如下图),这个属性 key 是集群名称,value 是用来存放实例对象的。这个就是来支持不同环境集群实例存放的地方。
private Map<String, Cluster> clusterMap = new HashMap<>();
那么什么情况下需要使用到区分集群实例?
当项目访问量足够大的时候,为了响应速度更快,我们就要把服务部署在不同的地区服务器上。比如:北京有北京的集群实例、上海有上海的集群实例
我们可以在客户端配置集群的名字:
spring:cloud:nacos:discovery:# 注册地址server-addr: http://127.0.0.1:8848# 配置分组名字group: DEFAULT_GROUP_3# 配置集群名字cluster-name: BJ
我们可以在Nacos后台上看到该服务在不同集群下对应的实例。
那么这个实例列表信息是怎么存储的呢 ?
在 Cluster 类当中有两个Set 类型的属性进行存储。
在这两个集合当中,储存的就是 Instance 实例,Instance 实例包含了ip、port等信息。
/*** 持久化实例列表*/
@JsonIgnore
private Set<Instance> persistentInstances = new HashSet<>();/*** 临时实例列表*/
@JsonIgnore
private Set<Instance> ephemeralInstances = new HashSet<>();
ok,看到这里Nacos结构我们也大致了解了,我们看下图的总结:
其实我们能看到最核心的 Instance 是放在 Cluster 里面的。
五、写时复制概念
为什么要讲写时复制 ?
这是因为Nacos在注册实例写入注册表的时候用的就是写时复制,可以很好地避免并发冲突。
那什么是写时复制 ?
写时复制:Copy On Write 在数据写入到某个存储位置时,首先将原有内容拷贝出来,写到另一处地方,再将原来的引用地址修改成新对象地址。
那为什么会用到写时复制 ?
这里因为serviceMap 注册表,在 Cluster 对象中,最后使用 HashSet 来存储 Instance 对象的,它其实是一个共享的数据。所以在高并发下的场景,就可能会发生读写冲突。
为了让不懂读写冲突的小伙能明白,下面我们写个代码演示一下:
/*** @Author WangYan* @Date 2024/5/8 17:13* @Version 1.0*/
public class Test02 {public static void main(String[] args) {// 假设这个是存放实例信息Set<Object> objectSet = new HashSet<>();// 模拟异步任务,写入数据new Thread(new Runnable() {@Overridepublic void run() {try {// 先睡眠一下,不然还没开始读,就已经写完了Thread.sleep(100L);} catch (InterruptedException e) {e.printStackTrace();}// 写入10w条数据for (int i = 0; i < 100000; i++) {objectSet.add(i);}}}).start();// 死循环一直读取数据,模拟高并发场景for (; ; ) {for (Object o : objectSet) {System.out.println(o);}}}
}
控制台输出:
在多条线程对一个属性进行同时读写操作的时候,就会抛出 java.util.ConcurrentModificationException 异常。
这个时候我们就可以用到 写时复制,把原先的数据先备份一份,然后对备份的数据进行修改。这个时候是不会对原来有影响的,等数据操作完成后,再把原来对象的引用地址指向复制对象的引用地址,就完成了替换效果。
六、Nacos服务注册写入注册表源码解析
主线任务:在注册异步任务中,Nacos 是怎么把新的实例信息,写入到注册表中的 ?
还是回到我们上个章节最后的代码,这个 dataStore.get(datumKey).value 就是从 dataStore 里的Map中,把 Instances 实例列表获取出来。
listener.onChange(datumKey, dataStore.get(datumKey).value);
我们接着往下看 listener.onChange() 方法,可以看到这里又是多个实现类。前面章节说过,要么看对象注入调用方式确定实现类,或者直接 Debug,看一下步走到
哪个实现了类了。这里我们直接Debug,直接看 Service 当中的 onChange方法。
我们来看下 onChange 的代码实现,先看入参
- Key:这个 Key 的创建我们之前有分析过,KeyBuilder.buildInstanceListKey 代码创建出来的。
- Instances:它里面有个 InstanceList 属性,会存放多个 Instance 对象。
@Override
public void onChange(String key, Instances value) throws Exception {Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);// 对每一个Instance当中的权重进行判断赋值,大于 10000.0D 的给 10000.0D,大于 0.0D 并且小于 0.01D 的给 0.01Dfor (Instance instance : value.getInstanceList()) {if (instance == null) {// Reject this abnormal instance list:throw new RuntimeException("got null instance " + key);}if (instance.getWeight() > 10000.0D) {instance.setWeight(10000.0D);}if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {instance.setWeight(0.01D);}}// 主线任务:通过写时复制把新的实例信息写入注册表当中updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));recalculateChecksum();
}
这里我们来回顾下分支代码, Instances 是怎么被创建出来的。这个是在服务端接受注册的时候,在 addInstance 方法当中创建的。
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)throws NacosException {// 创建KeyString key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);Service service = getService(namespaceId, serviceName);// 锁住一个servicesynchronized (service) {// 这里提前说一下,ips 上层方法传过来的,是本次实例注册对应的Instance,也就是已开始从Request里面获取的参数信息。// 最后会放在instanList里面,为什么这里是List,说明它不仅仅只有一个,还会包含之前已经注册的Instance,放在了一个List里面List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);// 创建一个 Instances 对象,并且把 instanceList 属性set进去Instances instances = new Instances();instances.setInstanceList(instanceList);consistencyService.put(key, instances);}
}
我们接着往 addIpAddresses 方法里面看,通过 updateIpAddresses 方法进行重载调用。这个 updateIpAddresses 方法我就不讲那么细了,只要知道,第一次创建最终 instanceMap 只会返回新增的 instance 实例,后面进来不仅返回新增的 instance 实例还返回之前的 Instance 实例信息。
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {// 调用 updateIpAddresses 方法,这里 action 传的是 addreturn updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)throws NacosException {// 这里 datum 是从 dataStore 当中通过key获取的Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));List<Instance> currentIPs = service.allIPs(ephemeral);Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());Set<String> currentInstanceIds = Sets.newHashSet();for (Instance instance : currentIPs) {currentInstances.put(instance.toIpAddr(), instance);currentInstanceIds.add(instance.getInstanceId());}Map<String, Instance> instanceMap;if (datum != null && null != datum.value) {instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);} else {instanceMap = new HashMap<>(ips.length);}for (Instance instance : ips) {if (!service.getClusterMap().containsKey(instance.getClusterName())) {Cluster cluster = new Cluster(instance.getClusterName(), service);cluster.init();service.getClusterMap().put(instance.getClusterName(), cluster);Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",instance.getClusterName(), instance.toJson());}if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {instanceMap.remove(instance.getDatumKey());} else {Instance oldInstance = instanceMap.get(instance.getDatumKey());if (oldInstance != null) {instance.setInstanceId(oldInstance.getInstanceId());} else {instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));}// 重点// instanceMap 会有两种情况// 第一种情况:第一次创建 instanceMap 对应一个空 Map,然后把新增加的 isntance 实例放进去// 第二种情况:不是第一次创建,instanceMap 会包含之前所创建的 Instance 对象instanceMap.put(instance.getDatumKey(), instance);}}// 最后 instanceMap 里面肯定会包含 新注册的 Instance 实例// 并且如果不是第一次注册,里面会包含了 之前 Instance 实例信息return new ArrayList<>(instanceMap.values());
}
接着我们重点分析 onChange 方法中最后调用了 updateIPs方法,在这个方法中会把新注册 Instance 写入到注册表当中
// 这里 instances 里面就包含了新实例对象
// ephemeral 为 ture,临时实例
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// clusterMap 对应集群的Map
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
// 把集群名字都放入到ipMap里面,value是一个空的ArrayList
for (String clusterName : clusterMap.keySet()) {ipMap.put(clusterName, new ArrayList<>());
}// 遍历全部的Instance,这里之前讲过,这个List<Instance> 包含了之前已经注册过的实例,和新注册的实例对象
// 这里的主要作用就是把相同集群下的 instance 进行分类
for (Instance instance : instances) {try {if (instance == null) {Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");continue;}// 判断客户端传过来的是 Instance 中,是否有设置 ClusterNameif (StringUtils.isEmpty(instance.getClusterName())) {// 如果没有,就给ClusterName赋值为 DEFAULTinstance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);}// 判断之前是否存在对应的 ClusterName,如果没有则需要创建新的 Cluster 对象if (!clusterMap.containsKey(instance.getClusterName())) {Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",instance.getClusterName(), instance.toJson());// 创建新的集群对象Cluster cluster = new Cluster(instance.getClusterName(), this);cluster.init();// 放入到集群 clusterMap 当中getClusterMap().put(instance.getClusterName(), cluster);}// 通过集群名字,从 ipMap 里面取List<Instance> clusterIPs = ipMap.get(instance.getClusterName());// 只有是新创建集群名字,这里才会为空,之前老的集群名字,在方法一开始里面都 value 赋值了 new ArrayList对象if (clusterIPs == null) {clusterIPs = new LinkedList<>();ipMap.put(instance.getClusterName(), clusterIPs);}// 把对应集群下的instance,添加进去clusterIPs.add(instance);} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);}
}// 分好类之后,针对每一个 ClusterName ,写入到注册表中
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {// entryIPs 已经是根据ClusterName分好组的实例列表List<Instance> entryIPs = entry.getValue();// 根据写时复制,对每一个 Cluster 对象修改注册表 *** 重点// updateIps 则是 写时复制 的体现clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}setLastModifiedMillis(System.currentTimeMillis());
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();for (Instance instance : allIPs()) {stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),stringBuilder.toString());
}
上面这一部分代码,主要就是对传入进来的 Instance 进行归类,最后把分好类的 Instance 对象,根据 Cluster 分类,对每一个 Cluster 中的实例列表进行修改。在这句代码中clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);就会体现写时复制,我们一起来看一下。
/*** Update instance list.** @param ips instance list* @param ephemeral whether these instances are ephemeral*/
public void updateIps(List<Instance> ips, boolean ephemeral) {// 先判断是否是临时实例// ephemeralInstances 临时实例// persistentInstances 持久化实例// 把对应数据先拿出来,放入到 新创建的 toUpdateInstances 集合中Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;// 先把老的实例列表复制一份 , 先复制一份新的HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());for (Instance ip : toUpdateInstances) {oldIpMap.put(ip.getDatumKey(), ip);}// 中间不重要的代码略,主要是对 oldIpMap 一些操作,其中也包括了集群节点同步,这里小册第二部分会详细讲解// 最后把传入进来的实例列表,重新初始化一个 HaseSet,赋值给toUpdateInstancestoUpdateInstances = new HashSet<>(ips);// 判断是否是临时实例if (ephemeral) {// 直接把之前的实例列表替换成新的ephemeralInstances = toUpdateInstances;} else {persistentInstances = toUpdateInstances;}
}
从这一部分源码中就能看出,全程没有对之前注册表的中的数据进行操作,而是先拿出来,备份一份,进行操作,最后进行替换。这样就完成了注册表的修改。
总结
1、本章主要讲了 Nacos注册表的结构,可以看出Nacos注册表的设计方式还是很灵活的,可以通过命名空间、分组、集群来进行实例的区分,具体可以根据公司的业务场景来定。
2、我们还讲了在高并发下读写冲突的问题,同时也讲了 解决方案“读写复制” 的理论。最后分析Nacos 实例异步注册任务中,是如何利用 “写时复制”来完成注册表修改的。
那么到本章为止,我们 Nacos 源码分析完了一条路线:
从客户端发起服务注册,到服务端响应实例注册请求。异步任务 + 内存队列怎么来处理的整个流程,就讲完了。
最后,别忘了把源码分析图补充完整: