基于Tars高并发IM系统的设计与实现-实战篇5

基于Tars高并发IM系统的设计与实现-实战篇5

群聊服务 GroupChatServer

群聊服务既可以接受来自BrokerServer的用户请求,也需要接收来自其他服务的RPC请求;所以本服务提供两套RPC接口:通用RPC接口和专用RPC接口。

通用RPC接口

通用RPC接口主要处理如下请求:

  • 创建群聊
  • 群聊加成员
  • 群聊减成员
  • 修改群资料
  • 发群消息
  • 换群主
  • 解散群聊
  • 同步用户群聊
  • 获取群成员
  • 解散群聊
  • 判断一个人是否为群成员

针对以上每个业务,根据用户请求的类型进行不同的业务逻辑处理,处理代码如下:

  switch(req.header.type){case otim::PT_MSG_GROUP_CHAT:this->sendMsg(clientContext, req, resp);break;case otim::PT_GROUPCHAT_SYNC:this->syncGroup(clientContext, req, resp);break;case otim::PT_GROUPCHAT_CREATE:this->createGroup(clientContext, req, resp);break;case otim::PT_GROUPCHAT_JION:this->joinGroup(clientContext, req, resp);break;case otim::PT_GROUPCHAT_QUIT:this->quitGroup(clientContext, req, resp);break;case otim::PT_GROUPCHAT_DISMISS:this->dismissGroup(clientContext, req, resp);break;case otim::PT_GROUPCHAT_UPDATE_CREATOR:this->updateGroupCreator(clientContext, req, resp);break;case otim::PT_GROUPCHAT_INFO_UPDATE:this->updateGroupInfo(clientContext, req, resp);break;case otim::PT_GROUPCHAT_MEMBERS_GET:this->getGroupMember(clientContext, req, resp);break;default:MLOG_DEBUG("the type is  invalid:"<<otim::etos((otim::PACK_TYPE)req.header.type));return otim::EC_PROTOCOL;}

群聊相关请求实现方法:

   int syncGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);int createGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);int dismissGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);int updateGroupCreator(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);int updateGroupInfo(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);int joinGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);int quitGroup(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);int getGroupMember(const otim::ClientContext & clientContext, const otim::OTIMPack & req,  otim::OTIMPack & resp);
专用RPC接口

主要提供两个接口:

interface GroupChatRPCServant
{int getGroupMember(string groupId, out vector<string> memberIds);bool isGroupMember(string groupId, string memberId);};

历史消息服务 HistoryMsgServer

该服务主要处理用户历史消息即相关的业务:

  • 热会话同步
  • 历史消息存取
  • 高优先级消息存取

该服务提供通用RPC服务,主要服务对象为接入服务BrokerServer;
用户所有消息都通过该服务进行存取;为高效存取,历史消息主要存储在redis,存储量及时长可以根据需求进一步来做配置开发。

业务逻辑处理接口

该服务采用通用接口来处理客户端请求;

   tars::Int32 processHotsessionReq(const otim::ClientContext & clientContext,const otim::OTIMPack & req,otim::OTIMPack &resp);tars::Int32 processPullHistoryReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);tars::Int32 processHighPriorMsgSyncReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);

冷存储服务器 OlapServer

该服务主要将IM数据存储到mysql中永久保存;专用RPC服务;

业务逻辑处理接口
interface OlapServant
{int saveMsg(otim::ClientContext clientContext, OTIMPack pack, string sessionId, long seqId);
};

消息操作服务 MsgOperatorServer

该服务主要有如下功能逻辑处理:

  • 消息控制请求(包含撤回,删除,覆写)
  • 消息已读处理
业务逻辑处理接口
    tars::Int32 processMsgUnreadReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);tars::Int32 processMsgCTRLReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack);

消息撤回,删除,覆写逻辑处理

tars::Int32 MsgOperatorServantImp::processMsgCTRLReq(const otim::ClientContext & clientContext,const otim::OTIMPack & reqPack,otim::OTIMPack &respPack)
{SCOPELOGGER(scopelogger);scopelogger<<"reqPack:"<<reqPack.header.writeToJsonString();otim::MsgControl req;otim::unpackTars<otim::MsgControl>(reqPack.payload, req);MLOG_DEBUG("clientContext:"<<clientContext.writeToJsonString()<<" req:"<<req.writeToJsonString());respPack = reqPack;respPack.header.flags |= otim::PF_ISACK;otim::CommonErrorCode respData;respData.code = otim::EC_SUCCESS;if (req.sessionId.empty() || req.seqId == 0 || req.packId.empty()){respData.code = otim::EC_PARAM;otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);MLOG_DEBUG("sessionId,packId or seqId is is empty req:"<<req.writeToJsonString());return  respData.code;}otim::RedisConnPtr redis(otim::RedisPool::instance());//get old msgstd::vector<std::string> msgs;std::vector<std::string> scores;EMRStatus ret = redis->ZRangeByScoreAndLimit(otim::RKEY_MSG + req.sessionId, req.seqId, 5, msgs);if (EMRStatus::EM_KVDB_ERROR == ret){MLOG_ERROR("get msg fail!, sessionId:" << req.sessionId<<" msgId:"<<req.seqId);respData.code = otim::EC_DB_ERROR;otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);return otim::EC_DB_ERROR;}MLOG_DEBUG("get old msg size:"<<msgs.size());otim::OTIMPack packOrg;for (auto item : msgs){std::vector<char> vctItem(item.begin(), item.end());otim::OTIMPack packItem;otim::unpackTars<otim::OTIMPack>(vctItem, packItem);MLOG_DEBUG("msgs :"<<packItem.header.writeToJsonString());if (packItem.header.packId == req.packId){packOrg = packItem;}}if (packOrg.header.packId.empty()){MLOG_WARN("The org msg is not exist:"<<req.sessionId<<" packId:"<<req.packId <<" seqId:"<<req.seqId);respData.code = otim::EC_MSG_NOT_EXIST;otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);return respData.code;}MLOG_DEBUG("org msg:"<<packOrg.header.writeToJsonString());std::string to;if (req.command == otim::MC_REVOKE){packOrg.header.flags |= otim::PF_REVOKE;MLOG_DEBUG("revoke msg:"<<req.packId);}else if (req.command == otim::MC_OVERRIDE){packOrg.header.flags |= otim::PF_OVERRIDE;otim::MsgReq msgReq;otim::unpackTars<otim::MsgReq>(packOrg.payload, msgReq);msgReq.content = req.content;otim::packTars<otim::MsgReq>(msgReq, packOrg.payload);to = msgReq.to;MLOG_DEBUG("override msg:"<<req.packId);}else if (req.command == otim::MC_DELETE){
//        packOrg.header.flags |= otim::PF_REVOKE;MLOG_DEBUG("delete msg:"<<req.packId);}else{MLOG_WARN("The command is error:"<<req.command<<" packId:"<<req.packId);respData.code = otim::EC_MSG_OP_CMD;otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);return otim::EC_MSG_OP_CMD;}ret = redis->ZSetRemoveByScore(otim::RKEY_MSG + req.sessionId, req.seqId, req.seqId);if (EMRStatus::EM_KVDB_SUCCESS != ret ){MLOG_ERROR("delete original msg fail:"<<(int)ret);}//增加新的消息if (req.command != otim::MC_DELETE){std::string msgSave;otim::packTars<otim::OTIMPack>(packOrg, msgSave);ret = redis->ZSetAdd(otim::RKEY_MSG + req.sessionId, req.seqId, msgSave);if ( EMRStatus::EM_KVDB_SUCCESS != ret ){MLOG_ERROR("save cancel msg fail!");}}//通知在线接收者其他端otim::sendPackToMySelf(clientContext, reqPack);//  send to userstd::vector<std::string> vctUserId;if (packOrg.header.type == otim::PT_MSG_SINGLE_CHAT || packOrg.header.type == otim::PT_MSG_BIZ_NOTIFY){if (to.empty()){otim::MsgReq msgReq;otim::unpackTars<otim::MsgReq>(packOrg.payload, msgReq);to = msgReq.to;}vctUserId.push_back(to);MLOG_DEBUG("single or notify chat  packId:"<<packOrg.header.packId<<" to:"<<to);}else if (packOrg.header.type == otim::PT_MSG_GROUP_CHAT){//get groupMemberotim::GroupChatRPCServantPrx groupChatRPCServantPrx = otim::getServantPrx<otim::GroupChatRPCServantPrx>(PRXSTR_GROUP_CHAT_RPC);groupChatRPCServantPrx->getGroupMember(req.sessionId, vctUserId);MLOG_DEBUG("group chat  packId:"<<packOrg.header.packId<<" to:"<<req.sessionId<<" member Size:"<<vctUserId.size());}int64_t seqId = otim::genSeqId();for (auto userId : vctUserId){otim::savePriorityMsg(redis.get(), reqPack, userId, seqId);otim::dispatchMsg(clientContext, reqPack, userId);}respData.code = otim::EC_SUCCESS;otim::packTars<otim::CommonErrorCode>(respData, respPack.payload);return otim::EC_SUCCESS;
}

Http接口服务

该服务针对第三方提供消息能力,主要提供如下接口:

  • 发送消息(简单文本消息,复杂消息)
  • 添加好友
  • 删除好友
  • 查看好友

功能实现函数

    std::string  doSendSimpleMsgCmd(TC_HttpRequest &cRequest);std::string  doSendMsgCmd(TC_HttpRequest & cRequest);std::string  doAddFriend(TC_HttpRequest &request);std::string  doDelFriend(TC_HttpRequest &request);std::string  doGetFriends(TC_HttpRequest &request);

Push推送服务

该服务主要实现IM消息的离线推送能力, APP客户端不在线的场景下,将消息通过离线通道push用户的手机上,以提高消息的触达率。

主要实现iOS APNS,Android FCM,华为,小米,oppo,vivo等厂商的离线消息推送功能;
需要根据各个厂商开放平台提供的API进行开发集成。

Android 厂商开发平台地址:

  • 华为:
    https://developer.huawei.com/consumer/cn/hms/huawei-pushkit
  • 小米:
    https://dev.mi.com/console/appservice/push.html
  • 魅族:
    http://open-wiki.flyme.cn/doc-wiki/index
  • vivo:
    https://push.vivo.com.cn/#/
  • oppo:
    https://push.oppo.com/

RPC接口

enum PushServiceType
{PS_TYPE_NONE = 0, //无 Push服务提供商PS_TYPE_IOS = 1,  //IOS Push服务提供商PS_YPE_HUAWEI = 2,   //华为 Push服务提供商PS_TYPE_XIAOMI = 3,   //小米 Push服务提供商PS_TYPE_MEIZU = 4,   //魅族 Push服务提供商PS_TYPE_VIVO = 5,  //vivi服务PS_TYPE_OPPO = 6, //oppo服务PS_TYPE_FCM = 7, //FCM服务
};struct RegInfo {0  require string packId = "";                           //消息的id1  require PushServiceType serviceType = 0;              //push服务提供商2  require string packageName = "";                          //包名3  require string userId = "";                            //用户id4  optional string appVersion = "";                       //app version
};struct PushInfo {0  require string packId = "";                           //消息的id1  require string userId = "";                            //用户id2  require int unReadCount = 0;                          //未读消息数3  require string title = "";                            /push标题4  require string content = "";                          //push内容5  optional string uri = "";                              //跳转uri6  optional string extraData="";                           //业务自定义字段
};interface PushServant
{int register(RegInfo regInfo);int pushMessage(PushInfo pushInfo);
};

服务端部署

编译打包

所有服务开发完成后,执行如下命令进行编译,打包:

make release
make clean all
make tar

程序包部署

根据前期部署好的Tars框架环境、web管理系统,将程序包逐个发布,发布后的系统如图:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/30118.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

编写第一个 React Native 程序

React Native 目录 使用React Native CLI命令创建的目录如下图所示&#xff1a; 重要目录说明 目录说明__tests__存放测试用例的目录.bundle / config配置文件&#xff08;一般不会用到&#xff09;android 和 IOS 文件夹这两个文件夹主要是存放安卓和 ios 相关的配置文件和…

380. O(1) 时间插入、删除和获取随机元素 -------------Map类型在O(1)复杂度内实现插入删除

380. O(1 时间插入、删除和获取随机元素 原题链接&#xff1a;完成情况&#xff1a;解题思路&#xff1a;参考代码&#xff1a; 原题链接&#xff1a; 380. O(1) 时间插入、删除和获取随机元素 https://leetcode.cn/problems/insert-delete-getrandom-o1/description/ 完成…

【Spring】如果你需要使用重试机制,请使用Spring官方的Spring Retry

文章目录 前言Spring Retry的基本使用第一步&#xff0c;引入Spring Retry的jar包第二步&#xff0c;构建一个RetryTemplate类第三步&#xff0c;使用RETRY_TEMPLATE注意事项 拓展方法降级操作重试策略&#xff1a;时间策略重试策略&#xff1a;指定异常策略 前言 Spring Retr…

SpringBootWeb案例-准备工作

目录 前言 准备工作 需求&环境搭建 需求 环境搭建 开发规范 Restful开发规范 统一的响应结果 开发流程 前言 根据过往的文章可以知道目前我已经学习完了前端、后端、数据库的基础知识&#xff0c;接下来通过一个基于SpringBoot工程开发的web项目案例。 准备工作 …

视频监控汇聚EasyCVR平台WebRTC流地址无法播放的原因排查

开源EasyDarwin视频监控TSINGSEE青犀视频平台EasyCVR能在复杂的网络环境中&#xff0c;将分散的各类视频资源进行统一汇聚、整合、集中管理&#xff0c;在视频监控播放上&#xff0c;TSINGSEE青犀视频安防监控汇聚平台可支持1、4、9、16个画面窗口播放&#xff0c;可同时播放多…

【Java并发】什么是AQS?

文章目录 什么是AQS?AQS与Synchronized的区别AQS-基本工作机制AQS是公平锁与非公平锁 什么是AQS? 全称是 AbstractQueuedSynchronizer&#xff0c;即抽象队列同步器。它是构建锁或者其他同步组件的基础框架 所谓抽象&#xff0c;其实目的就是把具体的逻辑交给子类去实现&…

【计算机视觉】关于图像处理的一些基本操作

目录 图像平滑滤波处理均值滤波计算过程python实现 高斯滤波计算过程python实现 中值滤波计算过程python实现 图像的边缘检测Robert算子计算过程python实现 图像处理腐蚀算子计算过程python实现 Hog&#xff08;梯度方向直方图&#xff09;特征计算流程&#xff1a;Hog的特征维…

什么是自定义表单和工作流?看完这篇文章就懂了

在很多中大型企业中&#xff0c;低代码技术平台的应用价值是较高的&#xff0c;也深得广大用户朋友的青睐和喜爱。其中的自定义表单和工作流是该平台的主要功能&#xff0c;可以解决当前工作效率低下、解放程序员时间和精力等各种现实问题&#xff0c;可以说是实现办公流程化、…

Pytorch量化之Post Train Static Quantization(训练后静态量化)

使用Pytorch训练出的模型权重为fp32&#xff0c;部署时&#xff0c;为了加快速度&#xff0c;一般会将模型量化至int8。与fp32相比&#xff0c;int8模型的大小为原来的1/4, 速度为2~4倍。 Pytorch支持三种量化方式&#xff1a; 动态量化&#xff08;Dynamic Quantization&…

nvm安装以及使用

注意事项&#xff1a; 安装前需要卸载原有的node&#xff0c;卸载干净后cmd输入node -v查看&#xff1b; 一&#xff0c;下载nvm 下载&#xff1a;https://github.com/coreybutler/nvm-windows/releases 选择第四个 “nvm-setup.zip”&#xff1b; 二&#xff0c;安装 1&…

IAR目标代码4字节对齐

向工程添加文件 eof.c : // 文件头 #if defined(__CC_ARM) // MDK // uint32_t g_update_flag[2] __attribute__((zero_init, at(0x1000FFF0)));const unsigned long gc_eof __attribute__((used)) 0xFFFFFFFFul; #elif defined(__ICCARM__) // IAR__root const unsigned…

分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

文章目录 1. PartitionInfo 分区源码2. Partitioner 分区器接口源码3. 自定义分区策略4. 轮询策略 RoundRobinPartitioner5. 黏性分区策略 UniformStickyPartitioner6. hash分区策略7. 默认分区策略 DefaultPartitioner 分区的作用就是提供负载均衡的能力&#xff0c;或者说对数…

ArcGIS Pro实践技术应用暨基础入门、制图、空间分析、影像分析、三维建模、空间统计分析与建模、python融合、案例应用

GIS是利用电子计算机及其外部设备&#xff0c;采集、存储、分析和描述整个或部分地球表面与空间信息系统。简单地讲&#xff0c;它是在一定的地域内&#xff0c;将地理空间信息和 一些与该地域地理信息相关的属性信息结合起来&#xff0c;达到对地理和属性信息的综合管理。GIS的…

数字化时代,如何做好用户体验与应用性能管理

引言 随着数字化时代的到来&#xff0c;各个行业的应用系统从传统私有化部署逐渐转向公有云、行业云、微服务&#xff0c;这种变迁给运维部门和应用部门均带来了较大的挑战。基于当前企业 IT 运维均为多部门负责&#xff0c;且使用多种运维工具&#xff0c;因此&#xff0c;当…

hacksudo3 通关详解

环境配置 一开始桥接错网卡了 搞了半天 改回来就行了 信息收集 漏洞发现 扫个目录 大概看了一眼没什么有用的信息 然后对着login.php跑了一下弱口令 sqlmap 都没跑出来 那么利用点应该不在这 考虑到之前有过dirsearch字典太小扫不到东西的经历 换个gobuster扫一下 先看看g…

Android界面设计与用户体验

Android界面设计与用户体验 1. 引言 在如今竞争激烈的移动应用市场&#xff0c;提供优秀的用户体验成为了应用开发的关键要素。无论应用功能多么强大&#xff0c;如果用户界面设计不合理&#xff0c;用户体验不佳&#xff0c;很可能会导致用户流失。因此&#xff0c;在Androi…

Flink源码之JobManager启动流程

从启动命令flink-daemon.sh中可以看出StandaloneSession入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint, 从该类的main方法会进入ClusterEntrypoint::runCluster中, 该方法中会创建出主要服务和组件。 StandaloneSessionClusterEntrypoint:…

博客项目(Spring Boot)

1.需求分析 注册功能&#xff08;添加用户操纵&#xff09;登录功能&#xff08;查询操作)我的文章列表页&#xff08;查询我的文章|文章修改|文章详情|文章删除&#xff09;博客编辑页&#xff08;添加文章操作&#xff09;所有人博客列表&#xff08;带分页功能&#xff09;…

FPGA外部触发信号毛刺产生及滤波

1、背景 最近在某个项目中&#xff0c;遇到输入给FPGA管脚的外部触发信号因为有毛刺产生&#xff0c;导致FPGA接收到的外部触发信号数量多于实际值。比如&#xff1a;用某个信号源产生1000个外部触发信号&#xff08;上升沿触发方式&#xff09;给到FPGA输入IO&#xff0c;实际…

冠达管理:股票注册制通俗理解?

目前我国A股商场正在进行股票注册制变革&#xff0c;相较之前的发行准则&#xff0c;股票注册制在理念上更为商场化&#xff0c;这意味着公司发行股票的门槛将下降&#xff0c;公司数量将添加&#xff0c;而股票流通的方式也将有所改变。那么股票注册制指的是什么&#xff0c;它…