ShenYu网关注册中心之HTTP注册原理

文章目录

  • 1、客户端注册流程
    • 1.1、读取配置
      • 1.1.1、用于注册的 HttpClientRegisterRepository
      • 1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener
    • 1.2、扫描注解,注册元数据和URI
      • 1.2.1、构建URI并写入Disruptor
      • 1.2.2、构建元数据并写入Disruptor
      • 1.2.3、Disruptor消费数据并向shenyu-admin注册数据
  • 2、服务端注册流程
    • 2.1、读取配置
      • 2.1.1、用于监听的ShenyuClientServerRegisterRepository
    • 2.2、注册元数据和URI
      • 2.2.1、注册接口接收数据写入Disruptor
      • 2.2.2、Disruptor消费数据并持久化

1、客户端注册流程

当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。

1.1、读取配置

该例子是一个springboot项目,所以注册的入口往往在自动装配类中。不妨可以先看下项目的pom文件中引入了什么依赖:

<dependencies><dependency><groupId>org.apache.shenyu</groupId><artifactId>shenyu-spring-boot-starter-client-springmvc</artifactId><version>${project.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

这里面看到就shenyu-spring-boot-starter-client-springmvc是跟ShenYu相关的,所以入口应该就在这个依赖内了,看下这个依赖的项目结构:
在这里插入图片描述
发现就是两个配置类,ShenyuSpringMvcClientInfoRegisterConfiguration由于使用了@Configuration(proxyBeanMethods = false),暂时不用关注,重点关注ShenyuSpringMvcClientConfiguration,它是shenyu客户端http注册配置类。

/*** shenyu 客户端http注册配置类*/
@Configuration
// shenyu客户端通用配置类
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {static {VersionUtils.checkDuplicate(ShenyuSpringMvcClientConfiguration.class);}/**** 监听并处理http元数据和URI信息的注册** @param clientConfig                   客户端注册配置* @param shenyuClientRegisterRepository 客户端注册类*/@Bean@ConditionalOnMissingBean(ClientRegisterConfiguration.class)// 这里的两个参数是由ShenyuClientCommonBeanConfiguration导入的public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);}
}

通过@Configuration表示这是一个配置类,通过@ImportAutoConfiguration引入ShenyuClientCommonBeanConfiguration配置类。

/*** shenyu客户端通用配置类,创建注册中心客户端通用的bean*/
@Configuration
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuClientCommonBeanConfiguration {/*** 根据注册中心配置通过SPI方式创建客户端注册类*/@Beanpublic ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {return ShenyuClientRegisterRepositoryFactory.newInstance(config);}/*** Shenyu 客户端注册中心配置,读取shenyu.register属性配置*/@Bean@ConfigurationProperties(prefix = "shenyu.register")public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {return new ShenyuRegisterCenterConfig();}/*** Shenyu 客户端配置,读取shenyu.client属性配置*/@Bean@ConfigurationProperties(prefix = "shenyu")public ShenyuClientConfig shenyuClientConfig() {return new ShenyuClientConfig();}
}

ShenyuClientCommonBeanConfigurationShenYu客户端的通用配置类,创建了3个通用bean。

  • ShenyuClientRegisterRepository:客户端注册类,用于将客户端接口信息注册到注册中心。
  • ShenyuRegisterCenterConfig:ShenYu客户端注册中心配置类,读取shenyu.register属性配置。
  • ShenyuClientConfig:ShenYu客户端配置类,读取shenyu.client属性配置。

1.1.1、用于注册的 HttpClientRegisterRepository

上面生成的ShenyuClientRegisterRepository是用于实现客户端注册的接口,会根据注册中心的配置通过SPI方式创建客户端注册类,每一个注册方式都对应一个实现类。
在这里插入图片描述
目前支持7种注册类型:

  • Http:HttpClientRegisterRepository
  • Apollo:ApolloClientRegisterRepository
  • Zookeeper:ZookeeperClientRegisterRepository
  • Etcd:EtcdClientRegisterRepository
  • Nacos:NacosClientRegisterRepository
  • Consul:ConsulClientRegisterRepository
  • Polaris:PolarisClientRegisterRepository
public final class ShenyuClientRegisterRepositoryFactory {private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();/*** 根据注册中心类型实例化注册服务*/public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {// 通过SPI方式创建客户端注册类ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());// 初始化对应客户端注册类,比如创建zookeeper client,etcd client,admin平台的token等result.init(shenyuRegisterCenterConfig);ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);return result;}return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());}
}

加载类型通过registerType指定,也就是我们在配置文件中指定的类型:

shenyu:register:registerType: httpserverLists: http://localhost:9095props:username: adminpassword: 123qweQWE$$

这里指定的是http,所以这里创建的就是HttpClientRegisterRepositoryprops是用于连接注册中心的一些额外属性,比如用户名,密码,命名空间等。
创建对应的注册客户端后,会调用init方法根据shenyu.register下的配置进行初始化:

@Join
public class HttpClientRegisterRepository extends FailbackRegistryRepository {public HttpClientRegisterRepository() {}public HttpClientRegisterRepository(final ShenyuRegisterCenterConfig config) {init(config);}@Overridepublic void init(final ShenyuRegisterCenterConfig config) {// shenyu-admin用户名this.username = config.getProps().getProperty(Constants.USER_NAME);// shenyu-admin密码this.password = config.getProps().getProperty(Constants.PASS_WORD);// shenyu-admin集群地址this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));// 根据上面3个信息请求shenyu-admin获取accessToken,用于后面调用注册接口this.accessToken = Caffeine.newBuilder()//see org.apache.shenyu.admin.config.properties.JwtProperties#expiredSeconds.expireAfterWrite(24L, TimeUnit.HOURS).build(new CacheLoader<String, String>() {@Overridepublic @Nullable String load(@NonNull final String server) throws Exception {try {// 调用shenyu-admin的登录接口(/platform/login),获取accessTokenOptional<?> login = RegisterUtils.doLogin(username, password, server.concat(Constants.LOGIN_PATH));return login.map(String::valueOf).orElse(null);} catch (Exception e) {LOGGER.error("Login admin url :{} is fail, will retry. cause: {} ", server, e.getMessage());return null;}}});}
}

这里主要就是去调shenyu-admin的登录接口获取accessToken,为后面的发送注册数据做准备。其他注册类型的ShenyuClientRegisterRepository也一样,创建各自注册中心的client,连接注册中心,为发送数据做准备。类注解@Join用于SPI的加载。

1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener

回到一开始的ShenyuSpringMvcClientConfiguration配置类:

/*** shenyu 客户端http注册配置类*/
@Configuration
// shenyu客户端通用配置类
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {static {VersionUtils.checkDuplicate(ShenyuSpringMvcClientConfiguration.class);}/**** 监听并处理http元数据和URI信息的注册** @param clientConfig                   客户端注册配置* @param shenyuClientRegisterRepository 客户端注册类*/@Bean@ConditionalOnMissingBean(ClientRegisterConfiguration.class)// 这里的两个参数是由ShenyuClientCommonBeanConfiguration导入的public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);}
}

创建了SpringMvcClientEventListener,负责客户端 元数据URI 数据的构建和注册。SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener,而AbstractContextRefreshedEventListener是一个抽象类,它实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。每一种后端服务RPC调用协议都对应了一个监听类。
在这里插入图片描述

public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {public SpringMvcClientEventListener(final PropertiesConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {super(clientConfig, shenyuClientRegisterRepository);// client配置Properties props = clientConfig.getProps();// 是否是全部接口都注册this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));// http协议this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);this.addPrefixed = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.ADD_PREFIXED,Boolean.FALSE.toString()));mappingAnnotation.add(ShenyuSpringMvcClient.class);mappingAnnotation.add(RequestMapping.class);}// ...}

SpringMvcClientEventListener的构造函数主要就是调用父类AbstractContextRefreshedEventListener的构造函数,传入客户端配置和客户端注册类,客户端配置指shenyu.client.http下的配置:

shenyu:client:http:props:contextPath: /httpappName: http-appNameport: 8189isFull: false
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {protected static final String PATH_SEPARATOR = "/";// Disruptor 发布器private final ShenyuClientRegisterEventPublisher publisher = ShenyuClientRegisterEventPublisher.getInstance();// ...public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {// 读取 shenyu.client.http 配置信息Properties props = clientConfig.getProps();this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {String errorMsg = "client register param must config the appName or contextPath";LOG.error(errorMsg);throw new ShenyuClientIllegalArgumentException(errorMsg);}this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);this.host = props.getProperty(ShenyuClientConstants.HOST);this.port = props.getProperty(ShenyuClientConstants.PORT);// 开始事件发布,启动 Disruptorpublisher.start(shenyuClientRegisterRepository);}   }

取出相关配置信息后,就启动 Disruptor 队列,ShenyuClientRegisterEventPublisher可以看作是一个生产者,用来向队列发送数据

public class ShenyuClientRegisterEventPublisher {private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();private DisruptorProviderManage<DataTypeParent> providerManage;public static ShenyuClientRegisterEventPublisher getInstance() {return INSTANCE;}/*** Start.** @param shenyuClientRegisterRepository shenyuClientRegisterRepository*/public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {// 注册任务工厂类,用于创建注册的任务,客户端使用的是RegisterClientExecutorFactory, // 而在服务端(shenyu-admin)用于处理注册任务的是RegisterServerConsumerExecutor,// 都是用于消费Disruptor数据的任务RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();// 添加元数据订阅器factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));// 添加URI订阅器factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));// 添加ApiDoc订阅器factory.addSubscribers(new ShenyuClientApiDocExecutorSubscriber(shenyuClientRegisterRepository));providerManage = new DisruptorProviderManage<>(factory);// 启动Disruptor队列,并创建消费者providerManage.startup();}/*** 发布事件,向Disruptor队列发数据** @param data the data*/public void publishEvent(final DataTypeParent data) {DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();provider.onData(data);}
}

start方法主要是为队列添加订阅器,会由消费者接收到信息后调用这些订阅器。然后启动启动Disruptor队列,并创建消费者。

public class DisruptorProviderManage<T> {public void startup() {this.startup(false);}public void startup(final boolean isOrderly) {OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());int newConsumerSize = this.consumerSize;EventFactory<DataEvent<T>> eventFactory;if (isOrderly) {newConsumerSize = 1;eventFactory = new OrderlyDisruptorEventFactory<>();} else {eventFactory = new DisruptorEventFactory<>();}Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,size,DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),ProducerType.MULTI,new BlockingWaitStrategy());// 创建消费者@SuppressWarnings("all")QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];for (int i = 0; i < newConsumerSize; i++) {consumers[i] = new QueueConsumer<>(executor, consumerFactory);}// 设置消费者disruptor.handleEventsWithWorkerPool(consumers);disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());// 真正调用disruptor的api启动disruptor.start();RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();// disruptor的生产者provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);}  }

这里就是准备Disruptor队列的一些逻辑,就不细讲了,其中QueueConsumerDisruptor的消费者,后面就是由它接收数据。

1.2、扫描注解,注册元数据和URI

上面说到SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener,而AbstractContextRefreshedEventListener实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。

// 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行,算是客户端的执行入口吧
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {context = event.getApplicationContext();// 获取客户端的接口类,比如http就是Controller类,dubbo就是@DubboService类,由子类实现Map<String, T> beans = getBeans(context);if (MapUtils.isEmpty(beans)) {return;}// 保证只注册一次if (!registered.compareAndSet(false, true)) {return;}// 构建URI并写入Disruptor,由子类实现publisher.publishEvent(buildURIRegisterDTO(context, beans));// 构建元数据并写入Disruptorbeans.forEach(this::handle);Map<String, Object> apiModules = context.getBeansWithAnnotation(ApiModule.class);apiModules.forEach((k, v) -> handleApiDoc(v, beans));
}

获取客户端服务的接口类,由具体的子类实现,http就是Controller类,这里对应的子类就是SpringMvcClientEventListener

@Override
protected Map<String, Object> getBeans(final ApplicationContext context) {// Filter outif (Boolean.TRUE.equals(isFull)) {// isFull=true,表示代理整个服务,就不需要注解扫描了,// 直接构建元数据和URI,写入DisruptorgetPublisher().publishEvent(MetaDataRegisterDTO.builder().contextPath(getContextPath()).addPrefixed(addPrefixed).appName(getAppName()).path(PathUtils.decoratorPathWithSlash(getContextPath())).rpcType(RpcTypeEnum.HTTP.getName()).enabled(true).ruleName(getContextPath()).build());LOG.info("init spring mvc client success with isFull mode");// 构建URIpublisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap()));return Collections.emptyMap();}// 否则获取@Controller注解的beanreturn context.getBeansWithAnnotation(Controller.class);
}

这里会判断配置文件中的shenyu.client.http.props.isFull,如果是true,则直接构建一个元数据URI,写入到Disruptor中,然后返回一个空集合,后续的逻辑就没执行了。如果是false,则从spring容器中获取带@Controller注解的bean返回。

1.2.1、构建URI并写入Disruptor

构建一个URI数据写入到Disruptor,这个也是由子类实现的:

// 构建URI
@Override
protected URIRegisterDTO buildURIRegisterDTO(final ApplicationContext context,final Map<String, Object> beans) {try {return URIRegisterDTO.builder().contextPath(getContextPath()) // shneyu得contextPath.appName(getAppName()) // appName.protocol(protocol) // 服务协议.host(super.getHost()) // 服务host.port(Integer.valueOf(getPort())) // 服务端口.rpcType(RpcTypeEnum.HTTP.getName()) // rpc类型.eventType(EventType.REGISTER) // 事件类型.build();} catch (ShenyuException e) {throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");}
}

可以看出来URI跟接口类没有关系,一个后端服务实例生成一个URI。

1.2.2、构建元数据并写入Disruptor

之后遍历每个接口构建元数据beans.forEach(this::handle)

/*** 构建元数据并写入Disruptor*/
protected void handle(final String beanName, final T bean) {Class<?> clazz = getCorrectedClass(bean);// 获取当前bean的对应shenyu客户端的注解,比如http是@ShenyuSpringMvcClient, // dubbo是@ShenyuDubboClientfinal A beanShenyuClient = AnnotatedElementUtils.findMergedAnnotation(clazz, getAnnotationType());// 获取bean对应的path(类上注解的路径),由子类实现final String superPath = buildApiSuperPath(clazz, beanShenyuClient);// 如果有shenyu客户端注解并且path中包含*,则表示要注册整个类的方法,只需要构建一个类元数据if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {// 由具体的子类构建类元数据写入DisruptorhandleClass(clazz, bean, beanShenyuClient, superPath);return;}// 类上没有shenyu客户端注解(类上没有注解,但方法上有注解,也是可以注册的),// 或者有注解但是path没有包含*,则就要遍历每个方法,为每个需要注册的方法构建方法元数据final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);for (Method method : methods) {// 由具体子类构建方法元数据写入Disruptor,并将每个method对应的元数据对象缓存在当前类里handleMethod(bean, clazz, beanShenyuClient, method, superPath);}
}protected void handleClass(final Class<?> clazz,final T bean,@NonNull final A beanShenyuClient,final String superPath) {publisher.publishEvent(buildMetaDataDTO(bean, beanShenyuClient, pathJoin(contextPath, superPath), clazz, null));
}protected void handleMethod(final T bean,final Class<?> clazz,@Nullable final A beanShenyuClient,final Method method,final String superPath) {// 如果方法上有Shenyu客户端注解,就表示该方法需要注册A methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, getAnnotationType());if (Objects.nonNull(methodShenyuClient)) {final MetaDataRegisterDTO metaData = buildMetaDataDTO(bean, methodShenyuClient,buildApiPath(method, superPath, methodShenyuClient), clazz, method);publisher.publishEvent(metaData);metaDataMap.put(method, metaData);}
}
// 获取接口对应路径,如果shenyu注解上没有,就用@RequestMapping上的路径,
// 但是这个只支持第一个路径
@Override
protected String buildApiSuperPath(final Class<?> clazz, @Nullable final ShenyuSpringMvcClient beanShenyuClient) {if (Objects.nonNull(beanShenyuClient) && StringUtils.isNotBlank(beanShenyuClient.path())) {return beanShenyuClient.path();}RequestMapping requestMapping = AnnotationUtils.findAnnotation(clazz, RequestMapping.class);// Only the first path is supported temporarilyif (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {return requestMapping.path()[0];}return "";
}// springmvc接口上需要有 ShenyuSpringMvcClient 注解,
// 并且包含RequestMapping注解(表示是一个接口),才进行注册
protected void handleMethod(final Object bean, final Class<?> clazz,@Nullable final ShenyuSpringMvcClient beanShenyuClient,final Method method, final String superPath) {final RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(method, RequestMapping.class);ShenyuSpringMvcClient methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, ShenyuSpringMvcClient.class);methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;// 如果有 ShenyuSpringMvcClient 注解并且包含RequestMapping注解(表示是一个接口),则进行注册if (Objects.nonNull(methodShenyuClient) && Objects.nonNull(requestMapping)) {// 构建元数据final MetaDataRegisterDTO metaData = buildMetaDataDTO(bean, methodShenyuClient,// 构建path = contextPath + 类上的路径 + 方法上的路径                                        buildApiPath(method, superPath, methodShenyuClient), clazz, method);// 发布元数据getPublisher().publishEvent(metaData);getMetaDataMap().put(method, metaData);}
}// path = contextPath + 类上的路径 + 方法上的路径,
// 如果@ShenyuSpringMvcClient注解上的路径不为空,则方法上的路径=@ShenyuSpringMvcClient上的value,
// 否则,方法上的路径=@RequestMapping上的value
@Override
protected String buildApiPath(final Method method, final String superPath,@NonNull final ShenyuSpringMvcClient methodShenyuClient) {String contextPath = getContextPath();if (StringUtils.isNotBlank(methodShenyuClient.path())) {return pathJoin(contextPath, superPath, methodShenyuClient.path());}final String path = getPathByMethod(method);if (StringUtils.isNotBlank(path)) {return pathJoin(contextPath, superPath, path);}return pathJoin(contextPath, superPath);
}

1.2.3、Disruptor消费数据并向shenyu-admin注册数据

上面启动Disruptor的时候说到QueueConsumer实现了WorkHandler接口,是Disruptor的消费者,消费逻辑就在它的onEvent方法中:

public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {private final OrderlyExecutor executor;private final QueueConsumerFactory<T> factory;/*** Instantiates a new Queue consumer.** @param executor the executor* @param factory  the factory*/public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {this.executor = executor;this.factory = factory;}@Overridepublic void onEvent(final DataEvent<T> t) {if (Objects.nonNull(t)) {// 根据事件类型使用不同的线程池ThreadPoolExecutor executor = orderly(t);// 通过工厂创建队列消费任务 RegisterClientConsumerExecutorQueueConsumerExecutor<T> queueConsumerExecutor = factory.create();// 为消费任务设置数据queueConsumerExecutor.setData(t.getData());t.setData(null);// 放在线程池中执行 消费任务executor.execute(queueConsumerExecutor);}}// ...
}

QueueConsumerExecutor是实现了Runnable的消费任务,它有两个实现:

  • RegisterClientConsumerExecutor:客户端消费者任务
  • RegisterServerConsumerExecutor:服务端消费者任务

从名字也可以看出,RegisterClientConsumerExecutor负责处理客户端任务,shenyu客户端将元数据和URI写入disruptor后由这个消费者任务来消费数据,执行实际向注册中心注册的操作。RegisterServerConsumerExecutor负责处理服务端(shenyu-admin)任务,服务端从注册中心监听到元数据URI后写入disruptor,然后由RegisterServerConsumerExecutor任务来消费数据,处理数据入库操作和发布事件。
RegisterClientConsumerExecutor的消费逻辑:

public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {this.subscribers = new EnumMap<>(executorSubscriberMap);}@Overridepublic void run() {// 获取数据final T data = getData();// 根据数据类型获取对应的处理器进行处理,即在disruptor启动的时候添加的订阅器subscribers.get(data.getType()).executor(Lists.newArrayList(data));}// ... 
}

根据不同的数据类型使用不同的订阅器执行器去执行,这些订阅器是在disruptor启动的时候设置的。目前注册的数据类型有3种,元数据URIAPI文档

public enum DataType {/*** Meta data data type enum.*/META_DATA,/*** Uri data type enum.*/URI,/*** Api doc type enum.*/API_DOC,
}

所以相对应的订阅器也分为3类,分别处理元数据URI和API文档。在客户端和服务端分别有两个,所以一共是6个。
在这里插入图片描述
元数据处理

public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {private final ShenyuClientRegisterRepository shenyuClientRegisterRepository;// .../*** 遍历元数据,对数据注册到注册中心*/@Overridepublic void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {// 调用响应注册中心的客户端注册类注册元数据shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);}}
}

遍历数据,然后又将数据委托给ShenyuClientRegisterRepository执行。ShenyuClientRegisterRepository是在一开始读取配置的时候就创建了,是客户端注册类,用来将数据发送到注册中心的类,不同的注册方式有不同的实现类,该示例使用http方式注册(shenyu.register.registerType=http)的实现类是HttpClientRegisterRepositoryHttpClientRegisterRepository并没有直接实现ShenyuClientRegisterRepository接口,而是继承FailbackRegistryRepositoryFailbackRegistryRepository实现了ShenyuClientRegisterRepository接口,FailbackRegistryRepository本身主要用于对http注册过程中的失败重试。

@Override
public void persistInterface(final MetaDataRegisterDTO metadata) {try {this.doPersistInterface(metadata);} catch (Exception ex) {//If a failure occurs, it needs to be added to the retry list.// 如果注册失败,则添加到重试列表,过一段时间后重新注册logger.warn("Failed to persistInterface {}, cause:{}", metadata, ex.getMessage());this.addFailureMetaDataRegister(metadata);}
}@Override
public void doPersistInterface(final MetaDataRegisterDTO metadata) {//  META_PATH = "/shenyu-client/register-metadata"doRegister(metadata, Constants.META_PATH, Constants.META_TYPE);
}// 发送注册数据到admin
private <T> void doRegister(final T t, final String path, final String type) {int i = 0;// admin集群中的每个都要去注册(admin之间没有同步机制么?)for (String server : serverList) {i++;// 拼接上admin的地址和具体的接口路径,构成完整的请求地址String concat = server.concat(path);try {// 调用admin接口的tokenString accessToken = this.accessToken.get(server);if (StringUtils.isBlank(accessToken)) {throw new NullPointerException("accessToken is null");}// 通过工具类发送http请求RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);// considering the situation of multiple clusters, we should continue to execute here} catch (Exception e) {LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());if (i == serverList.size()) {throw new RuntimeException(e);}}}
}

http注册方式比较简单,遍历每个admin服务,获取到accessToken后,向/shenyu-client/register-metadata接口地址发起http请求,将数据发送给admin。
URI处理

public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {@Overridepublic void executor(final Collection<URIRegisterDTO> dataList) {for (URIRegisterDTO uriRegisterDTO : dataList) {Stopwatch stopwatch = Stopwatch.createStarted();// 这里的逻辑是为了探测客户端是否已经启动while (true) {try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {break;} catch (IOException e) {long sleepTime = 1000;// maybe the port is delay exposedif (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {LOG.error("host:{}, port:{} connection failed, will retry",uriRegisterDTO.getHost(), uriRegisterDTO.getPort());// If the connection fails for a long time, Increase sleep timeif (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {sleepTime = 10000;}}try {TimeUnit.MILLISECONDS.sleep(sleepTime);} catch (InterruptedException ex) {LOG.error("interrupted when sleep", ex);}}}ShenyuClientShutdownHook.delayOtherHooks();// 向注册中心注册URI数据shenyuClientRegisterRepository.persistURI(uriRegisterDTO);// 优雅停机ShutdownHookManager.get().addShutdownHook(new Thread(() -> {final URIRegisterDTO offlineDTO = new URIRegisterDTO();BeanUtils.copyProperties(uriRegisterDTO, offlineDTO);offlineDTO.setEventType(EventType.OFFLINE);shenyuClientRegisterRepository.offline(offlineDTO);}), 2);}}
}

URI注册逻辑基本相似,只是比元数据多了一步探测客户端时候已经启动完成的操作,保证客户端启动完成后再注册URI,后面的逻辑就跟元数据一样了。
分析到这里就将客户端的注册逻辑分析完了,通过读取自定义的注解信息构造元数据和URI,将数据发到Disruptor队列,然后从队列中消费数据,将消费者放到线程池中去执行,最终通过发送http请求到admin,元数据注册接口是/shenyu-client/register-metadataURI注册接口是/shenyu-client/register-uri

2、服务端注册流程

2.1、读取配置

从前面分析到,admin的两个注册接口分别是/shenyu-client/register-metadata/shenyu-client/register-uri,通过全局搜索发现这两个接口在ShenyuClientHttpRegistryController类。

@RequestMapping("/shenyu-client")
@Join
public class ShenyuClientHttpRegistryController implements ShenyuClientServerRegisterRepository {/*** 注册元数据*/@PostMapping("/register-metadata")@ResponseBodypublic String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {// 直接将元数据发布到Disruptorpublisher.publish(metaDataRegisterDTO);return ShenyuResultMessage.SUCCESS;}/*** 注册URI*/@PostMapping("/register-uri")@ResponseBodypublic String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {// 直接将URI发布到Disruptorpublisher.publish(uriRegisterDTO);return ShenyuResultMessage.SUCCESS;}
}

但是这个类上并没有@RestController注解,那它就不能做为一个bean被spring扫描到,那它是如何创建的呢?
不着急,想要从注册中心中监听到数据(http注册方式可以将admin当作注册中心),自然需要有注册中心配置,来表明使用哪个注册中心。admin的注册中心配置是RegisterCenterConfiguration,我们先看这个配置类:

/*** 注册中心配置类*/
@Configuration
public class RegisterCenterConfiguration {/*** 读取shenyu.register配置*/@Bean@ConfigurationProperties(prefix = "shenyu.register")public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {return new ShenyuRegisterCenterConfig();}/*** 创建用于服务端的注册类,从注册中心中监听数据,然后将数据写入Disruptor队列中*/@Bean(destroyMethod = "close")public ShenyuClientServerRegisterRepository shenyuClientServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig,final List<ShenyuClientRegisterService> shenyuClientRegisterService) {// 从配置中获取注册类型String registerType = shenyuRegisterCenterConfig.getRegisterType();// 根据注册类型通过SPI方式创建对应的ShenyuClientServerRegisterRepositoryShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);// 创建Disruptor发布者RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();// 每种客户端类型(rpc类型)的处理类Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, Function.identity()));// 启动Disruptor,添加元数据和URI的订阅器publisher.start(registerServiceMap);// 初始化注册中心registerRepository.init(publisher, shenyuRegisterCenterConfig);return registerRepository;}
}

该配置类创建了2个bean:

  • ShenyuRegisterCenterConfig:shenyu-admin注册中心配置,读取shenyu.register属性配置。
  • ShenyuClientServerRegisterRepository:服务端注册类,用于从注册中心中监听数据,然后将数据写入Disruptor队列中。

这里的创建Disruptor发布者,启动Disruptor等逻辑跟在客户端那边的一样,只是类是服务端这边的,就不再分析了。

2.1.1、用于监听的ShenyuClientServerRegisterRepository

上面生成的ShenyuClientServerRegisterRepository是用于实现服务端注册的接口,会根据注册中心的配置通过SPI方式创建注册类,每一个注册方式都对应一个实现类。
在这里插入图片描述
目前支持7种注册类型:

  • Http:ShenyuClientHttpRegistryController
  • Apollo:ApolloClientServerRegisterRepository
  • Zookeeper:ZookeeperClientServerRegisterRepository
  • Etcd:EtcdClientServerRegisterRepository
  • Nacos:NacosClientServerRegisterRepository
  • Consul:ConsulClientServerRegisterRepository
  • Polaris:PolarisClientServerRegisterRepository
    加载类型通过registerType指定,也就是我们在配置文件中指定的类型:
shenyu:register:registerType: http

服务端的注册类型必须跟客户端的注册类型一致,这样服务端才可以监听到注册信息。这里要指定的是http,所以这里创建的就是ShenyuClientHttpRegistryController,这个不就是前面说到的注册接口所在的类么,所以回到前面的ShenyuClientHttpRegistryController

2.2、注册元数据和URI

2.2.1、注册接口接收数据写入Disruptor

@RequestMapping("/shenyu-client")
@Join
public class ShenyuClientHttpRegistryController implements ShenyuClientServerRegisterRepository {/*** 注册元数据*/@PostMapping("/register-metadata")@ResponseBodypublic String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {// 直接将元数据发布到Disruptorpublisher.publish(metaDataRegisterDTO);return ShenyuResultMessage.SUCCESS;}/*** 注册URI*/@PostMapping("/register-uri")@ResponseBodypublic String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {// 直接将URI发布到Disruptorpublisher.publish(uriRegisterDTO);return ShenyuResultMessage.SUCCESS;}
}

两个注册接口获取到数据后,就直接调用了publisher.publish()方法,把数据发布到Disruptor队列中。

2.2.2、Disruptor消费数据并持久化

QueueConsumer实现了WorkHandler接口,是Disruptor的消费者,消费逻辑就在它的onEvent方法中:

public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {private final OrderlyExecutor executor;private final QueueConsumerFactory<T> factory;/*** Instantiates a new Queue consumer.** @param executor the executor* @param factory  the factory*/public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {this.executor = executor;this.factory = factory;}@Overridepublic void onEvent(final DataEvent<T> t) {if (Objects.nonNull(t)) {// 根据事件类型使用不同的线程池ThreadPoolExecutor executor = orderly(t);// 通过工厂创建队列消费任务 RegisterServerConsumerExecutorQueueConsumerExecutor<T> queueConsumerExecutor = factory.create();// 为消费任务设置数据queueConsumerExecutor.setData(t.getData());t.setData(null);// 放在线程池中执行 消费任务executor.execute(queueConsumerExecutor);}}// ...
}

分析客户端注册流程的时候说到RegisterServerConsumerExecutor是服务端消费者任务,处理数据入库操作和发布事件。
RegisterServerConsumerExecutor消费逻辑:

public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<Collection<DataTypeParent>> {// 每种数据类型的订阅器执行器private final Map<DataType, ExecutorSubscriber<DataTypeParent>> subscribers;private RegisterServerConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<DataTypeParent>> executorSubscriberMap) {this.subscribers = new HashMap<>(executorSubscriberMap);}@Overridepublic void run() {Collection<DataTypeParent> results = getData().stream().filter(this::isValidData).collect(Collectors.toList());if (CollectionUtils.isEmpty(results)) {return;}// 选择对应的数据类型的订阅器执行器去执行selectExecutor(results).executor(results);}private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {final Optional<DataTypeParent> first = list.stream().findFirst();return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());}// ...
}

根据不同的数据类型使用不同的订阅器执行器去执行,这些订阅器是在disruptor启动的时候设置的。
服务端的订阅器有3个,分别为MetadataExecutorSubscriberURIRegisterExecutorSubscriberApiDocExecutorSubscriber,分别处理元数据URIAPI文档

元数据的处理

public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {// 每种客户端类型的注册服务private final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService;public MetadataExecutorSubscriber(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {this.shenyuClientRegisterService = shenyuClientRegisterService;}@Overridepublic DataType getType() {return DataType.META_DATA;}@Overridepublic void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {// 遍历元数据metaDataRegisterDTOList.forEach(meta -> {// 根据客户端类型Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType())).ifPresent(shenyuClientRegisterService -> {// 加锁,保证数据顺序执行,防止并发synchronized (shenyuClientRegisterService) {// 处理数据shenyuClientRegisterService.register(meta);}});});}
}

ShenyuClientRegisterService是注册方法接口,它有多个实现类:
在这里插入图片描述

  • AbstractContextPathRegisterService:抽象类,处理部分公共逻辑;
  • AbstractShenyuClientRegisterServiceImpl::抽象类,处理部分公共逻辑;
  • ShenyuClientRegisterDivideServiceImpl:divide类,处理http注册类型;
  • ShenyuClientRegisterDubboServiceImpl:dubbo类,处理dubbo注册类型;
  • ShenyuClientRegisterGrpcServiceImpl:gRPC类,处理gRPC注册类型;
  • ShenyuClientRegisterBrpcServiceImpl:bRPC类,处理bRPC注册类型;
  • ShenyuClientRegisterMotanServiceImpl:Motan类,处理Motan注册类型;
  • ShenyuClientRegisterSofaServiceImpl:Sofa类,处理Sofa注册类型;
  • ShenyuClientRegisterSpringCloudServiceImpl:SpringCloud类,处理SpringCloud注册类型;
  • ShenyuClientRegisterTarsServiceImpl:Tars类,处理Tars注册类型;
  • ShenyuClientRegisterWebSocketServiceImpl:Websocket类,处理Websocket注册类型;

每一种rpc类型都对应一个注册处理类,所以本文是使用ShenyuClientRegisterDivideServiceImpl来处理。

public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {@Resourceprivate ApplicationEventPublisher eventPublisher;// 这几个就是操作数据库的service@Resourceprivate SelectorService selectorService;@Resourceprivate MetaDataService metaDataService;@Resourceprivate RuleService ruleService;@Overridepublic String register(final MetaDataRegisterDTO dto) {// 1、注册选择器(可以认为一个服务就是一个选择器)// 选择器执行逻辑,默认情况是空的,需要在控制台另外手动配置// 子类实现String selectorHandler = selectorHandler(dto);// 持久化选择器并发布选择器变更事件(不存在的时候)ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATEString selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);// 2、注册规则(可以认为一个元数据就是一个规则,根据path判断是否同一个)// 规则处理逻辑// 子类实现,,都是直接创建一个各自rpc类型的默认逻辑String ruleHandler = ruleHandler();// 构建规则DTORuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);// 持久化规则并发布规则变更事件(不存在的时候)ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATEruleService.registerDefault(ruleDTO);// 3、注册元数据,并发布元数据变更事件(已存在,发布元数据更新事件,不存在,发布元数据创建事件)// 子类实现registerMetadata(dto);// 4、注册contextPath(只有http,springCloud,webSocket类型才有)String contextPath = dto.getContextPath();if (StringUtils.isNotEmpty(contextPath)) {registerContextPath(dto);}return ShenyuResultMessage.SUCCESS;}}

整个注册处理逻辑可以分为4步:

  1. 注册选择器,构建选择器,默认情况下一个服务就是一个选择器。之后将选择器插入数据库并发布选择器变更事件。
@Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {// 以contextPath或appName作为选择器名称String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());// 根据选择器名和插件名从数据库中查询选择器SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);// 如果还不存在,就创建一个选择器插入数据库if (Objects.isNull(selectorDO)) {// 构建选择器DTOSelectorDTO selectorDTO = SelectorUtil.buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());selectorDTO.setHandle(selectorHandler);// 注册选择器并发布事件 ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATEreturn registerDefault(selectorDTO);}return selectorDO.getId();
}
  1. 注册规则,可以认为一个元数据就是一个规则,根据path判断是否同一个。
    构建规则:
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {// 构建规则DTORuleDTO ruleDTO = RuleDTO.builder().selectorId(selectorId).name(ruleName).matchMode(MatchModeEnum.AND.getCode()).enabled(Boolean.TRUE).loged(Boolean.TRUE).matchRestful(Boolean.FALSE).sort(1).handle(ruleHandler).build();// 将{xxx}替换成**String conditionPath = this.rewritePath(path);RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder().paramType(ParamTypeEnum.URI.getName()).paramName("/").paramValue(conditionPath).build();// 设置规则条件if (conditionPath.endsWith(AdminConstants.URI_SLASH_SUFFIX)) {ruleConditionDTO.setOperator(OperatorEnum.STARTS_WITH.getAlias());} else if (conditionPath.endsWith(AdminConstants.URI_SUFFIX)) {ruleConditionDTO.setOperator(OperatorEnum.PATH_PATTERN.getAlias());} else if (conditionPath.indexOf("*") > 1) {ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias());} else {ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias());}ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));return ruleDTO;
}

保存规则:

@Override
public String registerDefault(final RuleDTO ruleDTO) {// 选择器下已经存在同名的规则,则直接返回,什么也不干if (Objects.nonNull(ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName()))) {return "";}RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);if (StringUtils.isEmpty(ruleDTO.getId())) {// 插入规则ruleMapper.insertSelective(ruleDO);// 插入规则条件addCondition(ruleDO, ruleDTO.getRuleConditions());}// 发布规则变更事件 ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATEruleEventPublisher.onRegister(ruleDO, ruleDTO.getRuleConditions());return ruleDO.getId();
}

具体的规则设计建议去看官方文档。
3. 注册元数据,直接将注册上来的元数据保存

@Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {if (dto.isRegisterMetaData()) {MetaDataService metaDataService = getMetaDataService();// 根据路径查询元数据时候已存在MetaDataDO exist = metaDataService.findByPath(dto.getPath());// 已存在,就更新,发布元数据更新事件,不存在,就插入,发布元数据创建事件metaDataService.saveOrUpdateMetaData(exist, dto);}
}

4、注册ContextPath,只有httpspringCloudwebSocket类型才有。处理的逻辑在AbstractContextPathRegisterService中。

public abstract class AbstractContextPathRegisterService extends AbstractShenyuClientRegisterServiceImpl {@Overridepublic void registerContextPath(final MetaDataRegisterDTO dto) {// 持久化contextPath插件下的选择器并发布选择器变更事件String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");// 创建规则处理逻辑ContextMappingRuleHandle handle = new ContextMappingRuleHandle();handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));handle.setAddPrefixed(dto.getAddPrefixed());// 注册contextPath插件默认的规则,contextPath就是规则名,并发布规则变更事件getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));}
}

● URI的处理
URI数据是由URIRegisterExecutorSubscriber订阅器处理:

public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {@Overridepublic void executor(final Collection<URIRegisterDTO> dataList) {if (CollectionUtils.isEmpty(dataList)) {return;}// 根据rpc类型分类final Map<String, List<URIRegisterDTO>> groupByRpcType = dataList.stream().filter(data -> StringUtils.isNotBlank(data.getRpcType())).collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));for (Map.Entry<String, List<URIRegisterDTO>> entry : groupByRpcType.entrySet()) {// 根据不同rpc类型使用对应的shenyuClientRegisterService处理final String rpcType = entry.getKey();Optional.ofNullable(shenyuClientRegisterService.get(rpcType)).ifPresent(service -> {final List<URIRegisterDTO> list = entry.getValue();// 再以contextPath/appName分类Map<String, List<URIRegisterDTO>> listMap = buildData(list);listMap.forEach((selectorName, uriList) -> {final List<URIRegisterDTO> register = new LinkedList<>();final List<URIRegisterDTO> offline = new LinkedList<>();for (URIRegisterDTO d : uriList) {final EventType eventType = d.getEventType();// 判断是注册类型还是下线类型if (Objects.isNull(eventType) || EventType.REGISTER.equals(eventType)) {// eventType is null, should be old versionsregister.add(d);} else if (EventType.OFFLINE.equals(eventType)) {offline.add(d);}}if (CollectionUtils.isNotEmpty(register)) {// 注册URIservice.registerURI(selectorName, register);}if (CollectionUtils.isNotEmpty(offline)) {// 下线URIservice.offline(selectorName, offline);}});});}}private Map<String, List<URIRegisterDTO>> buildData(final Collection<URIRegisterDTO> dataList) {Map<String, List<URIRegisterDTO>> resultMap = new HashMap<>(8);for (URIRegisterDTO dto : dataList) {String contextPath = dto.getContextPath();String key = StringUtils.isNotEmpty(contextPath) ? contextPath : dto.getAppName();if (StringUtils.isNotEmpty(key)) {if (resultMap.containsKey(key)) {List<URIRegisterDTO> existList = resultMap.get(key);existList.add(dto);resultMap.put(key, existList);} else {resultMap.put(key, Lists.newArrayList(dto));}}}return resultMap;}
}

调到FallbackShenyuClientRegisterServiceregisterURI()方法

@Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {String result;String key = key(selectorName);try {this.removeFallBack(key);result = this.doRegisterURI(selectorName, uriList);logger.info("Register success: {},{}", selectorName, uriList);} catch (Exception ex) {logger.warn("Register exception: cause:{}", ex.getMessage());result = "";this.addFallback(key, new FallbackHolder(selectorName, uriList));}return result;
}

FallbackShenyuClientRegisterService是用来异常处理的,然后调用doRegisterURI()做真正处理。

@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {if (CollectionUtils.isEmpty(uriList)) {return "";}// 查询对应的选择器SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));if (Objects.isNull(selectorDO)) {throw new ShenyuException("doRegister Failed to execute,wait to retry.");}// 过滤port或host为空的URIList<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());// 由URI构建处理选择器中的handler信息,更新选择器中的handler// 应该就是相当于添加上服务实例信息String handler = buildHandle(validUriList, selectorDO);if (handler != null) {selectorDO.setHandle(handler);SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));selectorData.setHandle(handler);// 更新数据库selectorService.updateSelective(selectorDO);// 发布选择器变更事件eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));}return ShenyuResultMessage.SUCCESS;
}

总结就是admin拿到URI数据后,更新选择器中的handler信息,然后写入到数据库,最后发布事件。
更新的就是这里的信息:
在这里插入图片描述
至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor队列,再从中消费数据,根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler。

参考资料:
官方博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/222116.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【系统架构】集群、分布式概念及系统架构演进过程

集群、分布式概念&#xff1a; 对食物没有太高要求的人在肚子饿的时候一般都会选择去兰州拉面、沙县小吃等小饭馆&#xff0c;这类小饭馆有个很显著的特点&#xff1a;洗菜、切菜、炒菜都是同一个人完成&#xff0c;如果厨子不舒服可能饭馆还会歇业。而一些人流量较大的饭馆的分…

Axure的动态面板

目录 动态面板 什么是Auxre动态模板 动态模板的步骤 应用场景 实战案例 轮播图 多功能登录界面 主界面左侧菜单栏 动态面板 什么是Auxre动态模板 动态面板是Axure中的一个重要功能&#xff0c;它允许用户创建可交互的页面&#xff0c;并模拟用户与页面的交互。通过添加元素…

智能守护,数据安全稳中求胜!上海迅软DSE助力家具家电行业引领潮流!

随着中国经济的蓬勃发展&#xff0c;家具家电企业正迎来“精品制造”的时代&#xff0c;业内竞争日益激烈。为了提升产品竞争力、扩大市场占有率&#xff0c;企业亟需加强对自主品牌的安全建设&#xff0c;确保品牌的自主知识产权、产品生产资料以及销售信息等核心数据不受泄漏…

文本处理工具doctran(集成LLM和NLP库)

今天给大家推荐一款文本处理工具&#xff0c;可以Extract、Redact、Summarize、Refine、Translate、Interrogate&#xff0c;这个工具就是文档转换框架doctran。 欢迎关注公众号 doctran基于OpenAI的GPT模型和开源的NLP库来剖析文本数据。该工具也可以在LangChain框架document…

【MODBUS】Modbus是什么?

Modbus协议&#xff0c;从字面理解它包括Mod和Bus两部分&#xff0c;首先它是一种bus&#xff0c;即总线协议&#xff0c;和12C、SP|类似&#xff0c;总线就意味着有主机&#xff0c;有从机&#xff0c;这些设备在同一条总线上。 Modbus支持单主机&#xff0c;多个从机&#xf…

Python (八)网络编程

程序员的公众号&#xff1a;源1024&#xff0c;获取更多资料&#xff0c;无加密无套路&#xff01; 最近整理了一份大厂面试资料《史上最全大厂面试题》&#xff0c;Springboot、微服务、算法、数据结构、Zookeeper、Mybatis、Dubbo、linux、Kafka、Elasticsearch、数据库等等 …

最新CRMEB商城源码开源版v5.2.2版本+前端uniapp

CRMEB开源商城系统是一款全开源可商用的系统&#xff0c;前后端分离开发&#xff0c;全部100%开源&#xff0c;在小程序、公众号、H5、APP、PC端都能用&#xff0c;使用方便&#xff0c;二开方便&#xff01;安装使用也很简单&#xff01;使用文档、接口文档、数据字典、二开文…

边缘检测@获取labelme标注的json黑白图掩码mask

import cv2 as cv import numpy as np import json import os from PIL import Imagedef convertPolygonToMask(jsonfilePath):

探秘闭包:隐藏在函数背后的小秘密(上)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

LeetCode1143.最长公共子序列

这道题看完就大概知道要用动态规划&#xff0c;然后想想如何建立动态转移方程&#xff0c;就很简单了&#xff0c;我都感觉我不是想出来的&#xff0c;是根据直觉应该是这样的然后边想边写就出来&#xff0c;以下是我的代码&#xff1a; class Solution {public int longestCom…

【Java】线程池的创建

目录 ​编辑 一、什么是线程池 二、创建和使用 导入必要的包&#xff1a; 创建线程池&#xff1a; 提交任务给线程池执行&#xff1a; 自定义Runnable和Callable任务&#xff1a; 关闭线程池&#xff1a; 我的其他博客 一、什么是线程池 在Java中&#xff0c;线程池是…

使用qrcode实现微信二维码转网址链接

很多情况下&#xff0c;后端会返回一个微信二维码的地址&#xff1a; 但是我们并无法直接打开或者是展示该二维码&#xff0c;毕竟浏览器不认识weixin://xxx &#xff0c;只认识http://xxx 如果想要实现weixin://xxx 转化为http:// 的形式&#xff0c;可以使用第三方库&#x…

jmeter如何循环运行到csv文件最后一行后停止

1、首先在线程组中设置’循环次数‘–勾选永远 2、csv数据文件设置中设置&#xff1a; 遇到文件结束符再次循环?——改为&#xff1a;False 遇到文件结束符停止线程?——改为&#xff1a;True 3、再次运行就会根据文档的行数运行数据 &#xff08;如果需要在循环控制器中&…

回答一个同学的问题:在目前深度学习爆火的年代,专家系统还有用吗,会被淘汰吗?

文章目录 我的看法如下&#xff1a;&#xff08;不会被淘汰&#xff0c;会逐渐进化&#xff09;总结 我的看法如下&#xff1a;&#xff08;不会被淘汰&#xff0c;会逐渐进化&#xff09; 专家系统和深度学习有其各自的优势。专家系统利用规则和知识库来给出结论,适用于问题范…

深度解析:探索「两数之和」问题的多种算法解决方案

今天要讨论的是「两数之和」问题&#xff0c;并将从哈希表解法到排序数组与双指针法、再到一遍哈希表解法的不同解决方案进行详细探讨 哈希表解法&#xff1a; 第一&#xff0c;使用了一种简单而有效的方法——哈希表。我们创建了一个 HashMap&#xff0c;用于存储已遍历过的元…

Android : SensorManager 传感器入门 简单应用

功能介绍&#xff1a;转动手机 图片跟着旋转 界面&#xff1a; activity_main.xml <?xml version"1.0" encoding"utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xmlns:android"http://schemas.android.com/apk/res/andr…

C4D云渲染怎么提升速度小技巧?C4D云渲染速度提升技巧

当许多C4D用户转向云渲染时&#xff0c;他们常常会发现渲染速度并没有预期中的提升&#xff0c;这让人产生疑问&#xff0c;为什么使用云渲染服务后&#xff0c;渲染时间依然没有显著缩短&#xff0c;C4D云渲染情况取决于多个因素&#xff0c;如&#xff1a;渲染任务特点以及所…

MYSQL练题笔记-子查询-换座位

一、题目相关内容 1&#xff09;相关的表和题目 2&#xff09;帮助理解题目的示例&#xff0c;提供返回结果的格式 二、自己初步的理解 没啥思路&#xff0c;我还没做过交换的这种题&#xff0c;所以我觉得这类交换的题以后值得做一个合集&#xff0c;是有点灵活度在里面的&a…

C++智能指针介绍

引言 为了充分利用RAII思想&#xff0c;C 11开始引入了智能指针&#xff0c;本文介绍RAII以及三种智能指针&#xff1a; std::unique_ptrstd::shared_ptrstd::weak_ptr 除此之外&#xff0c;本文还会介绍智能指针的常用创建方法&#xff1a; std::make_uniquestd::make_sha…

一键提取微信聊天记录,生成HTML、Word文档永久保存,还能生成微信年度聊天报告

不知道生活中你有没有遇到过这种情况&#xff0c;聊天记录不完整&#xff0c;有的在手机上&#xff0c;有的在电脑上&#xff0c;搜索起来很烦。那有没有一种办法可以把微信聊天记录统一呢&#xff1f;当然是有的。下面&#xff0c;就让我们一起来看一下怎么操作。 先看效果 操…