- 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
- 📕系列专栏:Spring源码、JUC源码、Kafka原理、分布式技术原理
- 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
- 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
- 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀
文章目录
- Dubbo服务的注册流程
- 服务发布步骤
- 思考
- Dubbo源码分析
- Dubbo注解的解析流程
- DubboComponetScan
- ServiceAnnotationBeanPostProcessor
- 注册一个DubboBootstrapApplicationListener
- registerServiceBeans
- registerServiceBean
- buildServiceBeanDefinition
- ServiceBean的初始化阶段
- DubboBootstrapApplicationListener
- start()
- initialize()
- exportServices
- export
- exported
- doExport
- doExportUrls
- doExportUrlsFor1Protocol
- RegistryProtocol
- 服务发布流程
- doLocalExport
- protocol.export
- openServer
- Exchangers.bind
- headerExchanger.bind
- getTransporter
- NettyTransporter.bind
- NettyServer
- doOpen
- 服务注册流程
- RegistryProtocol
- getRegistry
- RegistryFactory$Adaptive
- RegistryFactoryWrapper
- RegistryProtocol.register
- ListenerRegistryWrapper.register
- ZookeeperRegistry
- ZookeeperRegistry.doRegister
- Invoker是什么?
- ProxyFacotory.getInvoker
- ProxyFactory
- ProxyFactory$Adaptive
- JavassistProxyFactory.getInvoker
- javassist生成的动态代理代码
Dubbo服务的注册流程
服务发布步骤
- 注解
@DubboService(loadbalance = "random",cluster = "failover",retries = 2)
- 注解扫描
@DubboComponentScan
思考
首先需要扫描注解,在扫描的过程中,可以拿到注解中的数据(注解的目的其实是做一个标记的功能,可以通过不同的标记去对一些类做一个分类,具体扫描哪一个注解取决于对其的关注度),然后解析注解获取对应的配置。
解析完成后,需要去发布服务,像Dubbo是基于URL驱动的,其会将所有的配置信息配置在URL的地址上,所以这一步主要做的就是URL的组装。
后面要做的事情就是将其注册到zookeeper上(相当于把服务端的相关配置信息和服务的地址信息都会保存到第三方的平台上)到了第三方平台之后,如果我的客户端要去调用的时候,可以通过第三方平台知道服务端的调用机制是什么,这些都可以在URL上识别到。
接下来就是启动服务了,根据URL中配置的协议、配置的端口去发布对应的服务。
Dubbo源码分析
Dubbo发布其实有两种形式,其实从前面介绍的博客上来说
分为 xml形式 dubbo:service 和 注解形式 @DubboService/ @Service
Dubbo注解的解析流程
DubboComponetScan
@DubboComponentScan(basePackages = "com.gupaoedu.springboot.dubbo.springbootdubbosampleprovider.services")@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {
}// 这里面无非就是注册一个bean到 Spring IOC 里面
public class DubboComponentScanRegistrar implements ImportBeanDefinitionRegistrar {@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {// 这个就是获取我们在注解上 定义的 basePackagesSet<String> packagesToScan = getPackagesToScan(importingClassMetadata);registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);// @since 2.7.6 Register the common beansregisterCommonBeans(registry);}
---------------------------------------------------------------------------
private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) {// 在这里注册了 一个叫做 ServiceAnnotationBeanPostProcessor 的beanBeanDefinitionBuilder builder = rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);// 传递一个构造参数 packagesToScan 这里也就意味着服务的注册流程和 ServiceAnnotationBeanPostProcessor bean有关系builder.addConstructorArgValue(packagesToScan);builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();// 将这个bean注册BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);}
ServiceAnnotationBeanPostProcessor
public ServiceAnnotationBeanPostProcessor(Set<String> packagesToScan) {super(packagesToScan);
}// 进入super 因为实现了 BeanDefinitionRegistryPostProcessor,所以在bean装载完成之后,会触发postProcessBeanDefinitionRegistry 方法public class ServiceClassPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,ResourceLoaderAware, BeanClassLoaderAware {public ServiceClassPostProcessor(Set<String> packagesToScan) {this.packagesToScan = packagesToScan;}@Overridepublic void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {// 注册一个基础的beanregisterInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME, DubboBootstrapApplicationListener.class);Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);// 判断我们传过来需要扫描的路径是不是空的,如果不是空的会调用下面的方法if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {registerServiceBeans(resolvedPackagesToScan, registry);} else {if (logger.isWarnEnabled()) {logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");}}}
注册一个DubboBootstrapApplicationListener
这个bean会在spring 容器的上下文装载完成之后,触发监听
public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListenerimplements Ordered {/*** The bean name of {@link DubboBootstrapApplicationListener}** @since 2.7.6*/public static final String BEAN_NAME = "dubboBootstrapApplicationListener";private final DubboBootstrap dubboBootstrap;public DubboBootstrapApplicationListener() {this.dubboBootstrap = DubboBootstrap.getInstance();}@Overridepublic void onApplicationContextEvent(ApplicationContextEvent event) {// 上下文刷新的时候,也就是bean装载完成的时候if (event instanceof ContextRefreshedEvent) {onContextRefreshedEvent((ContextRefreshedEvent) event);} else if (event instanceof ContextClosedEvent) {onContextClosedEvent((ContextClosedEvent) event);}}
registerServiceBeans
private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {// 定义一个scanner 用作扫描DubboClassPathBeanDefinitionScanner scanner =new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);// 生成一个bean的名字BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);scanner.setBeanNameGenerator(beanNameGenerator);// 为了兼容老的版本,实际上就是把需要扫描的注解类型,设置到Scanner。// refactor @since 2.7.7serviceAnnotationTypes.forEach(annotationType -> {scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));});// 遍历给的包路径for (String packageToScan : packagesToScan) {// 扫描对应的路径// Registers @Service Bean firstscanner.scan(packageToScan);// Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.// 查找@Service的所有beandefinitionholder// 相当于扫描所有加了 @DubboService注解的类Set<BeanDefinitionHolder> beanDefinitionHolders =findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {registerServiceBean(beanDefinitionHolder, registry, scanner);}if (logger.isInfoEnabled()) {logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +beanDefinitionHolders +" } were scanned under package[" + packageToScan + "]");}} else {if (logger.isWarnEnabled()) {logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["+ packageToScan + "]");}}}}
registerServiceBean
该bean和服务有关的信息,实际上都在我们刚刚定义的@DubboService
@DubboService(loadbalance = "random",cluster = "failover",retries = 2)
- 服务以什么协议发布
- 服务的负载均衡策略
- 服务的容错策略
- 服务发布端口
- …
private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,DubboClassPathBeanDefinitionScanner scanner) {// 获取beanDefinitionHolder中的类对象Class<?> beanClass = resolveClass(beanDefinitionHolder); // 在beanClass中查找@Service注解的存在Annotation service = findServiceAnnotation(beanClass);/*** The {@link AnnotationAttributes} of @Service annotation*///获取@Service注解的属性信息,包括interfaceClass等AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);// 根据@Service注解的属性信息,解析服务接口类Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);// 使用@Service注解的属性信息,接口类和注解的服务Bean名称构建服务Bean定义String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();// 根据@Service注解的属性信息和接口类生成服务Bean的名称AbstractBeanDefinition serviceBeanDefinition =buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);// ServiceBean Bean name// 使用扫描器检查候选的Bean名称和服务Bean定义是否重复String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean// 如果候选Bean通过检查,将服务Bean定义注册到Bean定义注册表中// 通过 buildServiceBeanDefinition 得知这里面注册的就是ServiceBeanregistry.registerBeanDefinition(beanName, serviceBeanDefinition);// 据注册结果输出相应的日志信息,包括注册成功和重复注册的警告信息if (logger.isInfoEnabled()) {logger.info("The BeanDefinition[" + serviceBeanDefinition +"] of ServiceBean has been registered with name : " + beanName);}} else {if (logger.isWarnEnabled()) {logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +"] of ServiceBean[ bean name : " + beanName +"] was be found , Did @DubboComponentScan scan to same package in many times?");}}}
buildServiceBeanDefinition
private AbstractBeanDefinition buildServiceBeanDefinition(Annotation serviceAnnotation,AnnotationAttributes serviceAnnotationAttributes,Class<?> interfaceClass,String annotatedServiceBeanName) {// 通过大体阅读可以看到,其会将很多的配置信息构建到一个叫做 ServiceBean 里面 !!!BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();MutablePropertyValues propertyValues = beanDefinition.getPropertyValues();String[] ignoreAttributeNames = of("provider", "monitor", "application", "module", "registry", "protocol","interface", "interfaceName", "parameters");propertyValues.addPropertyValues(new AnnotationPropertyValuesAdapter(serviceAnnotation, environment, ignoreAttributeNames));// References "ref" property to annotated-@Service BeanaddPropertyReference(builder, "ref", annotatedServiceBeanName);// Set interfacebuilder.addPropertyValue("interface", interfaceClass.getName());// Convert parameters into mapbuilder.addPropertyValue("parameters", convertParameters(serviceAnnotationAttributes.getStringArray("parameters")));// Add methods parametersList<MethodConfig> methodConfigs = convertMethodConfigs(serviceAnnotationAttributes.get("methods"));if (!methodConfigs.isEmpty()) {builder.addPropertyValue("methods", methodConfigs);}/*** Add {@link org.apache.dubbo.config.ProviderConfig} Bean reference*/String providerConfigBeanName = serviceAnnotationAttributes.getString("provider");if (StringUtils.hasText(providerConfigBeanName)) {addPropertyReference(builder, "provider", providerConfigBeanName);}/*** Add {@link org.apache.dubbo.config.MonitorConfig} Bean reference*/String monitorConfigBeanName = serviceAnnotationAttributes.getString("monitor");if (StringUtils.hasText(monitorConfigBeanName)) {addPropertyReference(builder, "monitor", monitorConfigBeanName);}/*** Add {@link org.apache.dubbo.config.ApplicationConfig} Bean reference*/String applicationConfigBeanName = serviceAnnotationAttributes.getString("application");if (StringUtils.hasText(applicationConfigBeanName)) {addPropertyReference(builder, "application", applicationConfigBeanName);}/*** Add {@link org.apache.dubbo.config.ModuleConfig} Bean reference*/String moduleConfigBeanName = serviceAnnotationAttributes.getString("module");if (StringUtils.hasText(moduleConfigBeanName)) {addPropertyReference(builder, "module", moduleConfigBeanName);}/*** Add {@link org.apache.dubbo.config.RegistryConfig} Bean reference*/String[] registryConfigBeanNames = serviceAnnotationAttributes.getStringArray("registry");List<RuntimeBeanReference> registryRuntimeBeanReferences = toRuntimeBeanReferences(registryConfigBeanNames);if (!registryRuntimeBeanReferences.isEmpty()) {builder.addPropertyValue("registries", registryRuntimeBeanReferences);}/*** Add {@link org.apache.dubbo.config.ProtocolConfig} Bean reference*/String[] protocolConfigBeanNames = serviceAnnotationAttributes.getStringArray("protocol");List<RuntimeBeanReference> protocolRuntimeBeanReferences = toRuntimeBeanReferences(protocolConfigBeanNames);if (!protocolRuntimeBeanReferences.isEmpty()) {builder.addPropertyValue("protocols", protocolRuntimeBeanReferences);}return builder.getBeanDefinition();}
最终通过上述代码,讲一个 dubbo中提供的ServiceBean注入到Spring IOC容器
ServiceBean的初始化阶段
因为我们向spring注入了一个ServiceBean 那么在spring最后实例化阶段,即执行到 finishBeanFactoryInitialization 方法的时候就会调用到getBean方法从而通过反射去实例化,那么就会调用到ServiceBean 的构造方法。看看其构造函数
public ServiceBean() {super();this.service = null;}
当ServiceBean初始化完成之后,会调用下面的方法.
@Override
public void afterPropertiesSet() throws Exception {if (StringUtils.isEmpty(getPath())) {if (StringUtils.isNotEmpty(beanName)&& StringUtils.isNotEmpty(getInterface())&& beanName.startsWith(getInterface())) {setPath(beanName);}}
}
DubboBootstrapApplicationListener
在Dubbo中,DubboBootstrapApplicationListener是一个Spring应用程序监听器,它在Spring应用程序启动时会监听Dubbo的启动事件。
当启动 Dubbo服务时。
public class DubboBootstrapApplicationListener extends OnceApplicationContextEventListener implements Ordered {/*** The bean name of {@link DubboBootstrapApplicationListener}** @since 2.7.6*/public static final String BEAN_NAME = "dubboBootstrapApplicationListener";private final DubboBootstrap dubboBootstrap;public DubboBootstrapApplicationListener(ApplicationContext applicationContext) {super(applicationContext);this.dubboBootstrap = DubboBootstrap.getInstance();}@Overridepublic void onApplicationContextEvent(ApplicationContextEvent event) {if (event instanceof ContextRefreshedEvent) {onContextRefreshedEvent((ContextRefreshedEvent) event);} else if (event instanceof ContextClosedEvent) {onContextClosedEvent((ContextClosedEvent) event);}}private void onContextRefreshedEvent(ContextRefreshedEvent event) {dubboBootstrap.start();}private void onContextClosedEvent(ContextClosedEvent event) {dubboBootstrap.stop();}@Overridepublic int getOrder() {return LOWEST_PRECEDENCE;}
}// 监听的时候会进入到 onContextRefreshedEvent 里面
当开启start的时候可能会做的配置:
- 元数据/远程配置信息的初始化
- 拼接url()
- 如果是dubbo协议,则启动netty server
- 服务注册
start()
public DubboBootstrap start() {// 首先通过compareAndSet方法确保started标识为false,避免重复执行启动操作if (started.compareAndSet(false, true)) {ready.set(false);// 调用initialize方法进行初始化。initialize();if (logger.isInfoEnabled()) {logger.info(NAME + " is starting...");}// 1. export Dubbo Services// 导出Dubbo服务,即将服务暴露出去。exportServices();// Not only provider register// 如果不仅仅是注册提供者,并且已经导出了服务,那么还会导出MetadataService。// 如果需要,注册本地ServiceInstance。if (!isOnlyRegisterProvider() || hasExportedServices()) {// 2. export MetadataServiceexportMetadataService();//3. Register the local ServiceInstance if requiredregisterServiceInstance();}referServices();// 如果存在异步导出的服务,会启动一个新的线程来等待异步导出完成。if (asyncExportingFutures.size() > 0) {new Thread(() -> {try {this.awaitFinish();} catch (Exception e) {logger.warn(NAME + " exportAsync occurred an exception.");}// 最后设置ready标识为true,表示Dubbo框架已经准备就绪。ready.set(true);if (logger.isInfoEnabled()) {logger.info(NAME + " is ready.");}}).start();} else {ready.set(true);if (logger.isInfoEnabled()) {logger.info(NAME + " is ready.");}}if (logger.isInfoEnabled()) {logger.info(NAME + " has started.");}}return this;}
initialize()
public void initialize() {if (!initialized.compareAndSet(false, true)) {return;}// 初始化ApplicationModel:初始化应用模型,其中会维护很多配置信息。ApplicationModel.initFrameworkExts();// 启动配置中心,用于管理和获取配置信息。startConfigCenter();// 加载远程配置信息。loadRemoteConfigs();// 检查全局配置信息。checkGlobalConfigs();// @since 2.7.8// 在2.7.8版本之后的Dubbo中启动元数据中心。startMetadataCenter();// 初始化元数据服务。initMetadataService();// 初始化事件监听器。initEventListener();// 打印初始化完成的日志信息。if (logger.isInfoEnabled()) {logger.info(NAME + " has been initialized!");}}
exportServices
发布Dubbo服务
private void exportServices() {// 这里面有我们要发布服务的列表configManager.getServices().forEach(sc -> {// TODO, compatible with ServiceConfig.export()ServiceConfig serviceConfig = (ServiceConfig) sc;serviceConfig.setBootstrap(this);// 是否异步发布还是同步发布if (exportAsync) {// 使用线程池来异步发布ExecutorService executor = executorRepository.getServiceExporterExecutor();Future<?> future = executor.submit(() -> {sc.export();exportedServices.add(sc);});asyncExportingFutures.add(future);} else {sc.export();exportedServices.add(sc);}});
}
遍历所有dubbo服务,进行服务发布.
<dubbo:service beanName="ServiceBean:com.gupaoedu.springboot.dubbo.springbootdubbosampleprovider.services.IDemoService" />
<dubbo:service beanName="ServiceBean:com.gupaoedu.springboot.dubbo.ISayHelloService" />dubbo://ip:port?com.gupaoedu.springboot.dubbo.springbootdubbosampleprovider.services.IDemoService&xxx&xxx
dubbo://ip:port?com.gupaoedu.springboot.dubbo.ISayHelloService&xxx&xxx
一个dubbo服务需要发布几次,取决于协议的配置数,如果一个dubbo服务配置了3个协议,rest、webservice、dubbo。
这个时候实际上就会生成三个地址:
dubbo://
rest://
webservice://
export
public synchronized void export() {// 首先检查 是否为空,如果为空则获取 DubboBootstrap 的实例,并进行初始化。if (bootstrap == null) {bootstrap = DubboBootstrap.getInstance();bootstrap.initialize();}// 调用 方法,用于检查和更新子配置。checkAndUpdateSubConfigs();//init serviceMetadata// 初始化 ,设置服务的版本、分组、接口类型、接口名称和目标引用。serviceMetadata.setVersion(getVersion());serviceMetadata.setGroup(getGroup());serviceMetadata.setDefaultGroup(getGroup());serviceMetadata.setServiceType(getInterfaceClass());serviceMetadata.setServiceInterfaceName(getInterface());serviceMetadata.setTarget(getRef());// 如果不应该导出服务,则直接返回。if (!shouldExport()) {return;}// 如果应该延迟导出服务,则使用 延迟执行 方法,延迟时间由 方法返回,时间单位为毫秒。DELAY_EXPORT_EXECUTORdoExportgetDelay()/*为什么延迟发布呢?之前老的版本里面,考虑spring配置的装载和 dubbo服务启动配置之间,会有一个先后的关系,如果说spring的一些配置还没有加载,但是dubbo服务已经启动了,这个时候就会导致一定的问题,有的时候就是为了延迟几秒钟等到spring环境加载好了,再去启动dubbo。也是为了保障服务启动的安全性。*/if (shouldDelay()) {DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);} else {// 如果不需要延迟导出,则直接调用 方法导出服务。doExport();}// 最后调用 方法,表示服务已经导出。exported()exported();}
exported
public void exported() {List<URL> exportedURLs = this.getExportedUrls();exportedURLs.forEach(url -> {Map<String, String> parameters = getApplication().getParameters();ServiceNameMapping.getExtension(parameters != null ? parameters.get(MAPPING_KEY) : null).map(url);});// dispatch a ServiceConfigExportedEvent since 2.7.4// 发布完成以后 会发布一个事件,服务配置启动成功的事件dispatch(new ServiceConfigExportedEvent(this));}
doExport
protected synchronized void doExport() {if (unexported) {throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");}if (exported) {return;}exported = true;if (StringUtils.isEmpty(path)) {path = interfaceName;}doExportUrls();}
doExportUrls
去发布这个url,也就是基于url的驱动去进行服务的发布,也就到了最关键的阶段了。
private void doExportUrls() {// 构建一个 ServiceRepository,将一些服务的描述信息都存储到这里,后续如果要去用的话,就从这里面拿到ServiceRepository repository = ApplicationModel.getServiceRepository();ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());repository.registerProvider(getUniqueServiceName(),ref,serviceDescriptor,this,serviceMetadata);// 在这里先去拿到注册中心的url,之前前面提到过,一个服务可以配置多个注册中心,也可以配置多个协议List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);// 遍历所有的协议,如果有多个协议的话,就采用不同的协议去发布for (ProtocolConfig protocolConfig : protocols) {String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);// In case user specified path, register service one more time to map it to path.repository.registerService(pathKey, interfaceClass);// TODO, uncomment this line once service key is unifiedserviceMetadata.setServiceKey(pathKey);// 这里将对应的协议 和 多个注册中心传递过去doExportUrlsFor1Protocol(protocolConfig, registryURLs);}
}
doExportUrlsFor1Protocol
到了这里,也就是最核心的操作
- 生成url
- 根据url中配置的协议类型,调用指定协议进行服务的发布
- 启动服务
- 注册服务
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {// 如果我们配置协议,默认采用Dubbo协议发布String name = protocolConfig.getName();if (StringUtils.isEmpty(name)) {name = DUBBO;}//用来存储所有的配置信息/* dubbo:servicedubbo:methoddubbo:argument*/Map<String, String> map = new HashMap<String, String>();map.put(SIDE_KEY, PROVIDER_SIDE);ServiceConfig.appendRuntimeParameters(map);AbstractConfig.appendParameters(map, getMetrics());AbstractConfig.appendParameters(map, getApplication());AbstractConfig.appendParameters(map, getModule());// remove 'default.' prefix for configs from ProviderConfig// appendParameters(map, provider, Constants.DEFAULT_KEY);AbstractConfig.appendParameters(map, provider);AbstractConfig.appendParameters(map, protocolConfig);AbstractConfig.appendParameters(map, this);MetadataReportConfig metadataReportConfig = getMetadataReportConfig();if (metadataReportConfig != null && metadataReportConfig.isValid()) {map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);}if (CollectionUtils.isNotEmpty(getMethods())) {// 去遍历解析所有的methodsfor (MethodConfig method : getMethods()) {AbstractConfig.appendParameters(map, method, method.getName());String retryKey = method.getName() + ".retry";if (map.containsKey(retryKey)) {String retryValue = map.remove(retryKey);if ("false".equals(retryValue)) {map.put(method.getName() + ".retries", "0");}}List<ArgumentConfig> arguments = method.getArguments();if (CollectionUtils.isNotEmpty(arguments)) {// 然后再去遍历解析里面的 argumentsfor (ArgumentConfig argument : arguments) {// convert argument typeif (argument.getType() != null && argument.getType().length() > 0) {Method[] methods = interfaceClass.getMethods();// visit all methodsif (methods.length > 0) {for (int i = 0; i < methods.length; i++) {String methodName = methods[i].getName();// target the method, and get its signatureif (methodName.equals(method.getName())) {Class<?>[] argtypes = methods[i].getParameterTypes();// one callback in the methodif (argument.getIndex() != -1) {if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());} else {throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());}} else {// multiple callbacks in the methodfor (int j = 0; j < argtypes.length; j++) {Class<?> argclazz = argtypes[j];if (argclazz.getName().equals(argument.getType())) {AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);if (argument.getIndex() != -1 && argument.getIndex() != j) {throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());}}}}}}}} else if (argument.getIndex() != -1) {AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());} else {throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");}}}} // end of methods for}// 最终这个if结束完了之后,参数也就组装完成了。// 针对泛化添加的参数if (ProtocolUtils.isGeneric(generic)) {map.put(GENERIC_KEY, generic);map.put(METHODS_KEY, ANY_VALUE);} else {String revision = Version.getVersion(interfaceClass, version);if (revision != null && revision.length() > 0) {map.put(REVISION_KEY, revision);}String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();if (methods.length == 0) {logger.warn("No method found in service interface " + interfaceClass.getName());map.put(METHODS_KEY, ANY_VALUE);} else {map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));}}// 针对token添加的参数/*** Here the token value configured by the provider is used to assign the value to ServiceConfig#token*/if(ConfigUtils.isEmpty(token) && provider != null) {token = provider.getToken();}if (!ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {map.put(TOKEN_KEY, UUID.randomUUID().toString());} else {map.put(TOKEN_KEY, token);}}//init serviceMetadata attachmentsserviceMetadata.getAttachments().putAll(map);
在这之前,实际上都是参数的组装
这些数据就是service的元数据,也就是要将这些数据组装成元数据
// export service// 最终获取到 host的地址 以及 端口号String host = findConfigedHosts(protocolConfig, registryURLs, map);Integer port = findConfigedPorts(protocolConfig, name, map);// 然后就是组装url了URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
这也就是我们注册到服务中心的一个地址,但是还没有注册 和 发布
// You can customize Configurator to append extra parameters
// 这里就到了扩展点,如果有扩展的配置需要去装载的话,有的话,会根据扩展的配置去比对url,替换url里面的各种参数。// 当有了扩展点的知识后,现在看这里就清晰很多了
/*
首先,代码通过 ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) 获取 ConfiguratorFactory 的扩展加载器。然后,它检查是否有针对指定 URL 协议的 ConfiguratorFactory 扩展。这里的 url.getProtocol() 可能是获取 URL 的协议部分,比如 "http"、"https" 等。如果存在针对该协议的 ConfiguratorFactory 扩展,代码会调用它的 getConfigurator 方法来获取一个 Configurator 实例,并且使用该实例来对原始的 URL 进行配置,得到新的 URL。
*/f (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);}// 根据其范围来发布String scope = url.getParameter(SCOPE_KEY);// 如果scope != nullif (!SCOPE_NONE.equalsIgnoreCase(scope)) {// 如果其不等于远程,就发送一个本地if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {exportLocal(url);-----------------------------------------------------------------------------/*
这段代码也是一个 Java 代码片段,看起来是在使用 Dubbo 框架进行服务导出(export)的过程。让我来解释一下:首先,代码通过 URLBuilder.from(url) 创建了一个新的 URLBuilder 对象,并将传入的 url 作为初始 URL。接着,它使用 URLBuilder 的 setProtocol、setHost、setPort 方法分别设置了协议(LOCAL_PROTOCOL)、主机(LOCALHOST_VALUE)和端口(0)。然后,通过调用 build() 方法,将 URLBuilder 对象构建为一个新的 URL 对象,即 local。接下来,代码使用 PROXY_FACTORY.getInvoker 方法,根据 ref、interfaceClass 和 local 创建了一个 Invoker 对象。 PROXY_FACTORY 可能是一个代理工厂,用于创建服务的代理对象。然后,通过 PROTOCOL.export 方法将该 Invoker 导出为一个 Exporter 对象。 PROTOCOL 可能是 Dubbo 的协议实现,用于处理服务的导出和通信。最后,代码将导出的 Exporter 对象添加到 exporters 集合中,并打印日志信息,记录导出的服务接口类名和本地注册的 URL。简而言之,这段代码的作用是将一个服务根据指定的 URL 导出到本地注册中心,以供本地消费者调用。它会构建一个新的本地 URL,创建 Invoker 对象,并使用 Dubbo 的协议实现导出服务,最后记录日志并将 Exporter 对象添加到 exporters 集合中。*/private void exportLocal(URL url) {URL local = URLBuilder.from(url).setProtocol(LOCAL_PROTOCOL).setHost(LOCALHOST_VALUE).setPort(0).build();Exporter<?> exporter = PROTOCOL.export(PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);}
-----------------------------------------------------------------------------}// 发布远程服务if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {if (CollectionUtils.isNotEmpty(registryURLs)) {// 判断发布的服务要注册到那几个注册中心// 循环遍历配置的注册中心的列表for (URL registryURL : registryURLs) {//if protocol is only injvm ,not registerif (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {continue;}// 添加参数url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);if (monitorUrl != null) {url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());}if (logger.isInfoEnabled()) {if (url.getParameter(REGISTER_KEY, true)) {logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);} else {logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);}}// For providers, this is used to enable custom proxy to generate invokerString proxy = url.getParameter(PROXY_KEY);if (StringUtils.isNotEmpty(proxy)) {// registry://ip:portregistryURL = registryURL.addParameter(PROXY_KEY, proxy);}
Invoker ,调用器. 服务提供者、服务的消费者。
// 生成一个InvokerInvoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);// 将这个发布出去Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);exporters.add(exporter);}} else {if (logger.isInfoEnabled()) {logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);}Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);exporters.add(exporter);}/*** @since 2.7.0* ServiceData Store*/WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));if (metadataService != null) {metadataService.publishServiceDefinition(url);}}}this.urls.add(url);
}
-------------------------------------------------------------------------------
Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
// 其得到的是一个自适应的扩展点,会动态生成Protocol$Adaptive ,然后会调用这个实例里面的export方法
// 理论上应该返回 extension = RegistryProtocol
// 实际返回的是这个QosProtocolWrapper(ProtocolFilterWrapper(ProtocolListenerWrapper(RegistryProtocol))
Wrapper包装
在ExtensionLoader.loadClass这个方法中,有一段这样的判断,如果当前这个类是一个wrapper包装类,也就是这个wrapper中有构造方法,参数是当前被加载的扩展点的类型,则把这个wrapper类加入到cacheWrapperClass缓存中。
else if (isWrapperClass(clazz)) {cacheWrapperClass(clazz);
}
private boolean isWrapperClass(Class<?> clazz) {try {clazz.getConstructor(type);return true;} catch (NoSuchMethodException e) {return false;}
}
//上面的判断是说,只要针对当前扩展点的类,如果存在一个构造方法,参数是当前需要加载的扩展点的对
象,那么就会进行包装
public ProtocolListenerWrapper(Protocol protocol) {if (protocol == null) {throw new IllegalArgumentException("protocol == null");}this.protocol = protocol;
}
我们可以在dubbo的配置文件中找到三个Wrapper org.apache.dubbo.rpc.Protocol 。
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
ProtocolListenerWrapper, 用于服务export时候插入监听机制
QosprotocolWrapper, 如果当前配置了注册中心,则会启动一个Qos server.qos是dubbo的在线运维命令,dubbo2.5.8新版本重构了telnet模块,提供了新的telnet命令支持,新版本的telnet端口与dubbo协议的端口是不同的端口,默认为22222
ProtocolFilterWrapper,对invoker进行filter的包装,实现请求的过滤
接着,在getExtension->createExtension方法中,会对cacheWrapperClass集合进行判断,如果集合不为空,则进行包装
Set<Class<?>> wrapperClasses = cachedWrapperClasses;if (CollectionUtils.isNotEmpty(wrapperClasses)) {for (Class<?> wrapperClass : wrapperClasses) {instance = injectExtension((T)wrapperClass.getConstructor(type).newInstance(instance));}}
这三个扩展点在注册场景中都不会生效,执行的逻辑中会先判断当前是否是注册协议,如果是则直接基
于协议去发布服务
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}return protocol.export(buildInvokerChain(invoker,Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol{public void destroy() {throw new UnsupportedOperationException("The method public abstractvoid org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");}public int getDefaultPort() {throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");}public Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");if (arg0.getUrl() == null) throw new
IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() ==
null");URL url = arg0.getUrl();String extName = ( url.getProtocol() == null ? "dubbo" :url.getProtocol() );if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");Protocol extension =
(org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dub
bo.rpc.Protocol.class).getExtension(extName);// 实际上调用的是动态生成的适配类中的export();return extension.export(arg0);
}public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0,
org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;String extName = ( url.getProtocol() == null ? "dubbo" :url.getProtocol() );if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");org.apache.dubbo.rpc.Protocol extension =(org.apache.dubbo.rpc.Protocol)
ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);return extension.refer(arg0, arg1);
}public java.util.List getServers() {throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");}
}-------------------------------------------------------------------------------
RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {URL registryUrl = getRegistryUrl(originInvoker);// url to export locallyURL providerUrl = getProviderUrl(originInvoker);// Subscribe the override data// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call// the same service. Because the subscribed is cached key with the name of the service, it causes the// subscription information to cover.final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);//export invoker// 到这里才是真正意义上启动一个Netty Server,发布Dubbo协议的服务// dubbo还是基于url驱动,所有每次执行都会去改变urlfinal ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);// url to registryfinal Registry registry = getRegistry(originInvoker);final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);// decide if we need to delay publishboolean register = providerUrl.getParameter(REGISTER_KEY, true);if (register) {// 然后到这里才注册了服务register(registryUrl, registeredProviderUrl);}// register stated url on provider modelregisterStatedUrl(registryUrl, registeredProviderUrl, register);exporter.setRegisterUrl(registeredProviderUrl);exporter.setSubscribeUrl(overrideSubscribeUrl);// Deprecated! Subscribe to override rules in 2.6.x or before.registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);notifyExport(exporter);//Ensure that a new exporter instance is returned every time exportreturn new DestroyableExporter<>(exporter);}
服务发布流程
doLocalExport
服务启动过程
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {String key = getCacheKey(originInvoker);return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);});}
protocol.export
protocol是通过依赖注入来初始化的一个协议扩展点,并且我们可以看到这个protocol.export()方法上
增加了@Adaptive注解,表示它是一个动态适配的扩展点,意味着最终的执行链路应该是
ProtocolListenerWrapper ->QosProtocolWrapper ->ProtocolFilterWrapper-DubboProtocol
所以这里又回到了自适应扩展
如果 ProviderUrl: dubbo:// 是这样,那么就会选择 DubboProtocol
所以最终会去调用 DubboProtocol.export()
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {URL url = invoker.getUrl();// export service.String key = serviceKey(url);DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);// 把服务对应的invoker 存储,将来调用的时候,从map中拿到即可exporterMap.put(key, exporter);//export an stub service for dispatching eventBoolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackservice) {String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {if (logger.isWarnEnabled()) {logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +"], has set stubproxy support event ,but no stub methods founded."));}}}// 开启一个服务openServer(url);// 优化序列化optimizeSerialization(url);return exporter;}
openServer
往下看这个过程,进入到openServer(),从名字来看它是用来开启一个服务。
去开启一个服务,并且放入到缓存中->在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。
private void openServer(URL url) {// 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例String key = url.getAddress();//client 也可以暴露一个只有server可以调用的服务boolean isServer = url.getParameter(IS_SERVER_KEY, true);if (isServer) {//是否在serverMap中缓存了ProtocolServer server = serverMap.get(key);if (server == null) {synchronized (this) {server = serverMap.get(key);if (server == null) {// 创建服务器实例serverMap.put(key, createServer(url));}}} else {// 服务器已创建,则根据 url 中的配置重置服务器server.reset(url);}}}
createServer
创建服务.
在很多地方,这个地址一直伴随着dubbo的启动、消费、以及整个生命周期中。
private ProtocolServer createServer(URL url) {//组装url,在url中添加心跳时间、编解码参数url = URLBuilder.from(url)// 当服务关闭以后,发送一个只读的事件,默认是开启状态.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())// 启动心跳配置.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)).addParameter(CODEC_KEY, DubboCodec.NAME).build();String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);//通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported server type: " + str + ", url: " + url);}//创建ExchangeServer.ExchangeServer server;try {server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}str = url.getParameter(CLIENT_KEY);if (str != null && str.length() > 0) {Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type: " + str);}}return new DubboProtocolServer(server);}
Exchangers.bind
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}//获取 Exchanger,默认为 HeaderExchanger。//调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");return getExchanger(url).bind(url, handler);}
headerExchanger.bind
这里面包含多个逻辑:
- new DecodeHandler(new HeaderExchangeHandler(handler))
- Transporters.bind
- new HeaderExchangeServer
这段代码是一个 Java 方法,其作用是将一个 ExchangeHandler 对象绑定到某个 URL 上,以创建一个 ExchangeServer 对象并返回。ExchangeServer 是一个 Dubbo 框架中的概念,表示一个 Dubbo 服务提供方所使用的网络通信服务器。
具体来说,这个方法接收两个参数:
- URL url:表示要绑定到的目标 URL。这个 URL 包含了一些必要的信息,比如主机名、端口号、协议类型等。
- ExchangeHandler handler:表示要绑定的 ExchangeHandler 对象,它是 Dubbo 框架中的核心组件之一,负责处理网络通信协议的编解码、消息的序列化和反序列化等工作。
在实现中,这个方法首先通过 Transporters.bind() 方法创建一个网络服务器,该方法会根据传入的 URL 中指定的协议类型(比如 TCP、HTTP 等),创建对应的网络服务器。然后,将这个新创建的服务器对象传递给一个 DecodeHandler 对象进行进一步的处理。DecodeHandler 是 Dubbo 框架中的一个组件,用于对网络数据进行解码和转换。最后,将这个 DecodeHandler 对象封装在一个 HeaderExchangeHandler 对象中,完成 Dubbo 协议头的添加和解析工作。HeaderExchangeHandler 是 Dubbo 框架中的一个核心组件,负责处理 Dubbo 协议头的生成和解析。
最终,这个方法会返回一个新的 HeaderExchangeServer 对象,该对象封装了之前创建的网络服务器以及协议头解析器等组件。这个 HeaderExchangeServer 对象可以被用来启动 Dubbo 服务提供方的网络通信服务。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handlers == null || handlers.length == 0) {throw new IllegalArgumentException("handlers == null");}ChannelHandler handler;if (handlers.length == 1) {handler = handlers[0];} else {handler = new ChannelHandlerDispatcher(handlers);}return getTransporter().bind(url, handler);}
getTransporter
getTransporter是一个自适应扩展点,它针对bind方法添加了自适应注解,意味着,bing方法的具体实现,会基于Transporter$Adaptive方法进行适配,那么在这里面默认的通信协议是netty,所以它会采用netty4的实现,也就是 org.apache.dubbo.remoting.transport.netty4.NettyTransporter。
public static Transporter getTransporter() { return
ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();}
NettyTransporter.bind
创建一个nettyserver
public Server bind(URL url, ChannelHandler listener) throws RemotingException {return new NettyServer(url, listener);
}
NettyServer
初始化一个nettyserver,并且从url中获得相应的ip/ port。然后调用 doOpen();
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));}public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);localAddress = getUrl().toInetSocketAddress();// 获取 ip 和端口String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {bindIp = ANYHOST_VALUE;}bindAddress = new InetSocketAddress(bindIp, bindPort);this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);try {doOpen(); // 调用模板方法 doOpen 启动服务器if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());}} catch (Throwable t) {throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);}executor = executorRepository.createExecutorIfAbsent(url);}
doOpen
开启netty服务
总体来说,这段代码的主要功能是使用 Netty 框架创建一个服务器,并进行一系列的初始化设置,包括线程池的创建、网络通道的初始化、服务器选项的设置以及管道工厂的创建等。最终将服务器绑定到指定地址上,准备接受客户端的连接请求。
// doOpen() 方法是一个受保护的方法,用于在子类中被调用以进行服务器的打开操作。
protected void doOpen() throws Throwable {NettyHelper.setNettyLoggerFactory(); // 这行代码用于设置 Netty 框架的日志工厂,以便配置日志记录器。ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); // 创建一个用于处理接受连接的线程池。ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); // 创建一个用于处理网络 IO 事件的线程池。// 使用 NIO 方式创建 ServerSocketChannel 工厂,其中包含了 boss 线程池和 worker 线程池。ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));// 使用上一步创建的 ChannelFactory 初始化 ServerBootstrap 对象。bootstrap = new ServerBootstrap(channelFactory);// NettyHandler 负责处理 Netty 的网络事件,这里创建了一个 NettyHandler 对象,并获取其通道列表。final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);channels = nettyHandler.getChannels();/*设置服务器选项:设置 child.tcpNoDelay 为 true,表示禁用 Nagle 算法,即数据立即发送。
设置 backlog,指定了未完成连接队列的最大长度。*/bootstrap.setOption("child.tcpNoDelay", true);bootstrap.setOption("backlog", getUrl().getPositiveParameter(BACKLOG_KEY, Constants.DEFAULT_BACKLOG));bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ChannelPipeline pipeline = Channels.pipeline();pipeline.addLast("decoder", adapter.getDecoder());pipeline.addLast("encoder", adapter.getEncoder());pipeline.addLast("handler", nettyHandler);return pipeline;}});// bind// 使用 ServerBootstrap 绑定到指定的地址,并返回一个 ChannelFuture 对象。channel = bootstrap.bind(getBindAddress());}
然后需要要注意的是,它这里用到了一个handler来处理客户端传递过来的请求:
nettyServerHandler
NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
这个handler是一个链路,它的正确组成应该是
MultiMessageHandler(heartbeatHandler(AllChannelHandler(DecodeHandler(HeaderExchangeHeadler(DubboProtocol))))
后续接收到的请求,会一层一层的处理。比较繁琐
服务注册流程
RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {URL registryUrl = getRegistryUrl(originInvoker);// url to export locallyURL providerUrl = getProviderUrl(originInvoker);......// 到这里才是真正意义上启动一个Netty Server,发布Dubbo协议的服务// dubbo还是基于url驱动,所有每次执行都会去改变urlfinal ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);......if (register) {// 然后到这里才注册了服务register(registryUrl, registeredProviderUrl);}......}
了解了服务的发布之后,我们继续来看一下服务是如何发起注册的。
服务注册实际上就是把dubbo的协议url地址保存到第三方注册中心上。
private void register(URL registryUrl, URL registeredProviderUrl) {Registry registry = registryFactory.getRegistry(registryUrl);registry.register(registeredProviderUrl);}
getRegistry
- 把url转化为对应配置的注册中心的具体协议
- 根据具体协议,从registryFactory中获得指定的注册中心实现
那么这个registryFactory具体是怎么赋值的呢?
private Registry getRegistry(final Invoker<?> originInvoker) {//把url转化为配置的具体协议,比如zookeeper://ip:port. 这样后续获得的注册中心就会是基于zk的实现URL registryUrl = getRegistryUrl(originInvoker);return registryFactory.getRegistry(registryUrl);
}
在RegistryProtocol中存在一段这样的代码,很明显这是通过依赖注入来实现的扩展点。
private RegistryFactory registryFactory;
public void setRegistryFactory(RegistryFactory registryFactory) {this.registryFactory = registryFactory;
}
按照扩展点的加载规则,我们可以先看看/META-INF/dubbo/internal路径下找到RegistryFactory的配置文件.这个factory有多个扩展点的实现。
dubbo=org.apache.dubbo.registry.dubbo.DubboRegistryFactory
multicast=org.apache.dubbo.registry.multicast.MulticastRegistryFactory
zookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory
redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
consul=org.apache.dubbo.registry.consul.ConsulRegistryFactoryetcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory
接着,找到RegistryFactory的实现, 发现它里面有一个自适应的方法,根据url中protocol传入的值进行适配
@SPI("dubbo")
public interface RegistryFactory {@Adaptive({"protocol"})Registry getRegistry(URL url);
RegistryFactory$Adaptive
由于在前面的代码中,url中的protocol已经改成了zookeeper,那么这个时候根据zookeeper获得的spi扩展点应该是RegistryFactoryWrapper
import org.apache.dubbo.common.extension.ExtensionLoader;public class RegistryFactory$Adaptive implements
org.apache.dubbo.registry.RegistryFactory {public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0) {if (arg0 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg0;String extName = ( url.getProtocol() == null ? "dubbo" :
url.getProtocol() );if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (" + url.toString() + ") use keys([protocol])");org.apache.dubbo.registry.RegistryFactory extension =
(org.apache.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);return extension.getRegistry(arg0);}
}
RegistryFactoryWrapper
而registryFactory.getRegistry(url)中,由于此时的registryFactory已经是ZookeeperRegistryFactory,所以这里会得到一个zookeeperRegistry。
public class RegistryFactoryWrapper implements RegistryFactory {private RegistryFactory registryFactory;public RegistryFactoryWrapper(RegistryFactory registryFactory) {this.registryFactory = registryFactory;}@Overridepublic Registry getRegistry(URL url) {return new ListenerRegistryWrapper(registryFactory.getRegistry(url),Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class).getActivateExtension(url, "registry.listeners")));}
}
因此最终返回的Registry=ListenerRegistryWrapper。
下面这段代码的含义是:
- 获得注册的服务提供者地址
- 调用register发起注册
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);if (register) {register(registryUrl, registeredProviderUrl);}
RegistryProtocol.register
发起注册流程,registry对象的实例是=ListenerRegistryWrapper。所以调用这个对象的register方法。
private void register(URL registryUrl, URL registeredProviderUrl) {Registry registry = registryFactory.getRegistry(registryUrl);registry.register(registeredProviderUrl);
}
ListenerRegistryWrapper.register
这里做包装的目的,其实应该就是增加了一个监听器的处理过程。
@Overridepublic void register(URL url) {try {registry.register(url);} finally {if (CollectionUtils.isNotEmpty(listeners)) {RuntimeException exception = null;for (RegistryServiceListener listener : listeners) {if (listener != null) {try {listener.onRegister(url);} catch (RuntimeException t) {logger.error(t.getMessage(), t);exception = t;}}}if (exception != null) {throw exception;}}}}
ZookeeperRegistry
这个方法中并没有register方法,而ZookeeperRegsitry继承了FailbackRegistry,所以直接进入到FailbackRegistry这个类
- FailbackRegistry,从名字上来看,是一个失败重试机制
- 调用父类的register方法,讲当前url添加到缓存集合中
public void register(URL url) {if (!acceptable(url)) {logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");return;}super.register(url);removeFailedRegistered(url);removeFailedUnregistered(url);try {// Sending a registration request to the server sidedoRegister(url);} catch (Exception e) {Throwable t = e;// If the startup detection is opened, the Exception is thrown directly.boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)&& url.getParameter(Constants.CHECK_KEY, true)&& !CONSUMER_PROTOCOL.equals(url.getProtocol());boolean skipFailback = t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {t = t.getCause();}throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);} else {logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);}// Record a failed registration request to a failed list, retry regularlyaddFailedRegistered(url);}}
ZookeeperRegistry.doRegister
通过curator客户端,把服务地址写入到注册中心。
@Override
public void doRegister(URL url) {try {zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));} catch (Throwable e) {throw new RpcException("Failed to register " + url + " to zookeeper " +getUrl() + ", cause: " + e.getMessage(), e);}
}
Invoker是什么?
Invoker翻译成中文是调用器,它在Dubbo中其实是一个比较重要的领域对象,最核心的是在服务的发布和调用中,都是以Invoker的形态存在。
在刚刚的服务发布过程中,整体分为三个阶段
- 第一个阶段会创造一个invoker
- 第二个阶段会把经历过一系列处理的invoker(各种包装),在DubboProtocol中保存到exporterMap中
- 第三个阶段把dubbo协议的url地址注册到注册中心上
而Invoker的作用就是收到客户端请求的时候,根据接口的全路径作为key,找到实例方法,然后通过反射去调用。
前面没有分析Invoker,我们来简单看看Invoker到底是一个啥东西。
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
ProxyFacotory.getInvoker
这个是一个代理工程,用来生成invoker,从它的定义来看,它是一个自适应扩展点,看到这样的扩展点,我们几乎可以不假思索的想到它会存在一个动态适配器类
ProxyFactory proxyFactory =
ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
ProxyFactory
这个方法的简单解读为: 它是一个spi扩展点,并且默认的扩展实现是javassit, 这个接口中有三个方法,并且都是加了@Adaptive的自适应扩展点。所以如果调用getInvoker方法,应该会返回一个ProxyFactory$Adaptive
@SPI("javassist")
public interface ProxyFactory {@Adaptive({Constants.PROXY_KEY})<T> T getProxy(Invoker<T> invoker) throws RpcException;@Adaptive({Constants.PROXY_KEY})<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;@Adaptive({Constants.PROXY_KEY})<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
ProxyFactory$Adaptive
这个自适应扩展点,做了两件事情
- 通过ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName)获取了一个指定名称的扩展点。
- 在dubbo-rpc-api/resources/META-INF/com.alibaba.dubbo.rpc.ProxyFactory中,定义了javassis=JavassisProxyFactory
- 调用JavassisProxyFactory的getInvoker方法
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory
{public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws
org.apache.dubbo.rpc.RpcException {if (arg0 == null) throw new
IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");if (arg0.getUrl() == null) throw new
IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() ==null");org.apache.dubbo.common.URL url = arg0.getUrl();String extName = url.getParameter("proxy", "javassist");if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString()+ ") use keys([proxy])");org.apache.dubbo.rpc.ProxyFactory extension =
(org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache
.dubbo.rpc.ProxyFactory.class).getExtension(extName);return extension.getProxy(arg0);}public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean
arg1) throws org.apache.dubbo.rpc.RpcException {if (arg0 == null) throw new
IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");if (arg0.getUrl() == null) throw new
IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() ==null");org.apache.dubbo.common.URL url = arg0.getUrl();String extName = url.getParameter("proxy", "javassist");if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString()+ ") use keys([proxy])");org.apache.dubbo.rpc.ProxyFactory extension =
(org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache
.dubbo.rpc.ProxyFactory.class).getExtension(extName);return extension.getProxy(arg0, arg1);
}public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0,
java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws
org.apache.dubbo.rpc.RpcException {if (arg2 == null) throw new IllegalArgumentException("url == null");org.apache.dubbo.common.URL url = arg2;String extName = url.getParameter("proxy", "javassist");if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString()+ ") use keys([proxy])");org.apache.dubbo.rpc.ProxyFactory extension =
(org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache
.dubbo.rpc.ProxyFactory.class).getExtension(extName);return extension.getInvoker(arg0, arg1, arg2);}
}
JavassistProxyFactory.getInvoker
javassist是一个动态类库,用来实现动态代理的。
proxy:接口的实现: com.gupaoedu.practice.dubbo.SayHelloServiceImpl
type:接口全称 com.gupaoedu.dubbo.ISayHelloService
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes,arguments);}};
}
javassist生成的动态代理代码
通过断点的方式,在Wrapper.getWrapper中的makeWrapper,会创建一个动态代理,核心的方法invokeMethod代码如下
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws
java.lang.reflect.InvocationTargetException {com.gupaoedu.dubbo.practice.ISayHelloService w;try {w = ((com.gupaoedu.dubbo.practice.ISayHelloService) $1);} catch (Throwable e) {throw new IllegalArgumentException(e);}try {if ("sayHello".equals($2) && $3.length == 1) {return ($w) w.sayHello((java.lang.String) $4[0]);}} catch (Throwable e) {throw new java.lang.reflect.InvocationTargetException(e);}throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not
found method \"" + $2 + "\" in class com.gupaoedu.dubbo.practice.ISayHelloService.");
}
构建好了代理类之后,返回一个AbstractproxyInvoker,并且它实现了doInvoke方法,这个地方似乎看到了dubbo消费者调用过来的时候触发的影子,因为wrapper.invokeMethod本质上就是触发上面动态代理类的方法invokeMethod.
return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes,arguments);}
};
所以,简单总结一下Invoke本质上应该是一个代理,经过层层包装最终进行了发布。当消费者发起请求的时候,会获得这个invoker进行调用。
最终发布出去的invoker, 也不是单纯的一个代理,也是经过多层包装
InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))