基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。
此前我们学习了Dubbo的服务导出的源码,在DubboBootstrapApplicationListener#startSync方法中,在调用了exportServices方法进行服务导出之后,立即调用了referServices方法进行服务引用,所以说Dubbo3.1中,服务导出和服务引用的入口是相同的,都是DubboDeployApplicationListener这个监听器。
在spring容器启动的最后一个步也就是refresh方法内部最后的finishRefresh方法中,将会向所有监听器发布一个ContextRefreshedEvent事件,表示容器刷新完毕。DubboDeployApplicationListener会监听该事件,进而触发服务的导出和引用。
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
文章目录
- 1 referServices引用服务入口
- 2 SimpleReferenceCache#get获取缓存
- 3 ReferenceConfig#get服务引用
- 4 ReferenceConfig#init初始化服务引用
- 4.1 appendConfig获取服务引用参数
- 5 总结
1 referServices引用服务入口
该方法获取全部ReferenceConfigBase实例并遍历,判断是否应该初始化,判断init属性,默认true,继续判断是否异步引用,默认同步。最后通过引用缓存对象来进行服务引用,即referenceCache.get(rc)方法实现。
这个init属性在Dubbo2.x版本引入,该值用于判断是否在afterPropertiesSet()时饥饿初始化引用,否则等到有人注入或引用该实例时再初始化,默认false,即懒加载。但是在Dubbo3.1版本中,init仅被用于在shouldInit方法中,而且默认返回true,不再是默认懒加载,只有手动设置为false,才不会引入服务。因为此时dubbo服务的引入已不在ReferenceBean的 afterPropertiesSet方法中。
/*** DefaultModuleDeployer的方法* <p>* 服务引用*/
private void referServices() {//获取全部ReferenceConfigBase实例并遍历configManager.getReferences().forEach(rc -> {try {ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;//如果还没刷新该引用配置if (!referenceConfig.isRefreshed()) {//刷新配置,即Dubbo配置的重写(基于优先级的覆盖)//设个方法我们在此前Dubbo配置的加载部分已经讲过了referenceConfig.refresh();}//是否应该初始化,判断init属性,默认trueif (rc.shouldInit()) {//如果模块消费端开启异步调用,或者消费者开启异步调用,默认都是false,即同步调用if (referAsync || rc.shouldReferAsync()) {ExecutorService executor = executorRepository.getServiceReferExecutor();CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {referenceCache.get(rc);} catch (Throwable t) {logger.error("5-9", "", "", "Failed to async export service config: " + getIdentifier() + " , catch error : " + t.getMessage(), t);}}, executor);asyncReferringFutures.add(future);}//通常的逻辑,通过引用缓存对象来进行服务引用else {referenceCache.get(rc);}}} catch (Throwable t) {logger.error("5-15", "", "", "Model reference failed: " + getIdentifier() + " , catch error : " + t.getMessage(), t);referenceCache.destroy(rc);throw t;}});
}
2 SimpleReferenceCache#get获取缓存
SimpleReferenceCache是一个对于单例的ReferenceConfigBase的缓存对象。该方法中的大概逻辑为:
- 首先构建缓存key,规则为 {group}/{InterfaceName}:{version} ,随后获取引用的服务接口Class。
- 判断是否是单例服务,如果是那么首先尝试从缓存中获取已创建的服务代理实例对象proxy。
- 如果没有缓存,那么将ReferenceConfigBase存入referenceTypeMap和referenceKeyMap这两个缓存map中,随后调用ReferenceConfigBase自身的get方法进行服务引用,获取服务代理实例对象。
可以看到,虽然我们是调用的SimpleReferenceCache#get方法,但是其内部会判断如果没有进行服务引用,那么会引用服务,间接的达成了我们服务引用的目的,同时还构建了缓存。而服务引用的关键方法就是ReferenceConfigBase#get方法。
/*** SimpleReferenceCache的方法** @param rc 服务消费者引用服务配置* @return 服务代理实例对象*/
@Override
@SuppressWarnings("unchecked")
public <T> T get(ReferenceConfigBase<T> rc) {//获取缓存key,规则为 {group}/{InterfaceName}:{version} String key = generator.generateKey(rc);//获取引用的服务接口ClassClass<?> type = rc.getInterfaceClass();//是否是单例,默认trueboolean singleton = rc.getSingleton() == null || rc.getSingleton();T proxy = null;// Check existing proxy of the same 'key' and 'type' first.//如果是单例if (singleton) {//那么首先尝试从缓存中获取已创建的服务代理实例对象proxy = get(key, (Class<T>) type);} else {logger.warn("Using non-singleton ReferenceConfig and ReferenceCache at the same time may cause memory leak. " +"Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");}//如果缓存没有服务代理实例对象if (proxy == null) {//引用服务类型到服务消费者引用服务配置的缓存referenceTypeMapList<ReferenceConfigBase<?>> referencesOfType = referenceTypeMap.computeIfAbsent(type, _t -> Collections.synchronizedList(new ArrayList<>()));referencesOfType.add(rc);//引用缓存key到服务消费者引用服务配置的缓存referenceKeyMapList<ReferenceConfigBase<?>> referenceConfigList = referenceKeyMap.computeIfAbsent(key, _k -> Collections.synchronizedList(new ArrayList<>()));referenceConfigList.add(rc);/** 调用ReferenceConfigBase自身的get方法进行服务引用,获取服务代理实例对象*/proxy = rc.get();}//返回服务代理实例对象,不需要特意存入某个换尺寸,因为每个ReferenceConfigBase自身会存储它的取服务代理实例对象return proxy;
}
3 ReferenceConfig#get服务引用
该方法获取接口代理引用对象实例,这里面使用双重检测锁来判断ref是否为null,如果为null,说明该服务还没有进行服务引用,那么调用init方法初始化引用服务。
/*** ReferenceConfig的方法** @return 代理引用服务实例*/
@Override
public T get() {//销毁检查if (destroyed) {throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");}//如果接口代理引用为nullif (ref == null) {// ensure start module, compatible with old api usage//确保启动模块,与旧的API使用兼容getScopeModel().getDeployer().start();//双重校验锁synchronized (this) {//如果接口代理引用为nullif (ref == null) {//初始化引用服务init();}}}return ref;
}
4 ReferenceConfig#init初始化服务引用
该方法用于初始化某个服务引用,大概逻辑为:
- 服务引用前初始化serviceMetadata服务元数据。
- 注册serviceDescriptor服务描述符到本地内存的服务仓库ModuleServiceRepository的services缓存中,即注册service。
- 构建此服务对应的服务消费者模型consumerModel,并且注册到本地内存的服务仓库ModuleServiceRepository的consumers缓存中,即注册consumer。
- 调用createProxy方法,根据服务引用参数map创建服务接口代理引用对象,并赋值给res。这是核心逻辑。
- 设置服务元数据的服务接口代理引用对象。
/*** ReferenceConfig的方法* <p>* 初始化引用服务*/
protected synchronized void init() {//如果已被初始化,那么直接返回if (initialized && ref != null) {return;}try {//如果还没刷新该引用配置if (!this.isRefreshed()) {//刷新配置,即Dubbo配置的重写(基于优先级的覆盖)//设个方法我们在此前Dubbo配置的加载部分已经讲过了this.refresh();}/** 1 服务引用前初始化serviceMetadata服务元数据*/// init serviceMetadata//根据consumer初始化服务元数据initServiceMetadata(consumer);//设置服务类型serviceMetadata.setServiceType(getServiceInterfaceClass());// TODO, uncomment this line once service key is unified//服务key: group/服务接口:版本号serviceMetadata.generateServiceKey();//附加服务引用所需的所有配置Map<String, String> referenceParameters = appendConfig();// init service-application mapping 从本地存储和url参数初始化服务应用程序映射//也就是MetadataServiceNameMappinginitServiceAppsMapping(referenceParameters);//获取Module级别的服务存储仓库,其内部保存着服务提供者和服务消费者的缓存ModuleServiceRepository repository = getScopeModel().getServiceRepository();/** 2 注册serviceDescriptor服务描述符到本地内存的服务仓库ModuleServiceRepository的services缓存中,即注册service*///服务描述符,通过它可以获取服务描述信息,例如服务提供的方法,服务接口名,服务接口Class等ServiceDescriptor serviceDescriptor;if (CommonConstants.NATIVE_STUB.equals(getProxy())) {serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);repository.registerService(serviceDescriptor);} else {//注册服务描述符到服务仓库内部的services集合中serviceDescriptor = repository.registerService(interfaceClass);}/** 3 构建此服务对应的服务消费者模型consumerModel,并且注册到本地内存的服务仓库ModuleServiceRepository的consumers缓存中,即注册consumer*/consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(),//生成动态代理的策略,可以选择两种策略:jdk和javassistproxy,//服务描述符serviceDescriptor,//域模型getScopeModel(),//服务元数据serviceMetadata,//转换和聚合异步方法信息createAsyncMethodInfo(),//服务接口类加载器interfaceClassLoader);// Compatible with dependencies on ServiceModel#getReferenceConfig() , and will be removed in a future version.//兼容ServiceModel#getReferenceConfig()上的依赖项,并将在未来的版本中删除。consumerModel.setConfig(this);//注册服务消费者模型到服务仓库内部的consumers集合中repository.registerConsumer(consumerModel);//将引用参数存入服务元数据的附加数据中serviceMetadata.getAttachments().putAll(referenceParameters);/** 4 根据服务引用参数创建服务接口代理引用对象*/ref = createProxy(referenceParameters);//设置服务元数据的服务接口代理引用对象serviceMetadata.setTarget(ref);serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);consumerModel.setDestroyRunner(getDestroyRunner());consumerModel.setProxyObject(ref);consumerModel.initMethodModels();//检查调用程序可用checkInvokerAvailable();} catch (Throwable t) {try {//销毁invokerif (invoker != null) {invoker.destroy();}} catch (Throwable destroy) {logger.warn("5-3", "", "", "Unexpected error occurred when destroy invoker of ReferenceConfig(" + url + ").", t);}//取消注册consumerModelif (consumerModel != null) {ModuleServiceRepository repository = getScopeModel().getServiceRepository();repository.unregisterConsumer(consumerModel);}//还原属性initialized = false;invoker = null;ref = null;consumerModel = null;serviceMetadata.setTarget(null);serviceMetadata.getAttributeMap().remove(PROXY_CLASS_REF);// Thrown by checkInvokerAvailable().if (t.getClass() == IllegalStateException.class &&t.getMessage().contains("No provider available for the service")) {// 2-2 - No provider available.logger.error("2-2", "server crashed", "", "No provider available.", t);}throw t;}initialized = true;
}
4.1 appendConfig获取服务引用参数
该方法用于获取当前的服务引用参数,用于后续的createProxy方法创建服务代理引用对象。
/*** ReferenceConfig的方法* <p>* 附加服务引用所需的所有配置到一个map中** @return 引用参数*/
private Map<String, String> appendConfig() {Map<String, String> map = new HashMap<>(16);//interface -> 服务接口全路径名map.put(INTERFACE_KEY, interfaceName);//side -> consumer 表示消费端map.put(SIDE_KEY, CONSUMER_SIDE);//添加运行时参数//dubbo -> Dubbo RPC协议版本,默认2.0.2//release -> Dubbo的实现版本,通常是jar版本//timestamp -> 当前时间戳毫秒值//pid -> 当前服务进程pidReferenceConfigBase.appendRuntimeParameters(map);//非泛化接口if (!ProtocolUtils.isGeneric(generic)) {String revision = Version.getVersion(interfaceClass, version);if (StringUtils.isNotEmpty(revision)) {map.put(REVISION_KEY, revision);}String[] methods = methods(interfaceClass);if (methods.length == 0) {logger.warn("5-4", "", "", "No method found in service interface: " + interfaceClass.getName());map.put(METHODS_KEY, ANY_VALUE);} else {map.put(METHODS_KEY, StringUtils.join(new HashSet<>(Arrays.asList(methods)), COMMA_SEPARATOR));}}//Application配置AbstractConfig.appendParameters(map, getApplication());//Module配置AbstractConfig.appendParameters(map, getModule());//consumer配置AbstractConfig.appendParameters(map, consumer);//reference配置AbstractConfig.appendParameters(map, this);appendMetricsCompatible(map);String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);if (StringUtils.isEmpty(hostToRegistry)) {hostToRegistry = NetUtils.getLocalHost();} else if (isInvalidLocalHost(hostToRegistry)) {throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);}map.put(REGISTER_IP_KEY, hostToRegistry);if (CollectionUtils.isNotEmpty(getMethods())) {for (MethodConfig methodConfig : getMethods()) {AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());String retryKey = methodConfig.getName() + ".retry";if (map.containsKey(retryKey)) {String retryValue = map.remove(retryKey);if ("false".equals(retryValue)) {map.put(methodConfig.getName() + ".retries", "0");}}}}return map;
}
5 总结
本次我们学习了Dubbo 3.x服务引用的入口源码。我们知道ReferenceConfig#init用于初始化服务引用,其中调用createProxy方法,**根据服务引用参数map创建服务接口代理引用对象,并赋值给res,**这是核心逻辑,我们下文再学习!