【Nacos源码分析01-服务注册与集群间数据是同步】

文章目录

  • 了解CAP
  • BASE理论
  • Nacos支持CP还是AP
  • 集群数据同步
  • 实现集群数据一致性源码

了解CAP

在这里插入图片描述
CAP理论的核心观点是,一个分布式系统无法同时完全满足一致性、可用性和分区容错性这三个特性。具体而言,当发生网络分区时,系统必须在一致性和可用性之间做出选择:
CA(一致性和可用性):
系统在正常情况下能够保证一致性和可用性,但无法应对网络分区故障。一旦发生网络分区,系统必须放弃一致性或可用性中的一个。
CP(一致性和分区容错性):
系统在网络分区情况下能够保证一致性和分区容错性,但可能会牺牲部分可用性,即某些请求可能不会得到响应。
AP(可用性和分区容错性):
系统在网络分区情况下能够保证可用性和分区容错性,但可能会牺牲一致性,即不同节点可能会看到不一致的数据。
所谓的CAP定理,就是指在一个分布式系统中,CAP这三个指标,最多同时只能满足其中的两个,不可能三个都同时满足
实际应用中的选择
在实际应用中,不同的系统设计会根据具体需求在CAP三者之间进行权衡。
例如:
金融系统:通常更重视一致性(CP),确保数据的一致性和准确性,即使这可能会牺牲部分可用性。
社交媒体平台:通常更重视可用性和分区容错性(AP),确保用户始终可以访问和发布内容,即使这可能会导致短时间内的数据不一致。

BASE理论

BASE理论主要是包括以下三点:
基本可用(Basically Available):系统出现故障还是能够对外提供服务,不至于直接无法用了
软状态(Soft State):允许各个节点的数据不一致
最终一致性,(Eventually Consistent):虽然允许各个节点的数据不一致,但是在一定时间之后,各个节点的数据最终需要一致的
BASE理论其实就是妥协之后的产物。

Nacos支持CP还是AP

Nacos其实目前是同时支持AP和CP的
具体使用AP还是CP得取决于Nacos内部的具体功能,并不是有的文章说的可以通过一个配置自由切换。
就以服务注册举例来说,对于临时实例来说,Nacos会优先保证可用性,也就是AP
对于永久实例,Nacos会优先保证数据的一致性,也就是CP

集群数据同步

在Nacos以集群模式运行时,当Nacos服务器收到服务注册请求后,发生了ClientEvent.ClientChangedEvent事件,就会触发将注册的服务信息同步给集群中的其他Nacos-server节点。
具体的代码分析:
1.先开始从服务注册接口开始:
地址:com.alibaba.nacos.naming.controllers.v2.InstanceControllerV2.java
服务注册

@CanDistro@PostMapping@Secured(action = ActionTypes.WRITE)public Result<String> register(InstanceForm instanceForm) throws NacosException {// check paraminstanceForm.validate();checkWeight(instanceForm.getWeight());// build instanceInstance instance = buildInstance(instanceForm);instanceServiceV2.registerInstance(instanceForm.getNamespaceId(), buildCompositeServiceName(instanceForm), instance);NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",false, instanceForm.getNamespaceId(), instanceForm.getGroupName(), instanceForm.getServiceName(),instance.getIp(), instance.getPort()));return Result.success("ok");}

2.添加实例

 @Overridepublic void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {//参数校验NamingUtils.checkInstanceIsLegal(instance);//判断是临时节点还是永久节点boolean ephemeral = instance.isEphemeral();String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);createIpPortClientIfAbsent(clientId);Service service = getService(namespaceId, serviceName, ephemeral);clientOperationService.registerInstance(service, instance, clientId);}
 @Overridepublic void registerInstance(Service service, Instance instance, String clientId) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);Service singleton = ServiceManager.getInstance().getSingleton(service);if (!singleton.isEphemeral()) {throw new NacosRuntimeException(NacosException.INVALID_PARAM,String.format("Current service %s is persistent service, can't register ephemeral instance.",singleton.getGroupedServiceName()));}Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}InstancePublishInfo instanceInfo = getPublishInfo(instance);client.addServiceInstance(singleton, instanceInfo);client.setLastUpdatedTime();client.recalculateRevision();NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}

3.添加并且同步

/*** Request publisher publish event Publishers load lazily, calling publisher.** @param eventType class Instances type of the event type.* @param event     event instance.*/private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}final String topic = ClassUtils.getCanonicalName(eventType);EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {return publisher.publish(event);}if (event.isPluginEvent()) {return true;}LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;}
  public boolean publish(Event event) {checkIsStart();boolean success = this.queue.offer(event);if (!success) {LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);receiveEvent(event);return true;}return true;}
void receiveEvent(Event event) {final long currentEventSequence = event.sequence();if (!hasSubscriber()) {LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);return;}// Notification single event listenerfor (Subscriber subscriber : subscribers) {if (!subscriber.scopeMatches(event)) {continue;}// Whether to ignore expiration eventsif (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",event.getClass());continue;}// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.// Remove original judge part of codes.notifySubscriber(subscriber, event);}}

通知集群节点

 public void notifySubscriber(final Subscriber subscriber, final Event event) {LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);final Runnable job = () -> subscriber.onEvent(event);final Executor executor = subscriber.executor();if (executor != null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error("Event callback exception: ", e);}}}

实现集群数据一致性源码

进入DistroClientDataProcessor的实现类来执行onEvent

  @Overridepublic void onEvent(Event event) {if (EnvUtil.getStandaloneMode()) {return;}if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}

通过syncToAllServer()方法同步数据给集群中的其它节点:
临时实例,使用distro协议同步; 持久实例:使用raft协议同步

  private void syncToAllServer(ClientEvent event) {/*** 判断客户端是否为空,是否是临时实例,判断是否是负责节点* * 临时实例,使用distro协议同步; 持久实例:使用raft协议同步
**/Client client = event.getClient();// Only ephemeral data sync by Distro, persist client should sync by raft.if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {//客户端断开连接DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {//客户端变更 新增或者修改DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}

同步时,会涉及到一个负责节点和非负责节点。
负责节点(发起同步)
也就是收到客户端事件ClientChangedEvent后负责同步信息给其他非负责节点, 所以这里只能有负责节点来进行同步,非负责节点只能接收同步事件。
DistroProtocol
Distro是阿里巴巴的私有协议,distro协议是为了注册中心而创造出的协议。
DistroProtocol会循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s,其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改。
对于Delete操作,由DistroSyncDeleteTask处理,对于Change操作,由DistroSyncChangeTask处理,这里我们从DistroSyncChangeTask来看。

   public void sync(DistroKey distroKey, DataOperation action, long delay) {for (Member each : memberManager.allMembersWithoutSelf()) {syncToTarget(distroKey, action, each.getAddress(), delay);}}

核心逻辑在syncToTarget

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),targetServer);DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);// 往延时任务引擎中加入DistroDelayTask任务,最终将会调用DistroDelayTaskProcessor.process方法// distroTaskEngineHolder.getDelayTaskExecuteEngine()返回的是DistroDelayTaskExecuteEngine,它继承自NacosDelayTaskExecuteEngine,// 其构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);}
}

在syncToTarget()方法中,构建了一个DistroDelayTask任务,然后放到了延时任务引擎中执行,熟悉Nacos服务注册流程的小伙伴对这一块应该不陌生,这里通过distroTaskEngineHolder.getDelayTaskExecuteEngine()返回了一个DistroDelayTaskExecuteEngine执行引擎,它继承自NacosDelayTaskExecuteEngine,而在NacosDelayTaskExecuteEngine的构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)。

/*** 任务队列* key:对应的服务*/
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;protected final ReentrantLock lock = new ReentrantLock();public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);// 初始化任务队列tasks = new ConcurrentHashMap<>(initCapacity);// 创建定时任务的线程池processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));// 在指定的初始延迟时间(100毫秒)后开始执行任务,并按固定的时间间隔周期性(100毫秒)地执行任务。// 默认延时100毫秒执行ProcessRunnable,然后每隔100毫秒周期性执行ProcessRunnableprocessingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}public void addTask(Object key, AbstractDelayTask newTask) {// 加锁防并发处理,key就是对应的服务lock.lock();try {// ConcurrentHashMap<Object, AbstractDelayTask> tasks = new ConcurrentHashMap<>(initCapacity);// 通过key判断是否已存在map中AbstractDelayTask existTask = tasks.get(key);if (null != existTask) {// 服务存在的话,则需要合并任务,其实就是合并多个任务,一起执行newTask.merge(existTask);}// 将任务放入到map中,等待处理tasks.put(key, newTask);} finally {lock.unlock();}
}/*** 任务处理类*/
private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}
}protected void processTasks() {Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {// 从队列中移除这个任务AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}// taskKey示例值: Service{namespace='public', group='DEFAULT_GROUP', name='discovery-provider', ephemeral=true, revision=0}// 找到处理类NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {// ReAdd task if process failedif (!processor.process(task)) {// 处理失败的话,重新入队(即重试)retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error ", e);retryFailedTask(taskKey, task);}}
}

在processTasks()方法中,首先获取处理类,如果获取不到,则使用默认的处理类。在DistroTaskEngineHolder构造方法中,已经设置了默认处理类为DistroDelayTaskProcessor。

public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);// 设置默认的处理器delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
// DistroDelayTaskProcessor#process
public boolean process(NacosTask task) {if (!(task instanceof DistroDelayTask)) {return true;}DistroDelayTask distroDelayTask = (DistroDelayTask) task;DistroKey distroKey = distroDelayTask.getDistroKey();switch (distroDelayTask.getAction()) {// unregister注册的是DELETE事件case DELETE:// 添加了DistroSyncDeleteTask执行任务,由 DistroExecuteTaskExecuteEngine 执行DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);return true;case CHANGE:case ADD:DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);// 往立即执行的任务引擎中加入DistroSyncChangeTask任务,DistroSyncChangeTask实现了runnable接口,关注其run方法distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);return true;default:return false;}
}

这里我们以DistroSyncChangeTask为例,来分析整体的同步流程。
可以看到,这里还未开始同步数据的流程,而是又封装了一个DistroSyncChangeTask任务,加入到了Distro任务引擎中,实际上是加入到了TaskExecuteWorker类内部的阻塞任务队列中,如下所示:

public void addTask(Object tag, AbstractExecuteTask task) {// 获取处理类NacosTaskProcessor processor = getProcessor(tag);if (null != processor) {// 不为空,就用对应的processor处理processor.process(task);return;}// 没有找到处理类的话, 就用公共的TaskExecuteWorker执行TaskExecuteWorker worker = getWorker(tag);worker.process(task);
}/*** 阻塞队列, 类型为Runnable,说明存入的是一个线程*/
private final BlockingQueue<Runnable> queue;// TaskExecuteWorker#process
public boolean process(NacosTask task) {if (task instanceof AbstractExecuteTask) {// 添加任务到阻塞队列中putTask((Runnable) task);}return true;
}private void putTask(Runnable task) {try {queue.put(task);} catch (InterruptedException ire) {log.error(ire.toString(), ire);}
}

这里我们没有找到处理类的话,,就用公共的TaskExecuteWorker执行:

public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {this.name = name + "_" + mod + "%" + total;// 阻塞队列this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);this.closed = new AtomicBoolean(false);this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;// 内部执行worker,实际上是一个线程realWorker = new InnerWorker(this.name);// 启动workerrealWorker.start();
}

在TaskExecuteWorker的构造方法中,初始化了一个内部执行worker,实际上是一个线程,它不断地从阻塞队列中拿出任务,来执行:

private class InnerWorker extends Thread {InnerWorker(String name) {setDaemon(false);setName(name);}@Overridepublic void run() {while (!closed.get()) {try {// 从阻塞队列获取任务,在process()方法中通过putTask()将任务存入到了阻塞队列中Runnable task = queue.take();long begin = System.currentTimeMillis();// 执行任务task.run();long duration = System.currentTimeMillis() - begin;if (duration > 1000L) {log.warn("task {} takes {}ms", task, duration);}} catch (Throwable e) {log.error("[TASK-FAILED] " + e, e);}}}
}

调用task.run(),我们之前加入的是DistroSyncChangeTask,它继承自AbstractDistroExecuteTask,间接实现了runnable接口,查看其run方法:

// AbstractDistroExecuteTask#run
public void run() {String type = getDistroKey().getResourceType();DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);if (null == transportAgent) {Loggers.DISTRO.warn("No found transport agent for type [{}]", type);return;}Loggers.DISTRO.info("[DISTRO-START] {}", toString());if (transportAgent.supportCallbackTransport()) {doExecuteWithCallback(new DistroExecuteCallback());} else {executeDistroTask();}
}private void executeDistroTask() {try {boolean result = doExecute();if (!result) {// 失败重试handleFailedTask();}Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);} catch (Exception e) {Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);handleFailedTask();}
}// 由子类DistroSyncChangeTask去实现
protected abstract boolean doExecute();

最终将会执行DistroSyncChangeTask的doExecute()方法:

protected boolean doExecute() {String type = getDistroKey().getResourceType();// 从DistroClientDataProcessor获取DistroData, 其实是从ClientManager实时获取ClientDistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return true;}// 将得到的数据同步给其他服务节点return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
}

八、Nacos源码系列:Nacos集群数据同步

负责节点(发起同步)

DistroProtocol

获取同步数据getDistroData

执行同步数据syncData

非负责节点(接收请求)

在Nacos以集群模式运行时,当Nacos服务器收到服务注册请求后,发生了ClientEvent.ClientChangedEvent事件,就会触发将注册的服务信息同步给集群中的其他Nacos-server节点。

ClientEvent.ClientChangedEvent事件的真正处理类是在DistroClientDataProcessor#onEvent方法:

public void onEvent(Event event) {// 只有集群模式才有效,单机模式启动的Nacos,不会执行同步操作if (EnvUtil.getStandaloneMode()) {return;}if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {// 同步数据给其它节点syncToAllServer((ClientEvent) event);}
}

通过syncToAllServer()方法同步数据给集群中的其它节点:

private void syncToAllServer(ClientEvent event) {Client client = event.getClient();/*** 判断客户端是否为空,是否是临时实例,判断是否是负责节点* * 临时实例,使用distro协议同步; 持久实例:使用raft协议同步*/if (isInvalidClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {// 客户端断开连接事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {// 客户端变更事件(新增或修改)DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}
}

同步时,会涉及到一个负责节点和非负责节点。

负责节点(发起同步)
也就是收到客户端事件ClientChangedEvent后负责同步信息给其他非负责节点, 所以这里只能有负责节点来进行同步,非负责节点只能接收同步事件。

private boolean isInvalidClient(Client client) {// 临时实例,使用distro协议同步; 持久实例:使用raft协议同步return null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client);
}

DistroProtocol
Distro是阿里巴巴的私有协议,distro协议是为了注册中心而创造出的协议。

DistroProtocol会循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s,其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改。

对于Delete操作,由DistroSyncDeleteTask处理,对于Change操作,由DistroSyncChangeTask处理,这里我们从DistroSyncChangeTask来看。

public void sync(DistroKey distroKey, DataOperation action) {// 配置同步延迟的时间:默认为1ssync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
}// com.alibaba.nacos.core.distributed.distro.DistroProtocol#sync()
public void sync(DistroKey distroKey, DataOperation action, long delay) {// 遍历每个除自己以外的其它成员for (Member each : memberManager.allMembersWithoutSelf()) {syncToTarget(distroKey, action, each.getAddress(), delay);}
}
核心逻辑在syncToTarget()方法:
```java
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),targetServer);DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);// 往延时任务引擎中加入DistroDelayTask任务,最终将会调用DistroDelayTaskProcessor.process方法// distroTaskEngineHolder.getDelayTaskExecuteEngine()返回的是DistroDelayTaskExecuteEngine,它继承自NacosDelayTaskExecuteEngine,// 其构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);}
}

在syncToTarget()方法中,构建了一个DistroDelayTask任务,然后放到了延时任务引擎中执行,熟悉Nacos服务注册流程的小伙伴对这一块应该不陌生,这里通过distroTaskEngineHolder.getDelayTaskExecuteEngine()返回了一个DistroDelayTaskExecuteEngine执行引擎,它继承自NacosDelayTaskExecuteEngine,而在NacosDelayTaskExecuteEngine的构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)。

/*** 任务队列* key:对应的服务*/
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;protected final ReentrantLock lock = new ReentrantLock();public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);// 初始化任务队列tasks = new ConcurrentHashMap<>(initCapacity);// 创建定时任务的线程池processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));// 在指定的初始延迟时间(100毫秒)后开始执行任务,并按固定的时间间隔周期性(100毫秒)地执行任务。// 默认延时100毫秒执行ProcessRunnable,然后每隔100毫秒周期性执行ProcessRunnableprocessingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}public void addTask(Object key, AbstractDelayTask newTask) {// 加锁防并发处理,key就是对应的服务lock.lock();try {// ConcurrentHashMap<Object, AbstractDelayTask> tasks = new ConcurrentHashMap<>(initCapacity);// 通过key判断是否已存在map中AbstractDelayTask existTask = tasks.get(key);if (null != existTask) {// 服务存在的话,则需要合并任务,其实就是合并多个任务,一起执行newTask.merge(existTask);}// 将任务放入到map中,等待处理tasks.put(key, newTask);} finally {lock.unlock();}
}/*** 任务处理类*/
private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}
}protected void processTasks() {Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {// 从队列中移除这个任务AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}// taskKey示例值: Service{namespace='public', group='DEFAULT_GROUP', name='discovery-provider', ephemeral=true, revision=0}// 找到处理类NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {// ReAdd task if process failedif (!processor.process(task)) {// 处理失败的话,重新入队(即重试)retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error ", e);retryFailedTask(taskKey, task);}}
}

在processTasks()方法中,首先获取处理类,如果获取不到,则使用默认的处理类。在DistroTaskEngineHolder构造方法中,已经设置了默认处理类为DistroDelayTaskProcessor。

public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
// 设置默认的处理器
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
也就是说,在调用syncToTarget()方法后,会触发任务DistroDelayTaskProcessor处理任务。 对于删除类型的任务,触发任务DistroSyncDeleteTask , 对于新增、修改类型的任务:
DistroSyncChangeTask。

// DistroDelayTaskProcessor#process
public boolean process(NacosTask task) {if (!(task instanceof DistroDelayTask)) {return true;}DistroDelayTask distroDelayTask = (DistroDelayTask) task;DistroKey distroKey = distroDelayTask.getDistroKey();switch (distroDelayTask.getAction()) {// unregister注册的是DELETE事件case DELETE:// 添加了DistroSyncDeleteTask执行任务,由 DistroExecuteTaskExecuteEngine 执行DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);return true;case CHANGE:case ADD:DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);// 往立即执行的任务引擎中加入DistroSyncChangeTask任务,DistroSyncChangeTask实现了runnable接口,关注其run方法distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);return true;default:return false;}
}

这里我们以DistroSyncChangeTask为例,来分析整体的同步流程。

可以看到,这里还未开始同步数据的流程,而是又封装了一个DistroSyncChangeTask任务,加入到了Distro任务引擎中,实际上是加入到了TaskExecuteWorker类内部的阻塞任务队列中,如下所示:

public void addTask(Object tag, AbstractExecuteTask task) {// 获取处理类NacosTaskProcessor processor = getProcessor(tag);if (null != processor) {// 不为空,就用对应的processor处理processor.process(task);return;}// 没有找到处理类的话, 就用公共的TaskExecuteWorker执行TaskExecuteWorker worker = getWorker(tag);worker.process(task);
}/*** 阻塞队列, 类型为Runnable,说明存入的是一个线程*/
private final BlockingQueue<Runnable> queue;// TaskExecuteWorker#process
public boolean process(NacosTask task) {if (task instanceof AbstractExecuteTask) {// 添加任务到阻塞队列中putTask((Runnable) task);}return true;
}private void putTask(Runnable task) {try {queue.put(task);} catch (InterruptedException ire) {log.error(ire.toString(), ire);}
}

这里我们没有找到处理类的话,,就用公共的TaskExecuteWorker执行:

public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {this.name = name + "_" + mod + "%" + total;// 阻塞队列this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);this.closed = new AtomicBoolean(false);this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;// 内部执行worker,实际上是一个线程realWorker = new InnerWorker(this.name);// 启动workerrealWorker.start();
}TaskExecuteWorker的构造方法中,初始化了一个内部执行worker,实际上是一个线程,它不断地从阻塞队列中拿出任务,来执行:private class InnerWorker extends Thread {InnerWorker(String name) {setDaemon(false);setName(name);}@Overridepublic void run() {while (!closed.get()) {try {// 从阻塞队列获取任务,在process()方法中通过putTask()将任务存入到了阻塞队列中Runnable task = queue.take();long begin = System.currentTimeMillis();// 执行任务task.run();long duration = System.currentTimeMillis() - begin;if (duration > 1000L) {log.warn("task {} takes {}ms", task, duration);}} catch (Throwable e) {log.error("[TASK-FAILED] " + e, e);}}}
}

调用task.run(),我们之前加入的是DistroSyncChangeTask,它继承自AbstractDistroExecuteTask,间接实现了runnable接口,查看其run方法:

// AbstractDistroExecuteTask#run
public void run() {String type = getDistroKey().getResourceType();DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);if (null == transportAgent) {Loggers.DISTRO.warn("No found transport agent for type [{}]", type);return;}Loggers.DISTRO.info("[DISTRO-START] {}", toString());if (transportAgent.supportCallbackTransport()) {doExecuteWithCallback(new DistroExecuteCallback());} else {executeDistroTask();}
}private void executeDistroTask() {try {boolean result = doExecute();if (!result) {// 失败重试handleFailedTask();}Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);} catch (Exception e) {Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);handleFailedTask();}
}

// 由子类DistroSyncChangeTask去实现
protected abstract boolean doExecute();

最终将会执行DistroSyncChangeTask的doExecute()方法:

protected boolean doExecute() {String type = getDistroKey().getResourceType();// 从DistroClientDataProcessor获取DistroData, 其实是从ClientManager实时获取ClientDistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return true;}// 将得到的数据同步给其他服务节点return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
}

获取同步数据getDistroData
这里获取同步数据其实是从DistroClientDataProcessor获取DistroData, 其实是从ClientManager实时获取Client。

private DistroData getDistroData(String type) {// 其实是从ClientManager实时获取ClientDistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());if (null != result) {result.setType(OPERATION);}return result;
}// DistroClientDataProcessor#getDistroData
public DistroData getDistroData(DistroKey distroKey) {Client client = clientManager.getClient(distroKey.getResourceKey());if (null == client) {return null;}// 把生成的同步数据放入到数组中byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());return new DistroData(distroKey, data);
}

可以看到generateSyncData 方法是关键获取服务的方法,该方法提供了同步数据,包含Client的注册信息,包括客户端注册了哪些namespace,哪些group,哪些service,哪些instance。

public ClientSyncData generateSyncData() {List<String> namespaces = new LinkedList<>();List<String> groupNames = new LinkedList<>();List<String> serviceNames = new LinkedList<>();List<String> batchNamespaces = new LinkedList<>();List<String> batchGroupNames = new LinkedList<>();List<String> batchServiceNames = new LinkedList<>();List<InstancePublishInfo> instances = new LinkedList<>();List<BatchInstancePublishInfo> batchInstancePublishInfos = new LinkedList<>();BatchInstanceData  batchInstanceData = new BatchInstanceData();for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {InstancePublishInfo instancePublishInfo = entry.getValue();if (instancePublishInfo instanceof BatchInstancePublishInfo) {BatchInstancePublishInfo batchInstance = (BatchInstancePublishInfo) instancePublishInfo;batchInstancePublishInfos.add(batchInstance);buildBatchInstanceData(batchInstanceData, batchNamespaces, batchGroupNames, batchServiceNames, entry);batchInstanceData.setBatchInstancePublishInfos(batchInstancePublishInfos);} else {namespaces.add(entry.getKey().getNamespace());groupNames.add(entry.getKey().getGroup());serviceNames.add(entry.getKey().getName());instances.add(entry.getValue());}}ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);data.getAttributes().addClientAttribute(REVISION, getRevision());return data;
}

执行同步数据syncData
这里的同步实际是由DistroClientTransportAgent来负责的,将数据封装成DistroDataRequest,然后获取目标节点Member,然后调用sendRequest异步方法执行同步:

public boolean syncData(DistroData data, String targetServer) {if (isNoExistTarget(targetServer)) {return true;}DistroDataRequest request = new DistroDataRequest(data, data.getType());// 获取目标节点Member member = memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,data.getDistroKey());return false;}try {// 同步发送 DistroDataRequest 请求// 真正处理请求是在:com.alibaba.nacos.naming.remote.rpc.handler.DistroDataRequestHandler.handle方法Response response = clusterRpcClientProxy.sendRequest(member, request);return checkResponse(response);} catch (NacosException e) {Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! key: {}", data.getDistroKey(), e);}return false;
}

这时我们主要关注非负责节点收到同步请求后如何处理。
非负责节点(接收请求)
当负责节点将数据发送给非负责节点以后,将要处理发送过来的Client数据。

// DistroDataRequestHandler#handle
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {try {switch (request.getDataOperation()) {case VERIFY:return handleVerify(request.getDistroData(), meta);case SNAPSHOT:return handleSnapshot();case ADD:case CHANGE:case DELETE:// 变更操作: 维护注册表数据,然后发布事件return handleSyncData(request.getDistroData());case QUERY:return handleQueryData(request.getDistroData());default:return new DistroDataResponse();}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);DistroDataResponse result = new DistroDataResponse();result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("handle distro request with exception");return result;}
}private DistroDataResponse handleSyncData(DistroData distroData) {DistroDataResponse result = new DistroDataResponse();// 调用DistroProtocol.onReceive方法if (!distroProtocol.onReceive(distroData)) {result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("[DISTRO-FAILED] distro data handle failed");}return result;
}// DistroProtocol#onReceive
public boolean onReceive(DistroData distroData) {Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),distroData.getDistroKey());String resourceType = distroData.getDistroKey().getResourceType();DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);if (null == dataProcessor) {Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);return false;}// 通过处理器处理接收到的数据return dataProcessor.processData(distroData);
}

这里我主要关注ADD/CHANGE,所以主要关注handleSyncData()方法。

public boolean processData(DistroData distroData) {switch (distroData.getType()) {case ADD:case CHANGE:// 反序列化同步数据为ClientSyncDataClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);// 处理同步数据handlerClientSyncData(clientSyncData);return true;case DELETE:String deleteClientId = distroData.getDistroKey().getResourceKey();Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);clientManager.clientDisconnected(deleteClientId);return true;default:return false;}
}

首先反序列化接收到的同步数据,封装成ClientSyncData,然后处理同步数据:

private void handlerClientSyncData(ClientSyncData clientSyncData) {Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}, revision={}", clientSyncData.getClientId(),clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));// 同步客户端连接,生成client:不存在时创建client(IpPortBasedClient)clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());// 获取ClientClient client = clientManager.getClient(clientSyncData.getClientId());// 更新Client数据upgradeClient(client, clientSyncData);
}

同步客户端连接,然后获取到客户端对象,更新客户端的注册表信息,并发布一些事件:

private void upgradeClient(Client client, ClientSyncData clientSyncData) {Set<Service> syncedService = new HashSet<>();// process batch instance sync logicprocessBatchInstanceDistroData(syncedService, client, clientSyncData);List<String> namespaces = clientSyncData.getNamespaces();List<String> groupNames = clientSyncData.getGroupNames();List<String> serviceNames = clientSyncData.getServiceNames();List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();for (int i = 0; i < namespaces.size(); i++) {Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));Service singleton = ServiceManager.getInstance().getSingleton(service);syncedService.add(singleton);InstancePublishInfo instancePublishInfo = instances.get(i);if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {// 添加注册表信息,并发布ClientRegisterServiceEventclient.addServiceInstance(singleton, instancePublishInfo);NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));}}for (Service each : client.getAllPublishedService()) {if (!syncedService.contains(each)) {// 删除注册表信息,并发布ClientDeregisterServiceEventclient.removeServiceInstance(each);NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));}}client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/19990.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机视觉与模式识别实验1-2 图像的形态学操作

文章目录 &#x1f9e1;&#x1f9e1;实验流程&#x1f9e1;&#x1f9e1;1.图像膨胀2.图像腐蚀3.膨胀与腐蚀的综合使用4.对下面二值图像的目标提取骨架&#xff0c;并分析骨架结构。 &#x1f9e1;&#x1f9e1;全部代码&#x1f9e1;&#x1f9e1; &#x1f9e1;&#x1f9e1…

tomcat中管理环境变量之setenv.sh

tomcat配置spring profiles springboot打包war部署到外部tomcat的时候指定profile启动 windows 在%tomcat%/bin下创建setenv.bat文件 linux 在%tomcat%/bin下创建setenv.sh文件 使用JVM参数: windows set "JAVA_OPTS%JAVA_OPTS% -Dspring.profiles.activedev"in…

Python自动化办公2.0 即将发布

第一节课&#xff1a;数据整理与清洗 第二节课&#xff1a;数据筛选、过滤与排序 第三节课&#xff1a;高级数据处理技巧 第四节课&#xff1a;数据可视化与实践案例 第五节课&#xff1a;统计分析与报表 第六节&#xff1a;常见的Excel报表 与下方的课程形成知识体系&…

vue3学习(六)

前言 接上一篇学习笔记&#xff0c;今天主要是抽空学习了vue的状态管理&#xff0c;这里学习的是vuex&#xff0c;版本4.1。学习还没有学习完&#xff0c;里面有大坑&#xff0c;难怪现在官网出的状态管理用Pinia。 一、vuex状态管理知识点 上面的方式没有写全&#xff0c;还有…

如何在Windows 10上更改默认系统字体,这里有详细步骤

Windows 10的默认系统字体Segoe UI看起来相当不错。但是,如果你有更好的替代品,你可以更改Windows 10 PC上的默认系统字体。我们将向你展示如何执行此操作。 如何使用注册表编辑器更改默认系统字体 在撰写本文时,“设置”和“控制面板”都没有更改默认系统字体的选项。这意…

官网:管它日薄西山or蒸蒸日上,气质这块,必须拿捏死死的。

在日薄西山的时候&#xff0c;网站建设面临着许多困难和挑战。市场竞争激烈&#xff0c;用户需求多样化&#xff0c;技术更新迅速&#xff0c;这些都要求我们在网站建设中拥有高尚的气质。 而在蒸蒸日上的时刻&#xff0c;网站建设同样需要我们拿捏好气质。只有坚持下去&#…

软件和系统集成项目确认测试报告的费用需要多少?

确认测试报告 软件和系统集成项目确认测试报告的费用因多种因素而异&#xff0c;包括项目的规模、复杂度、测试范围、测试周期等。第三方软件测试机构价格区间一般是几千到几万不等&#xff0c;还有些会根据建设费用的2-5%进行收费。 一般来说&#xff0c;软件和系统集成项目…

element ui el-calendar日历组件完整代码

el-calendar日历组件完整代码 1. 说在前面2. 日历整体代码3. 编辑与新增 1. 说在前面 最近一直忙于上班&#xff0c;没咋看博客&#xff0c;发现很多小伙伴都要日历组件的代码&#xff0c;于是今天抽空给大家整理一下&#xff0c;为爱发电&#xff01;日历组件的原文在这里&am…

计算机视觉与模式识别实验1-1 图像的直方图平衡

文章目录 &#x1f9e1;&#x1f9e1;实验流程&#x1f9e1;&#x1f9e1;1.读入图像‘rice.png’&#xff0c;在一个窗口中显示灰度级n64&#xff0c;128和256的图像直方图。2.调解图像灰度范围&#xff0c;观察变换后的图像及其直方图的变化。3.分别对图像‘pout.tif’和‘ti…

③单细胞学习-pbmc的Seurat 流程

目录 1&#xff0c;数据读取 2&#xff0c;线粒体基因查看 3&#xff0c;数据标准化 4&#xff0c;识别高变基因 5&#xff0c;进行数据归一化 6&#xff0c;进行线性降维 7&#xff0c;确定细胞簇 8&#xff0c;UMAP/tSNE降维&#xff08;保存pbmc_tutorial.rds&#…

mirth Connect 自定义JAVA_HOME

mirth Connect 自定义JAVA_HOME 1、背景 服务器上安装了两个不同版本的Java&#xff0c;我希望Mirth服务使用与默认系统不同的版本。自定义指定java版本 2、解决方法 2.1 优先级说明 系统变量JAVA_HOME (设置后&#xff0c;mirth会根据这个进行启动运行服务&#xff0c;优先级…

【火炬打宝策略】

打宝策略刷遗物&#xff1a; 时可4 只刷奇诊加稀有度&#xff0c;没有奇诊可以直接不打。

模型 STORY评估框架

说明&#xff1a;系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。故事五要素&#xff1a;结构、时间、观点、现实、收益 。 1 STORY评估框架的应用 1.1 STORY模型展示其个性化在线学习解决方案的优势 一家在线教育平台想要通过一个故事来展示其个性…

gcc 内建函数示例 __builtin_return_address

1,理论未动&#xff0c;示例先行 hello_gcc_callstack.c #include <stdio.h>void do_backtrace() {void *pc0 __builtin_return_address(0);void *pc1 __builtin_return_address(1);void *pc2 __builtin_return_address(2);void *pc3 __builtin_return_address(3);…

【工具】Docker安装Jenkins并部署Java项目

【工具】Docker安装Jenkins并部署Java项目 文章目录 【工具】Docker安装Jenkins并部署Java项目1. 前置条件2. 安装3. 创建项目3.1 配置Maven3.2 构建项目3.3 自动部署 1. 前置条件 准备一台云服务器或本地虚拟机&#xff0c;保证必须要java环境&#xff0c;一键安装jdk&#x…

Spring 框架:Java 企业级开发的基石

文章目录 序言Spring 框架的核心概念Spring 框架的主要模块Spring Boot&#xff1a;简化 Spring 开发Spring Cloud&#xff1a;构建微服务架构实际案例分析结论 序言 Spring 框架自 2002 年发布以来&#xff0c;已经成为 Java 企业级开发的标准之一。它通过提供全面的基础设施…

相机等效焦距

1. 背景 物理焦距我们很熟悉,但是在接触实际的相机参数时,相机厂家会提到一个参数等效焦距,甚至有时候不提供物理焦距,这时候如果我们得到真实的物理焦距需要进行一定的转换.在介绍两者之间的转换关系前,先介绍一下等效焦距的由来. 如上图,假设在某一个镜头,其成像面会出现图…

C++ vector的使用和简单模拟实现(超级详细!!!)

目录 前言 1.STL是什么 2.vector使用 2.1 vector简介 2.2 常用接口函数 1. 构造函数 2.operator[ ]和size&#xff0c;push_back 3. 用迭代器进行访问和修改 4. 范围for遍历 5.修改类型函数 pop_back find insert erase 6. 容量相关函数capacity resize reserve 3.…

05.爬虫---urllib与requests请求实战(GET)

05.urllib与Requests请求实战GET 1.Urllib模块2.Requests模块3.对比4.实战 GET请求 Python中的GET请求也是HTTP协议中的一种请求方法&#xff0c;用于向服务器请求数据。与POST请求不同&#xff0c;GET请求将数据以查询字符串的形式附加在URL后面&#xff0c;而不是封装在请求体…

Windows10专业版系统安装Hyper-V虚拟机软件

Windows10专业版系统安装Hyper-V虚拟机软件 适用于在Windows10专业版系统安装Hyper-v虚拟机软件。 1. 安装准备 1.1 安装平台 Windows 10 1.2. 软件信息 软件名称软件版本安装路径windowswindows 10 专业版Hyper-vHyper-v 2. Hyper-v搭建 2.1打开cmd软件 2.2打开控制面…