NotifyCenter通知中心的实现原理和使用
参考https://blog.csdn.net/weixin_42937773/article/details/129105694?spm=1001.2014.3001.5502
任务执行引擎的原理和使用
参考https://blog.csdn.net/weixin_42937773/article/details/128892737?spm=1001.2014.3001.5502
服务注册发现事件处理源码
这些核心功能主要采用事件驱动架构实现:在服务注册中发布了ServiceEvent.ServiceChangedEvent、ClientEvent.ClientChangedEvent(由ClientOperationEvent.ClientRegisterServiceEvent事件发布)、MetadataEvent.InstanceMetadataEvent、RegisterInstanceTraceEvent事件;在服务发现中发布了ClientOperationEvent.ClientSubscribeServiceEvent、SubscribeServiceTraceEvent事件。
维护服务注册订阅数据
核心功能:维护(添加/删除)服务注册和订阅的内存数据,同时传递相应的客户端事件。源码位置:ClientServiceIndexesManager#onEvent。该方法处理两类事件:ClientEvent.ClientDisconnectEvent和ClientOperationEvent,其中ClientOperationEvent包括ClientRegisterServiceEvent、ClientDeregisterServiceEvent、ClientSubscribeServiceEvent、ClientUnsubscribeServiceEvent四种事件。
ClientServiceIndexesManager
@Overridepublic void onEvent(Event event) {//客户端下线事件if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);//客户端操作事件 } else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}
客户端下线事件处理
private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) { //获取客户端信息Client client = event.getClient();//遍历所有的订阅服务for (Service each : client.getAllSubscribeService()) {removeSubscriberIndexes(each, client.getClientId());}//遍历所有的发布服务for (Service each : client.getAllPublishedService()) {removePublisherIndexes(each, client.getClientId());}}
移除订阅服务
private void removeSubscriberIndexes(Service service, String clientId) {if (!subscriberIndexes.containsKey(service)) {return;}subscriberIndexes.get(service).remove(clientId);// 如果服务的订阅者为空,则把服务从订阅表中删除if (subscriberIndexes.get(service).isEmpty()) {subscriberIndexes.remove(service);}}
移除发布服务
private void removePublisherIndexes(Service service, String clientId) {if (!publisherIndexes.containsKey(service)) {return;}//移除服务publisherIndexes.get(service).remove(clientId);//发布服务变更事件NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}
客户端操作事件处理
private void handleClientOperation(ClientOperationEvent event) {//获取服务信息Service service = event.getService();//获取客户端信息String clientId = event.getClientId();//客户端注册事件if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {addPublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {//客户端注销事件removePublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {// 添加订阅服务的client,同时发布服务订阅事件addSubscriberIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {// 删除订阅服务的client,若没有client订阅service,则移除serviceremoveSubscriberIndexes(service, clientId);}}
客户端注册事件
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();private void addPublisherIndexes(Service service, String clientId) {//服务和客户端id绑定publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());publisherIndexes.get(service).add(clientId);//发布服务变更事件NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}
客户端订阅事件
private void addSubscriberIndexes(Service service, String clientId) {//添加订阅服务的客户端subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());// Fix #5404, Only first time add need notify event.//订阅服务成功 发送服务订阅事件if (subscriberIndexes.get(service).add(clientId)) {NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));}}
客户端注销事件
private void removePublisherIndexes(Service service, String clientId) {if (!publisherIndexes.containsKey(service)) {return;}//移除客户端publisherIndexes.get(service).remove(clientId);NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}
客户端删除订阅
private void removeSubscriberIndexes(Service service, String clientId) {if (!subscriberIndexes.containsKey(service)) {return;}//移除订阅客户端信息subscriberIndexes.get(service).remove(clientId);//如果订阅服务为空 移除订阅服务if (subscriberIndexes.get(service).isEmpty()) {subscriberIndexes.remove(service);}}
处理服务变更和订阅事件
NamingSubscriberServiceV2Impl
//监听服务变更和订阅事件@Overridepublic void onEvent(Event event) {//判断是否是GRPC协议 如果不是直接返回 if (!upgradeJudgement.isUseGrpcFeatures()) {return;}//服务端变更事件if (event instanceof ServiceEvent.ServiceChangedEvent) {// 服务变更时,向所有客户端推送ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;Service service = serviceChangedEvent.getService();// 添加延时任务到任务引擎 PushDelayTask默认未指定客户端地址,会推送给所有客户端delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));//服务端订阅事件 } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {// 客户端服务订阅变更时,只推送给这个变更的客户端ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;//获取服务信息Service service = subscribedEvent.getService();// PushDelayTask指定了事件变更的客户端//添加延时任务到任务引擎delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),subscribedEvent.getClientId()));}}
延时任务处理
当NamingSubscriberServiceV2Impl初始化的时候会执行构造方法里去创建一个PushDelayTaskExecuteEngine
/**
 * Wires the collaborators, builds the push delay-task engine and registers this
 * instance as a subscriber with the {@code NotifyCenter}.
 *
 * @param clientManager    client manager, kept and also handed to the engine
 * @param indexesManager   service index manager, kept and also handed to the engine
 * @param serviceStorage   handed to the engine
 * @param metadataManager  handed to the engine
 * @param pushExecutor     handed to the engine
 * @param upgradeJudgement consulted in {@code onEvent} to decide whether gRPC features are in use
 * @param switchDomain     handed to the engine
 */
public NamingSubscriberServiceV2Impl(ClientManagerDelegate clientManager,
        ClientServiceIndexesManager indexesManager, ServiceStorage serviceStorage,
        NamingMetadataManager metadataManager, PushExecutorDelegate pushExecutor,
        UpgradeJudgement upgradeJudgement, SwitchDomain switchDomain) {
    this.upgradeJudgement = upgradeJudgement;
    this.indexesManager = indexesManager;
    this.clientManager = clientManager;
    // The engine that turns delayed push tasks into actual pushes.
    this.delayTaskEngine = new PushDelayTaskExecuteEngine(
            clientManager, indexesManager, serviceStorage, metadataManager, pushExecutor, switchDomain);
    // Registered last, after all fields are assigned.
    NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance());
}
创建PushDelayTaskExecuteEngine的时候会执行super调用到他父类NacosDelayTaskExecuteEngine中
/**
 * Creates the engine, named after this class and logging to {@code Loggers.PUSH},
 * and installs a {@code PushDelayTaskProcessor} as the default task processor.
 *
 * @param clientManager   stored for later use by push tasks
 * @param indexesManager  stored for later use by push tasks
 * @param serviceStorage  stored for later use by push tasks
 * @param metadataManager stored for later use by push tasks
 * @param pushExecutor    stored for later use by push tasks
 * @param switchDomain    stored for later use by push tasks
 */
public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager,
        ServiceStorage serviceStorage, NamingMetadataManager metadataManager,
        PushExecutor pushExecutor, SwitchDomain switchDomain) {
    // The parent constructor sets up the task map and the periodic processing thread.
    super(PushDelayTaskExecuteEngine.class.getSimpleName(), Loggers.PUSH);
    this.switchDomain = switchDomain;
    this.pushExecutor = pushExecutor;
    this.metadataManager = metadataManager;
    this.serviceStorage = serviceStorage;
    this.indexesManager = indexesManager;
    this.clientManager = clientManager;
    // Every task without a dedicated processor is handled by PushDelayTaskProcessor.
    setDefaultTaskProcessor(new PushDelayTaskProcessor(this));
}
/**
 * Creates an engine with the defaults: an initial task-map capacity of 32 and a
 * processing interval of 100 (milliseconds, as scheduled by the four-argument constructor).
 *
 * @param name   name passed on to the four-argument constructor
 * @param logger logger passed on to the four-argument constructor
 */
public NacosDelayTaskExecuteEngine(String name, Logger logger) {this(name, 32, logger, 100L);}
NacosDelayTaskExecuteEngine中会创建一个ProcessRunnable的任务处理线程交给定时线程池去处理
/**
 * Creates the engine and starts its periodic task processing.
 *
 * @param name            used to name the processing thread factory
 * @param initCapacity    initial capacity of the task map
 * @param logger          logger handed to the parent class
 * @param processInterval initial delay and delay between runs, in milliseconds
 */
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    this.tasks = new ConcurrentHashMap<>(initCapacity);
    this.processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    // The first run happens after one interval; each later run starts one interval
    // after the previous run finished.
    this.processingExecutor.scheduleWithFixedDelay(
            new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
定时线程池会按固定间隔(processInterval,默认100毫秒)执行ProcessRunnable的run方法,run方法中再调用processTasks方法处理当前已添加的任务
private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}}
processTasks会遍历所有任务,然后执行对应processor的process方法。因为在PushDelayTaskExecuteEngine的构造方法中通过setDefaultTaskProcessor设置的默认处理器是PushDelayTaskProcessor,所以这里会执行PushDelayTaskProcessor的process方法
protected void processTasks() {//获取所有任务Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {//移除任务AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {// ReAdd task if process failedif (!processor.process(task)) {retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error ", e);retryFailedTask(taskKey, task);}}}
@Overridepublic boolean process(NacosTask task) {//强制转换任务PushDelayTask pushDelayTask = (PushDelayTask) task;//获取服务信息Service service = pushDelayTask.getService();//委派给Dispatcher去执行任务 NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));return true;}
往任务线程池中添加任务
/**
 * Hands the task to the execute engine under the given dispatch tag.
 *
 * @param dispatchTag tag passed to the engine's {@code addTask}
 * @param task        task to execute
 */
public void dispatchAndExecuteTask(Object dispatchTag, AbstractExecuteTask task) {executeEngine.addTask(dispatchTag, task);}
@Overridepublic void addTask(Object tag, AbstractExecuteTask task) {NacosTaskProcessor processor = getProcessor(tag);if (null != processor) {processor.process(task);return;}//最终是往线程池的queue中添加了任务然后等任务调度会执行TaskExecuteWorker worker = getWorker(tag);worker.process(task);}
因为添加的是PushExecuteTask所以当调度了之后会执行到PushExecuteTask的run方法中
@Overridepublic void run() {try {//包装一下 包装成wrapperPushDataWrapper wrapper = generatePushData();//获取客户端ManagerClientManager clientManager = delayTaskEngine.getClientManager();//获取需要通知的客户端for (String each : getTargetClientIds()) {//拿到客户端信息Client client = clientManager.getClient(each);//客户端是空直接返回if (null == client) {// means this client has disconnectcontinue;}//获取服务的订阅者SubscriberSubscriber subscriber = clientManager.getClient(each).getSubscriber(service);//发送RPC请求delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));}} catch (Exception e) {Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));}}
发送GRPC请求
/**
 * Builds a notify-subscriber request from the push data and sends it to the client.
 *
 * @param clientId   id of the target client
 * @param subscriber subscriber used to derive the service info to send
 * @param data       wrapped push data
 * @param callBack   invoked with the outcome of the push
 */
@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
        PushCallBack callBack) {
    NotifySubscriberRequest request =
            NotifySubscriberRequest.buildNotifySubscriberRequest(getServiceInfo(data, subscriber));
    pushService.pushWithCallback(clientId, request, callBack, GlobalExecutor.getCallbackExecutor());
}
/**
 * Sends a request to the given connection asynchronously and reports the outcome
 * to the callback.
 *
 * <p>A missing connection, or one that turns out to be already closed, is reported
 * as success; in the latter case the connection is also unregistered.
 *
 * @param connectionId    id of the connection to push to
 * @param request         request to send
 * @param requestCallBack receives {@code onSuccess} or {@code onFail}
 * @param executor        executor returned by the async callback's {@code getExecutor()}
 */
public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack,
        Executor executor) {
    Connection connection = connectionManager.getConnection(connectionId);
    if (connection == null) {
        // No such connection: treated as success.
        requestCallBack.onSuccess();
        return;
    }

    try {
        connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {

            @Override
            public Executor getExecutor() {
                return executor;
            }

            @Override
            public void onResponse(Response response) {
                if (!response.isSuccess()) {
                    requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
                    return;
                }
                requestCallBack.onSuccess();
            }

            @Override
            public void onException(Throwable e) {
                requestCallBack.onFail(e);
            }
        });
    } catch (ConnectionAlreadyClosedException e) {
        // The connection is gone: clean it up and report success.
        connectionManager.unregister(connectionId);
        requestCallBack.onSuccess();
    } catch (Exception e) {
        Loggers.REMOTE_DIGEST.error("error to send push response to connectionId ={},push response={}", connectionId,
                request, e);
        requestCallBack.onFail(e);
    }
}
客户端接收连接
NamingGrpcClientProxy
/**
 * Starts the gRPC client and wires the naming handlers into it.
 *
 * @param serverListFactory source of server addresses for the RPC client
 * @param serviceInfoHolder holder passed to the push request handler
 * @throws NacosException declared by this method; which call raises it is not visible here
 */
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder)
        throws NacosException {
    rpcClient.serverListFactory(serverListFactory);

    // Start the gRPC client.
    rpcClient.start();

    // Note @10: handler for requests pushed from the server to this client.
    NamingPushRequestHandler pushRequestHandler = new NamingPushRequestHandler(serviceInfoHolder);
    rpcClient.registerServerRequestHandler(pushRequestHandler);

    // Listener notified when the connection is established or lost.
    rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
}
在客户端构建gRPC时,注册registerServerRequestHandler用于处理从Nacos Push到Client的请求,添加到了serverRequestHandlers集合。
GrpcClient#connectToServer()
@Override
public Connection connectToServer(ServerInfo serverInfo) {try {RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(),serverInfo.getServerPort() + rpcPortOffset());if (newChannelStubTemp != null) {Response response = serverCheck(newChannelStubTemp);BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(newChannelStubTemp.getChannel());GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());//create stream request and bind connection event to this connection.// 注解@11StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);// ...return grpcConn;}return null;} catch (Exception e) {// ...}return null;
}
在连接server时绑定相关事件
private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub,final GrpcConnection grpcConn) {return streamStub.requestBiStream(new StreamObserver<Payload>() {@Overridepublic void onNext(Payload payload) {LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}",grpcConn.getConnectionId(), payload.toString());try {Object parseBody = GrpcUtils.parse(payload);final Request request = (Request) parseBody;if (request != null) {try {// 注解@12Response response = handleServerRequest(request);if (response != null) {response.setRequestId(request.getRequestId());sendResponse(response);} else {}} catch (Exception e) {}}} catch (Exception e) {}}});
}
接受server push处理,本事件具体回调到NamingPushRequestHandler#requestReply
/**
 * Handles a request pushed by the server.
 *
 * @param request the incoming request
 * @return a {@code NotifySubscriberResponse} for a {@code NotifySubscriberRequest},
 *         otherwise {@code null}
 */
@Override
public Response requestReply(Request request) {
    if (!(request instanceof NotifySubscriberRequest)) {
        return null;
    }
    NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request;
    // Feed the pushed service info into the local holder.
    serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());
    return new NotifySubscriberResponse();
}
所以最终会执行到NamingPushRequestHandler的requestReply方法进行处理
/**
 * Handles a request pushed by the server.
 *
 * @param request the incoming request
 * @return a {@code NotifySubscriberResponse} for a {@code NotifySubscriberRequest},
 *         otherwise {@code null}
 */
@Override
public Response requestReply(Request request) {
    if (!(request instanceof NotifySubscriberRequest)) {
        return null;
    }
    NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request;
    // Feed the pushed service info into the local holder.
    serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());
    return new NotifySubscriberResponse();
}
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {//empty or error push, just ignorereturn oldService;}// 缓存服务信息serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 判断注册的实例信息是否已变更boolean changed = isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}// 通过prometheus-simpleclient监控服务缓存Map的大小MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());// 服务实例已变更if (changed) {NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "+ JacksonUtils.toJson(serviceInfo.getHosts()));// 添加实例变更事件,会被推动到订阅者执行NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));// 记录Service本地文件DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;
}