fdbus之消息接收流程

fdbus中的消息如何发送出去,前面的文章有的讲的很详细了,但是对于如何接收消息涉及的较少,本篇重点讲述fdbus是如何接收消息及消息在传递过程、传递方式(零拷贝?)。

还是从通过源码来讲述吧,更简单一些,消息的接收肯定是通过socket来实现的,但是我并没有找到循环读取socket的地方,只能从中间部分的函数讲起,但是这一部分已经能够将消息接收的处理讲明白了。话不多少,上代码:

void CFdbSession::onInput()
{if (fatalError()){goto _quit;}uint8_t prefix_buffer[CFdbMessage::mPrefixSize];if (receiveData(prefix_buffer, sizeof(prefix_buffer))){parsePrefix(prefix_buffer);}if (fatalError()){goto _quit;}if (receiveData(mPayloadBuffer + CFdbMessage::mPrefixSize,mMsgPrefix.mTotalLength - CFdbMessage::mPrefixSize)){processPayload();}_quit:if (mPayloadBuffer){delete[] mPayloadBuffer;mPayloadBuffer = 0;}
}

通过上面的代码可以看到通过调用receiveData方法获取消息结构中的prefix,然后通过调用parsePrefix解析消息中的prefix部分。然后调用receiveData获取消息结构中的消息内容,最后调用processPayload进行消息处理,processPayload方法的源码如下:

void CBaseSession::processPayload()
{const uint8_t *data = mPayloadBuffer + CFdbMessage::mPrefixSize;int32_t size = mMsgPrefix.mTotalLength - CFdbMessage::mPrefixSize;if (size < 0){fatalError(true);return;}CFdbMessageHeader head;CFdbParcelableParser parser(head);if (!parser.parse(data, mMsgPrefix.mHeadLength)){LOG_E("CBaseSession: Session %d: Unable to deserialize message head!\n", sid());fatalError(true);return;}auto type = head.type();switch (type){case FDB_MT_REQUEST:case FDB_MT_SIDEBAND_REQUEST:case FDB_MT_GET_EVENT:case FDB_MT_PUBLISH:case FDB_MT_SET_EVENT:doStatistics(type, head.flag(), mStatistics.mRx);doRequest(head);break;case FDB_MT_RETURN_EVENT:case FDB_MT_REPLY:case FDB_MT_SIDEBAND_REPLY:case FDB_MT_STATUS:doResponse(head);break;case FDB_MT_BROADCAST:doStatistics(type, head.flag(), mStatistics.mRx);doBroadcast(head);break;case FDB_MT_SUBSCRIBE_REQ:if (head.code() == FDB_CODE_SUBSCRIBE){doSubscribeReq(head, true);}else if (head.code() == FDB_CODE_UNSUBSCRIBE){doSubscribeReq(head, false);}break;default:LOG_E("CBaseSession: Message %d: Unknown type!\n", (int32_t)head.serial_number());fatalError(true);break;}
}

首先获取消息内容(从消息头部开始)首地址data和消息内容长度,调用CFdbParcelableParser类解析消息头部的内容,获取消息头部后就获取了消息的基本信息,其中就包括消息类型mType。根据消息类型的不同进入不同的分支进行处理,这里着重介绍FDB_MT_REPLY类型,因为这个是消息响应类型,会涉及到刚才提到的如何将受到的回复的消息内容替换到调用线程中函数的输入输出参数中。详细实现请看如下:

void CBaseSession::doResponse(CFdbMessageHeader &head)
{bool found;PendingMsgTable_t::EntryContainer_t::iterator it;CBaseJob::Ptr &msg_ref = mPendingMsgTable.retrieveEntry(head.serial_number(), it, found);if (found){auto msg = castToMessage<CFdbMessage *>(msg_ref);auto object_id = head.object_id();if (msg->objectId() != object_id){LOG_E("CFdbSession: object id of response %d does not match that in request: %d\n",object_id, msg->objectId());terminateMessage(msg_ref, FDB_ST_OBJECT_NOT_FOUND, "Object ID does not match.");mPendingMsgTable.deleteEntry(it);delete[] mPayloadBuffer;mPayloadBuffer = 0;return;}auto object = mContainer->owner()->getObject(msg, false);if (object){msg->update(head, mMsgPrefix);msg->decodeDebugInfo(head);msg->replaceBuffer(mPayloadBuffer, head.payload_size(), mMsgPrefix.mHeadLength);mPayloadBuffer = 0;auto type = msg->type();doStatistics(type, head.flag(), mStatistics.mRx);if (!msg->sync()){switch (type){case FDB_MT_REQUEST:object->doReply(msg_ref);break;case FDB_MT_SIDEBAND_REQUEST:object->onSidebandReply(msg_ref);break;case FDB_MT_GET_EVENT:object->doReturnEvent(msg_ref);break;case FDB_MT_SET_EVENT:default:if (head.type() == FDB_MT_STATUS){object->doStatus(msg_ref);}else{LOG_E("CFdbSession: request type %d doesn't match response type %d!\n",type, head.type());}break;}}}msg_ref->terminate(msg_ref);mPendingMsgTable.deleteEntry(it);}
}

上面的代码可以看到,首先获取消息头部的序列号mSn,每个发出的消息中的序列号是唯一的,在网络端点处理完成之后进行回复时也要将消息的序列号返回,以便原始请求消息发送方根据该序列号判断是否为自己发出消息匹配。

上面代码

CBaseJob::Ptr &msg_ref = mPendingMsgTable.retrieveEntry(head.serial_number(), it, found);

就是完成该逻辑。若找到匹配的消息则进行处理,通过该序列号找到原始消息请求方的消息对象指针,最关键的部分来了,每个session对象都包含一个指向收到消息的指针mPayloadBuffer,该指针指向的内存每收到一条消息都会重新申请,然后通过下面的语句

msg->replaceBuffer(mPayloadBuffer, head.payload_size(), mMsgPrefix.mHeadLength);

完成消息体内容的替换。

最后通过调用下面的语句,如果存在同步调用的情况,则通过唤醒条件变量来唤醒调用线程,解除阻塞调用线程。同时也完成了回复消息的内容替换到了原始消息对象指针指向的mBuffer等成员,更直白的说就是收到一条消息session对象首先申请一片内存,然后消息收到的消息读取到该内存中,然后解析消息头,解析消息头后,获取消息的序列号,通过序列号再本地找到发送的消息对象,该消息对象包含mBuffer mPayloadSize mHeadSize mOffset等成员,找到该对象后,首先释放掉mBuffer指向的内存,然后将session对象申请的内存赋值给mBuffer,这样就完成了消息的传递过程,这样在同步调用的时候CBaseJob::Ptr &msg_ref参数指向的对象就完成了从原始请求消息变成了回复消息,用户就可以从这个参数获取完整的回复消息。

msg_ref->terminate(msg_ref);

这样大家就明白了一个消息的完整的过程了吧。我觉的还是挺清晰的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/89799.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mac电脑视频处理工具 Topaz Video AI for mac

Topaz Video AI是一款强大而易用的视频处理软件&#xff0c;通过人工智能技术提供高质量的视频增强和编辑功能。它可以帮助用户改善视频的质量、修复缺陷、优化图像&#xff0c;并提供丰富的编辑选项&#xff0c;以满足个性化的视频处理需求。无论是专业摄影师、视频编辑人员&a…

五、3d场景的卡片展示的创建

在我们3d的开发中&#xff0c;对某一些建筑和物体进行解释说明是非常常见的现象&#xff0c;那么就不得不说卡片的展示了&#xff0c;卡片展示很友好的说明了当前物体的状态&#xff0c;一目了然&#xff0c;下面就是效果图。 它主要有两个方法来实现&#xff0c;大量的图片建议…

maven找不到jar包

配置settings.xml文件之后出现报错找不到jar包 先改maven设置: 然后在重新清理构建项目: 可以通过执行以下命令清理本地 Maven 仓库 mvn dependency:purge-local-repository

Docker使用ssh连接ubuntu容器

容器ssh配置 启动容器 docker run -it -p 2222:22 ubuntu更新 apt update 配置密码 passwd安装openssh-server apt install openssh-server配置ssh echo "UsePAM no" >> /etc/ssh/sshd_config echo "PermitRootLogin yes" >> /etc/ssh/s…

渗透测试之打点

请遵守中华人民共和国网络安全法 打点的目的是获取一个服务器的控制权限 1. 企业架构收集 &#xff08;1&#xff09;官网 &#xff08;2&#xff09;网站或下属的子网站&#xff0c;依次往下 天眼查 企查查 2. ICP 备案查询 ICP/IP地址/域名信息备案管理系统 使用网站…

京东大型API网关实践之路

概述 1、背景 京东作为电商平台&#xff0c;近几年用户、业务持续增长&#xff0c;访问量持续上升&#xff0c;随着这些业务的发展&#xff0c;API网关应运而生。 API网关&#xff0c;就是为了解放客户端与服务端而存在的。对于客户端&#xff0c;使开放给客户端的接口标准统…

26342-2010 国际运尸 木质棺柩.

声明 本文是学习GB-T 26342-2010 国际运尸 木质棺柩. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本标准规定了国际运尸木质棺柩的分类、规格、技术要求、检验方法、包装、运输和储存。 本标准适用于为国际运尸木质棺柩的设计、生产、检测…

第七天:gec6818开发板QT和Ubuntu中QT安装连接sqlite3数据库驱动环境保姆教程

sqlite3数据库简介 帮助文档 SQL Programming 大多数关系型数的操作步骤&#xff1a;1&#xff09;连接数据库 多数关系型数据库都是C/S模型 (Client/Server)sqlite3是一个本地的单文件关系型数据库&#xff0c;同样也有“连接”的过程 2&#xff09;操作数据库 作为程序员&am…

如何将matlab中的mat矩阵文件在python中读取出来

先安装hdf5storage这个包 pip3 install hdf5storage 然后在当前目录下放入要读取的mat文件 # 将matlab中的mat文件读取出来 import hdf5storagedata hdf5storage.loadmat(inputWeights.mat) print(data[inputWeights])

分布式搜索引擎Elasticsearch

一、Elasticsearch介绍 1.Elasticsearch产生背景 大数据量的检索NoSql: not only sql,泛指非关系型的数据库Nginx的7层负载均衡和4层负载均衡2.Elasticsearch是什么 一个基于Lucene的分布式搜索和分析引擎,一个开源的高扩展的分布式全文检索引擎 Elasticsearch使用Java开发…

宿主机如何获取kvm虚拟机的ip地址

Can I determine the current IP from a known MAC Address? 参考 https://stackoverflow.com/questions/13552881/can-i-determine-the-current-ip-from-a-known-mac-address 最佳实践&#xff0c;通过nmap扫描来获取局域网中所有存活的ip&#xff0c;然后向每个ip发送一次p…

RNN模型与NLP应用(1/9):数据处理基础Data Processing Basics

文章目录 处理分类特征把分类特征转化为数值特征应用one-hot编码indice要从1开始而不能从0开始数据处理为什么使用one-hot向量 处理文本数据Step1&#xff1a;将文本分割成单词Step2&#xff1a;计算单词的频度按频度递减的方式排序 Step3&#xff1a;One-Hot编码 处理分类特征…

Stm32_标准库_GPIOA初始化

代码&#xff1a; #include "stm32f10x.h" // Device headerGPIO_InitTypeDef GPIO_InitStructur;//定义变量结构体int main(void){/*使用RCC开启GPIO的时钟*/RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA, ENABLE);//开启PA端口时钟/*使用GPIO_I…

黑马VUE3视频笔记

目录 一、使用create-vue创建项目 二、setup选项 三、reactive和ref函数 1.reactive() 2.ref() 三、computed 四、watch ​五、生命周期函数 六、父传子、子传父 父传子defineProps 子传父defineEmits 七、模板引用 ref defineExpose 八、跨层传递普通数据 prov…

JAXB(Java Architecture for XML Binding)下载、使用

简介 JAXB&#xff08;Java Architecture for XML Binding&#xff09;就是XML数据绑定的java架构。JAXB可以根据XML Schema生成java类&#xff0c;也能根据java类生成XML Schema&#xff0c;能将XML数据unmarshall到Java内容树&#xff0c;也能将Java内容树持久化为XML数据。…

【煤矿虚拟仿真体验】VR采煤机技能培训有效提高训练效果

在我们的社会中&#xff0c;能源是至关重要的。它是推动我们日常生活和工作的主要动力。然而&#xff0c;我们在获取这种能源的过程中&#xff0c;也带来了许多环境问题。煤矿开采是其中的一个重要部分&#xff0c;因此我们需要寻找更环保、更安全的方式来进行煤矿开采。VR&…

手把手教你用 Milvus 和 Towhee 搭建一个 AI 聊天机器人

作为向量数据库的佼佼者&#xff0c;Milvus 适用于各种需要借助高效和可扩展向量搜索功能的 AI 应用。 举个例子&#xff0c;如果想要搭建一个聊天机器人&#xff0c;Milvus 一定是其进行数据管理的首选。那么&#xff0c;如何让这个应用程序开发变得易于管理及更好理解&#x…

LeetCode算法二叉树—222. 完全二叉树的节点个数

目录 222. 完全二叉树的节点个数 - 力扣&#xff08;LeetCode&#xff09; 代码&#xff1a; 运行结果&#xff1a; 给你一棵 完全二叉树 的根节点 root &#xff0c;求出该树的节点个数。 完全二叉树 的定义如下&#xff1a;在完全二叉树中&#xff0c;除了最底层节点可能…

2023-09-28 monetdb-db,schema,user,role-分析

摘要: 对moentdb的database,schema,user,role和权限做分析, 以与mysql中的概念做对比分析. 备份: https://stoneatom.yuque.com/staff-ft8n1u/qfqtnb/gfqc62fozh0qsyqm 上下文相关: 2023-09-28 mysql-代号m-schema调研-文档记录-CSDN博客 2023-09-28 monetdb-databae的概念和…

前后缀分解

42. 接雨水 前后缀分解 class Solution {public int trap(int[] height) {int n = height.length;int[