openstack内部rpc消息通信源码分析

我们知道openstack内部消息队列基于AMQP协议,默认使用的rabbitmq 消息队列。谈到rabbitmq,大家或许并不陌生,但或许会对oslo message有些陌生。openstack内部并不是直接使用rabbitmq,而是使用了oslo.message 。oslo.message 后端的driver支持rabbitmq,kafka,zeromq等消息队列(目前只有rabbitmq能用于openstack) 。在 oslo message中封装了OpenStack各组件内部进行消息通信的方法,并将方法中所使用的数据结构封装为通用的类,以达到使用简单快捷、扩展性强的目的。

下面是rabbitmq 支持的模式和场景,简单回顾一下,包括简单模式,工作队列模式,订阅发布模式,路由模式,topics模式,RPC模式。

d7dbbd8eb405330cc3c3c58b2f102645.png23d05c3eaff976a778dc8377992f7866.png

官网有详细的使用说明, 具体rabbitmq 的使用可以参考 https://www.rabbitmq.com/tutorials/tutorial-one-python

前面提到了openstack内部消息通信其实是使用的oslo.message库,关于oslo message 主要提供俩种主要功能:

远程过程调用 RPC:一个服务进程可以调用其他远程服务进程的方法。调用的方式:

rpc.call():远程方法会被同步执行,调用者会被阻塞直到返回方法的结果,在一些调用时间较长的场合中使用会对效率有很大的影响。

rpc.cast():远程服务的方法会被异步执行,调用者不会被阻塞,结果也无须立即返回,因为是异步,所以也要求调用者利用其他的方法来查询这次远程调用的结果。

事件通知:某一个服务进程将事件通知发送到消息总线上,所有在消息总线上且对该事件通知感兴趣的服务进程都可以将该事件通知获取并进行处理,执行的结果并不需要返回给事件发送者。这种方式不仅可以在项目组件内部的进程服务通信间实现,还可以在项目之间的通信中实现比如计量计费等。

Oslo.message中的几个重要概念

  • server:rpc 服务端,包含一个或多个端点(Endpoint),每个端点包含一组远程调用的方法,这组方法可以被客户端通过transport对象远程调用。创建Server对象时,需要指定Transport、Target和一组endpoint。

  • client:rpc 客户端, 负责调用服务端提供的RPC接口。

  • exchange:rabbitmq中的概念,一种交换实现,负责把消息交换到相对应队列上。

  • namespace:服务器端可以在一个主体上暴露多组方法,每组方法属于一个命名空间。

  • method:方法,方法由一个名字和相关参数组成。

  • transport:顾名思义:运输工具,就是运输载体,一个传送RPC 请求到服务器端并将响应返回给客户端的底层消息系统。目前主要使用的transport有rabbitmq和qpid。

URL格式:Transport://user:password@hotst1:port[,host:port]/virtual_host

  • API version:每个命名空间都有一个版本号,当命名空间的接口变化时,这个版本号也会响应增加。向前兼容的修改只需要更改小版本号,向前不兼容的更改需要更改大版本号。

  • Target:目的地,指定某一个消息最终目的地的所有信息。Target中封装了所有将要用到的信息,以确定应该将消息发送到何处或服务器正在侦听什么信息。

下面讲解一下组件cinder 组件内部的 rpc 通信, 从在rpc client和 rpc server端从代码看具体实现,

RPC Client

当cinder-api 收到创建volume 云硬盘时,cinder-scheduler 调度完资源filter出合适的backend之后,在cinder.scheduler.rpcapi 代码中,rpc client 发出创建volume 的rpc 请求 , 代码是下面这样的:

def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None,                  request_spec=None, filter_properties=None,                  backup_id=None):    volume.create_worker()    cctxt = self._get_cctxt()    msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id,                'request_spec': request_spec,                'filter_properties': filter_properties,                'volume': volume, 'backup_id': backup_id}    if not self.client.can_send_version('3.10'):        msg_args.pop('backup_id')    return cctxt.cast(ctxt, 'create_volume', **msg_args)

由前面所提到的,cast和call分别对应异步和同步请求。当调用cast或者call时,通过oslo.message库序列化消息体,通过 调用transport._send 发送到哪个target,transport 会调用对应driver 比如 AMQPDriverBase.send方法。从连接池中获取到 rabbitmq connection 连接,根据消息类型,选择通过topic exchange还是fanout exchange等模式 , 调用 kombu(类似于pika,但是支持重连策略以及连接池功能等)发送到对应的消息队列中。

try:with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn:if notify:exchange = self._get_exchange(target)LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"" topic '%(topic)s'", {'exchange': exchange,'topic': target.topic})conn.notify_send(exchange, target.topic, msg, retry=retry)elif target.fanout:log_msg += "FANOUT topic '%(topic)s'" % {'topic': target.topic}LOG.debug(log_msg)conn.fanout_send(target.topic, msg, retry=retry)else:topic = target.topicexchange = self._get_exchange(target)if target.server:topic = '%s.%s' % (target.topic, target.server)LOG.debug(log_msg + "exchange '%(exchange)s'"" topic '%(topic)s'", {'exchange': exchange,'topic': topic})conn.topic_send(exchange_name=exchange, topic=topic,msg=msg, timeout=timeout, retry=retry,transport_options=transport_options)

那么send 完发送到队列中后,服务端怎么就能执行到对应的方法呢,我们看下rpc server端的实现

RPC Server

以cinder-volume 为例,在cinder-volume 服务启动时,会先初始化rpc 再启动rpc server,其实每个服务都是这样。

通过在cinder.service.Service.start 函数中,调用 messaging.get_rpc_server 构造rpc_server对象,调用rpc_server对象start方法启动。

def18ff49fae31b290786323ccda38b3.jpeg

每个组件通过service start时 ,会启动相关的rpc 服务。

if not rpc.initialized():rpc.init(CONF)endpoints = [self.manager]
endpoints.extend(self.manager.additional_endpoints)serializer = objects_base.CinderObjectSerializer(obj_version_cap)target = messaging.Target(topic=self.topic, server=self.host)
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()if self.topic == constants.VOLUME_TOPIC:target = messaging.Target(topic='%(topic)s.%(host)s' % {'topic': self.topic,'host': self.host},server=vol_utils.extract_host(self.host, 'host'))self.backend_rpcserver = rpc.get_server(target, endpoints,serializer)self.backend_rpcserver.start()

由上面可以看到在构造 messaging.get_rpc_server 实例时 ,传入TRANSPORT,target,endpoints, json serializer,其中TRANSPORT 传的rabbitmq,target 传入的是包含当前服务 topic和本机名称的target对象,endpoints这里传的就是VolumeManager对象,serializer传入的是json serializer,最终通过构造了RPCServer实例,self.rpcserver.start() 启动会 启动 max_workers 大小的eventlet 协程池,创建 listener , 并不断处理incoming的message。RPCServer. processincoming 中取到message后,确认消息,并dispatch消息到对应的endpint上

class RPCServer(msg_server.MessageHandlingServer):def _process_incoming(self, incoming):try:res = self.dispatcher.dispatch(message)...........
def _do_dispatch(self, endpoint, method, ctxt, args):ctxt = self.serializer.deserialize_context(ctxt)new_args = dict()for argname, arg in args.items():new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)func = getattr(endpoint, method)result = func(ctxt, **new_args)return self.serializer.serialize_entity(ctxt, result)

oslo.message 在接收到dispatch的message后,解析message中的method,args,namespace,version等,并遍历endpoints,如果endpoint含有对应method,则反射执行,并最终反序列化返回结果

总体来说:相比较其他消息队列,比如kafka,redis,pulsar,rocketMQ ,rabbitmq算是功能比较丰富的消息队列了,openstack社区实现的 oslo message 完美的基于rabbitmq 很好的实现了一套内部组件的消息通信功能。可能这种消息通信方式用的比较少,但是代码实现上来说很有深度,尤其在较大型项目中,很有借鉴价值。


推荐阅读:

高性能版云联网实现原理

虚拟机磁盘热切换实现方案

VPC场景虚机热迁网络无感

更多技术和产品文章,请关注👆

如果您对哪个产品感兴趣,欢迎留言给我们,我们会定向邀文~

360智汇云是以"汇聚数据价值,助力智能未来"为目标的企业应用开放服务平台,融合360丰富的产品、技术力量,为客户提供平台服务。
目前,智汇云提供数据库、中间件、存储、大数据、人工智能、计算、网络、视联物联与通信等多种产品服务以及一站式解决方案,助力客户降本增效,累计服务业务1000+。
智汇云致力于为各行各业的业务及应用提供强有力的产品、技术服务,帮助企业和业务实现更大的商业价值。
官网:https://zyun.360.cn 或搜索“360智汇云”
客服电话:4000052360

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/62164.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Postman自定义脚本Pre-request-script以及Test

这两个都是我们进行自定义script脚本的地方,分别是在请求执行的前后运行。 我们举两个可能经常运用到的场景。 (一)请求A先执行,请求B使用请求A响应结果作为参数。如果我们不用自定义脚本,可能得先执行请求A,然后手动复制响应结果…

总结的一些MySql面试题

目录 一:基础篇 二:索引原理和SQL优化 三:事务原理 四:缓存策略 一:基础篇 1:定义:按照数据结构来组织、存储和管理数据的仓库;是一个长期存储在计算机内的、有组织的、可共享 的…

116. UE5 GAS RPG 实现击杀掉落战利品功能

这一篇,我们实现敌人被击败后,掉落战利品的功能。首先,我们将创建一个新的结构体,用于定义掉落体的内容,方便我们设置掉落物。然后,我们实现敌人死亡时的掉落函数,并在蓝图里实现对应的逻辑&…

Excel技巧:如何批量调整excel表格中的图片?

插入到excel表格中的图片大小不一,如何做到每张图片都完美的与单元格大小相同?并且能够根据单元格来改变大小?今天分享,excel表格里的图片如何批量调整大小。 方法如下: 点击表格中的一个图片,然后按住Ct…

智能合约

06-智能合约 0 啥是智能合约? 定义 智能合约,又称加密合约,在一定条件下可直接控制数字货币或资产在各方之间转移的一种计算机程序。 角色 区块链网络可视为一个分布式存储服务,因为它存储了所有交易和智能合约的状态 智能合约还…

智慧油客:从初识、再识OceanBase,到全栈上线

今天,我们邀请了智慧油客的研发总监黄普友,为我们讲述智慧油客与 OceanBase 初识、熟悉和结缘的故事。 智慧油客自2016年诞生以来,秉持新零售的思维,成功从过去二十年间以“以销售产品为中心”的传统思维模式,转向“以…

【深度学习】手机SIM卡托缺陷检测【附链接】

一、手机SIM卡托用途 SIM卡托是用于固定和保护SIM卡的部件,通过连接SIM卡与手机主板的方式,允许设备访问移动网络,用户可以通过SIM卡进行通话、发送短信和使用数据服务。 二、手机SIM卡托不良影响 SIM卡接触不良,造成信号中断&…

消防物证管理系统|DW-S404实现消防物证智能化管理

一、系统概述 智慧消防物证管理系统DW-S404系统旨在借助现代信息技术,达成消防物证管理的高效化、安全化及智能化管理目标。该系统运用物联网、大数据、云计算等先进技术,实现对消防物证从产生到销毁的全生命周期跟踪与监控,从而增强物证管理…

Odoo :一款免费且开源的食品生鲜领域ERP管理系统

文 / 贝思纳斯 Odoo金牌合作伙伴 引言 提供业财人资税的精益化管理,实现研产供销的融通、食品安全的追踪与溯源,达成渠道的扁平化以及直面消费者的 D2C 等数字化解决方案,以此提升运营效率与核心竞争力,支撑高质量的变速扩张。…

如何部署vue项目到Github Pages

1.创建vue项目 npm create vitelatest my-vue-app -- --template vue 2.创建github仓库 3.连接仓库 在项目根目录右键选择open git base here,如果没有安装git请先安装git。 初始化仓库 $ git init $ git add . $ git commit -m "init"将项目与仓库连…

Dubbo应用篇

文章目录 一、Dubbo简介二、SSM项目整合Dubbo1.生产者方配置2.消费者方配置 三、Spring Boot 项目整合Dubbo1.生产者方配置2.消费者方配置 四、应用案例五、Dubbo配置的优先级别1. 方法级配置(Highest Priority)2. 接口级配置3. 消费者/提供者级配置4. 全…

ubuntu的matlab使用心得

1.读取视频 v VideoReader(2222.mp4);出问题,报错: matlab 错误使用 VideoReader/initReader (第 734 行) 由于出现意外错误而无法读取文件。原因: Unable to initialize the video properties 出错 audiovideo.internal.IVideoReader (第 136 行) init…

消息中间件-Kafka1-实现原理

消息中间件-Kafka 一、kafka简介 1、概念 Kafka是最初由Linkedin公司开发,是一个分布式、支持分区(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以…

如何利用“一键生成ppt”减轻工作压力

随着数字化的快速发展,PPT设计这一传统任务也迎来了新的变化。过去,制作一个简洁、专业的PPT需要花费大量时间与精力。但现在借助科技的力量,一键生成PPT的梦想成真了。从智能生成ppt到ai生成ppt的技术不断进步,令我们能够体验到更…

创造未来:The Sandbox 创作者训练营如何赋能全球创造者

创作者训练营让创造者有能力打造下一代数字体验。通过促进合作和提供尖端工具,The Sandbox 计划确保今天的元宇宙是由一个个创造者共同打造。 2024 年 5 月,The Sandbox 推出了「创作者训练营」系列,旨在重新定义数字创作。「创作者训练营」系…

Docker多架构镜像构建踩坑记

背景 公司为了做信创项目的亮点,需要将现有的一套在X86上运行的应用系统迁移到ARM服务器上运行,整个项目通过后端Java,前端VUEJS开发通过CICD做成Docker镜像在K8S里面运行。但是当前的CICD产品不支持ARM的镜像构建,于是只能手工构…

python学opencv|读取图像(三)放大和缩小图像

【1】引言 前序已经学习了常规的图像读取操作和图像保存技巧,相关文章链接为: python学opencv|读取图像-CSDN博客 python学opencv|读取图像(二)保存彩色图像-CSDN博客 今天我们更近一步,学习放大和缩小图像的技巧&…

D86【python 接口自动化学习】- pytest基础用法

day86 pytest配置testpaths 学习日期:20241202 学习目标:pytest基础用法 -- pytest配置testpaths 学习笔记: pytest配置项 主目录创建pytest.ini文件 [pytest] testpaths./testRule 然后Terminal里直接命令:pytest&#xff…

基于 Apache Dolphinscheduler3.1.9中的Task 处理流程解析

实现一个调度任务,可能很简单。但是如何让工作流下的任务跑得更好、更快、更稳定、更具有扩展性,同时可视化,是值得我们去思考得问题。 Apache DolphinScheduler是一个分布式和可扩展的开源工作流协调平台,具有强大的DAG可视化界…

Flask使用长连接

Flask使用flask_socketio实现websocket Python中的单例模式 在HTTP通信中,连接复用(Connection Reuse)是一个重要的概念,它允许客户端和服务器在同一个TCP连接上发送和接收多个HTTP请求/响应,而不是为每个新的请求/响…