CPP集群聊天服务器开发实践(六):Redis发布订阅消息队列及服务器集群通信

        前文实现了单服务器与多客户端之间的通信以及聊天业务,同时为了增大并发量利用nginx实现多服务器的集群负载均衡,但是一个关键的问题是要实现多服务器之间的通信,这里采用Redis的发布订阅消息队列实现。

        不同客户端可能连接在不同服务器上,服务器可以向Redis发起订阅和发布请求,当有消息需要发送时以发布的形式写入Redis消息队列,当有接收的消息时,在订阅的前提下Redis负责将消息通知给服务器。

同步redis发布-订阅封装的代码主要提供以下方法:
1.connect:连接redis服务器生成了两个redisContext对象,一个用于发布消息(_publish_context),一个用于订阅通道(_subcribe_context)。
2.publish:在_publish_context上发布消息,id+msg,阻塞接收redis server的响应。
3.subscribe:在_subcribe_context上订阅通道,id,不接收redis server的响应。
4.unsubscribe:在_subcribe_context上取消订阅通道,id,不接收redis server的响应。
5.由于订阅/取消订阅接收响应都是阻塞型的,所以单独开辟线程thread:
    通过observer_channel_message函数调用redisGetReply循环阻塞方式接收订阅通道中的消息,回调通知上层应用(id+msg)。

实现源代码如下:

#include"redis.hpp"
#include<iostream>
#include<thread>
using namespace std;//构造函数,将订阅和发布的两个对象指针置空
Redis::Redis():_publish_context(nullptr),_subcribe_context(nullptr)
{}
//析构函数,释放资源
Redis::~Redis(){if(_publish_context!=nullptr){redisFree(_publish_context);}if(_subcribe_context!=nullptr){redisFree(_subcribe_context);}
}
//连接redis服务器
bool Redis::connect(){//两个对象连接redis服务器,redis默认ip+port=127.0.0.1:6379_publish_context = redisConnect("127.0.0.1", 6379);if(nullptr == _publish_context){cerr<<"connect redis failed!"<<endl;return false;}_subcribe_context = redisConnect("127.0.0.1", 6379);if(nullptr == _subcribe_context){cerr<<"connect redis failed!"<<endl;return false;}//在单独的线程中,监听通道上的事件,有消息给业务层进行上报//因为上报和订阅都是阻塞的,需要单独开辟一个线程进行消息上报,否则服务器无法进行其他业务thread t([&](){observer_channel_message();});t.detach();cout<<"connect redis-server success!"<<endl;return true;
}//向redis指定的通道channel发布消息
bool Redis::publish(int channel, string message){//相当于命令行 publish channel message//redisCommand是同步操作,阻塞,相当于redisAppendCommand+redisBufferWrite+redisGetReply,pubilsh是一致性马上回复,所以可以阻塞等待//redisAppendCommand是将命令组装好后放到本地缓存//redisBufferWrite是将本地缓存的命令发送到redis服务器//redisGetReply是从redis服务器获取返回的结果(阻塞型)redisReply* reply = (redisReply*)redisCommand(this->_publish_context, "PUBLISH %d %s", channel, message.c_str());if(nullptr == reply){cerr<<"publish message failed!"<<endl;return false;}freeReplyObject(reply);return true;
}
//向redis指定的通道subscribe订阅消息
bool Redis::subscribe(int channel){//subscribe命令本身会造成线程阻塞等待通道里发生消息,这里之作订阅通道,不接受通道消息//通道消息的接收专门在observer_channel_message函数中的独立线程中进行//只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源if(REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)){cerr<<"subscribe channel failed!"<<endl;return false;}//redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)int done = 0;while(!done){if(REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){cerr<<"subscribe channel failed!"<<endl;return false;}}return true;
}//向redis指定的通道unsubscribe取消订阅消息(用户下线,无需订阅)
bool  Redis::unsubscribe(int channel){if(REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel)){cerr<<"subscribe channel failed!"<<endl;return false;}//redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)int done = 0;while(!done){if(REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){cerr<<"subscribe channel failed!"<<endl;return false;}}return true;
}//在独立线程中接收订阅通道中的消息
void Redis::observer_channel_message(){redisReply* reply = nullptr;//以循环阻塞的方式等待通道发生的消息while(REDIS_OK == redisGetReply(this->_subcribe_context, (void**)&reply)){//订阅收到的消息是一个redisReply对象,根据不同的消息类型进行处理//订阅收到的消息是一个带三元素的数组if(reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr){//回调通知上层应用,收到订阅的消息_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);}//释放redisReply对象freeReplyObject(reply);}cerr<<"exit observer_channel_message"<<endl;
}//初始化向业务层上报通道消息的回调对象
void Redis::init_notify_handler(function<void(int, string)> fn)
{this->_notify_message_handler = fn;
}/*同步redis发布-订阅封装的代码主要提供以下方法:
1.connect:连接redis服务器生成了两个redisContext对象,一个用于发布消息(_publish_context),一个用于订阅通道(_subcribe_context)。
2.publish:在_publish_context上发布消息,id+msg,阻塞接收redis server的响应。
3.subscribe:在_subcribe_context上订阅通道,id,不接收redis server的响应。
4.unsubscribe:在_subcribe_context上取消订阅通道,id,不接收redis server的响应。
5.由于订阅/取消订阅接收响应都是阻塞型的,所以单独开辟线程thread:通过observer_channel_message函数调用redisGetReply循环阻塞方式接收订阅通道中的消息,回调通知上层应用(id+msg)。
*/

在业务层面,由单机模式变为集群模式,因此逻辑需要发生变化:

        1. 登录成功后,向Redis订阅channel(id)

//id用户登录成功后,向redis订阅channel(id)_redis.subscribe(id)tuichu

         2. 退出登录或者客户端异常退出后,取消订阅Redis channel

//用户下线,取消订阅redis channel_redis.unsubscribe(user.getId());

          3. 一对一聊天或者群聊业务时:

(1)先检查本服务器是否存在目标用户(检查userConnMap),若存在则直接转发消息

(2)如果userConnMap没有找到,则检查User数据库用户是否在线,若在线则说明在其他服务器,需要向该channel(id)发布消息

(3)若不在线,则直接写入离线消息数据库即可

//群组聊天业务
void ChatService::groupChat(const TcpConnectionPtr &conn,json &js,Timestamp time)
{int userid = js["id"].get<int>();int groupid = js["groupid"].get<int>();//获取群组中所有其他用户idvector<int> useridVec = _groupModel.queryGroupUsers(userid,groupid);//加锁,防止_userConnMap中的用户在发送消息时候上线或者下线,C++中map的操作本身是无法保证线程安全的lock_guard<mutex> lock(_connMutex);for(int id:useridVec){auto it = _userConnMap.find(id);//用户在线,并且在本台服务器,直接转发消息if(it!=_userConnMap.end()){it->second->send(js.dump());}//用户在其他服务器上或者不在线else{//查询其他用户是否在线,查询数据库User user = _userModel.query(id);if(user.getState()=="online"){//用户在线,但是不在本服务器上,转发消息到redis消息队列_redis.publish(id,js.dump());}else{//存储离线消息_offlineMsgModel.insert(id,js.dump());}}}
}

Redis连接后开辟单独的线程监听订阅的消息,当收到发布的消息后回调通知上层的应用。上层服务器收到通知后执行回调操作进行消息转发。

redis.cpp 监听通道中的消息:

//在独立线程中接收订阅通道中的消息
void Redis::observer_channel_message(){redisReply* reply = nullptr;//以循环阻塞的方式等待通道发生的消息while(REDIS_OK == redisGetReply(this->_subcribe_context, (void**)&reply)){//订阅收到的消息是一个redisReply对象,根据不同的消息类型进行处理//订阅收到的消息是一个带三元素的数组if(reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr){//回调通知上层应用,收到订阅的消息_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);}//释放redisReply对象freeReplyObject(reply);}cerr<<"exit observer_channel_message"<<endl;
}

chatservice.cpp 执行回调操作,转发通道消息:

//连接redis服务器
if(_redis.connect()){//设置上报消息的回调_redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage,this,_1,_2));
}//从redis消息队列中获取订阅的消息,将msg转发给对应的userid
void ChatService::handleRedisSubscribeMessage(int userid,string msg)
{lock_guard<mutex> lock(_connMutex);auto it = _userConnMap.find(userid);if(it!=_userConnMap.end()){it->second->send(msg);return;}//存储离线消息,这里主要考虑在上报和调用回调过程中用户突然下线的情况_offlineMsgModel.insert(userid,msg);
}

测试结果:

两台客户端分别连接两台服务器,并且可以实现通信。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/71260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

掌握SQLite_轻量级数据库的全面指南

1. 引言 1.1 SQLite简介 SQLite 是一个嵌入式关系型数据库管理系统,它不需要单独的服务器进程或系统配置。它的设计目标是简单、高效、可靠,适用于各种应用场景,尤其是移动设备和嵌入式系统。 1.2 为什么选择SQLite 轻量级:文件大小通常在几百KB到几MB之间。无服务器架构…

基于javaweb的SpringBootoa办公自动化系统设计和实现(源码+文档+部署讲解)

&#x1f3ac; 秋野酱&#xff1a;《个人主页》 &#x1f525; 个人专栏:《Java专栏》《Python专栏》 ⛺️心若有所向往,何惧道阻且长 文章目录 运行环境开发工具适用功能说明部分代码展示 运行环境 Java≥8、MySQL≥5.7 开发工具 eclipse/idea/myeclipse/sts等均可配置运行…

LCD屏控制:你需要掌握的理论基础

目录 一、LCD介绍 1. 发展历程 2. 核心优势 3. 主要缺点 二、LCD屏幕工作原理 1. 核心结构 2. 工作原理 三、LCD屏分类 1. 信号类型划分 2. 材质分类 3. 接口类型分类 四、LCD屏常用接口 1. TTL(RGB)接口 2. LVDS接口 3. EDP接口 4. MIPI接口 &#xff08;1&a…

Audio-Visual Speech Enhancement(视听语音增强)领域近三年研究进展与国内团队及手机厂商动态分析

一、视听语音增强领域近三年研究进展 多模态融合与模型轻量化 多模态特征融合:中国科学技术大学团队提出通过引入超声舌头图像和唇部视频的联合建模,结合知识蒸馏技术,在训练阶段利用教师模型传递舌部运动知识,从而在推断时仅依赖唇部视频即可提升语音增强效果。此外,中科…

Hermite 插值

Hermite 插值 不少实际问题不但要求在节点上函数值相等&#xff0c;而且还要求它的导数值相等&#xff0c;甚至要求高阶导数值也相等。满足这种要求的插值多项式就是 Hermite 插值多项式。 下面只讨论函数值与导数值个数相等的情况。设在节点 a ≤ x 0 < x 1 < ⋯ <…

大语言模型简史:从Transformer(2017)到DeepSeek-R1(2025)的进化之路

2025年初&#xff0c;中国推出了具有开创性且高性价比的「大型语言模型」&#xff08;Large Language Model — LLM&#xff09;DeepSeek-R1&#xff0c;引发了AI的巨大变革。本文回顾了LLM的发展历程&#xff0c;起点是2017年革命性的Transformer架构&#xff0c;该架构通过「…

嵌入式AI(2)清华大学DeepSeek 01:从入门到精通

嵌入式AI(2)清华大学DeepSeek 01&#xff1a;从入门到精通

项目版本号生成

需求 项目想要生成一个更新版本号&#xff0c;格式为v2.0.20250101。 其中v2.0为版本号&#xff0c;更新时进行配置&#xff1b;20250101为更新日期&#xff0c;版本更新时自动生成。 实现思路 创建一个配置文件version.properties&#xff0c;在其中配置版本号&#xff1b…

c# —— StringBuilder 类

StringBuilder 类是 C# 和其他一些基于 .NET Framework 的编程语言中的一个类&#xff0c;它位于 System.Text 命名空间下。StringBuilder 类表示一个可变的字符序列&#xff0c;它是为了提供一种比直接使用字符串连接操作更加高效的方式来构建或修改字符串。 与 C# 中的 stri…

数据守护者:备份文件的重要性及自动化备份实践

在信息化社会&#xff0c;数据已成为企业运营和个人生活的重要组成部分。无论是企业的核心业务数据&#xff0c;还是个人的珍贵照片、重要文档&#xff0c;数据的丢失或损坏都可能带来无法估量的损失。因此&#xff0c;备份文件的重要性愈发凸显&#xff0c;它不仅是数据安全的…

ScoreFlow:通过基于分数的偏好优化掌握 LLM 智体工作流程

25年2月来自 U of Chicago、Princeton U 和 U of Oxford 的论文“ScoreFlow: Mastering LLM Agent Workflows via Score-based Preference Optimization”。 最近的研究利用大语言模型多智体系统来解决复杂问题&#xff0c;同时试图减少构建它们所需的手动工作量&#xff0c;从…

数值分析与科学计算导引——误差与算法举例

文章目录 第一章 数值分析与科学计算导引1.1 数值分析的对象、作用与特点数值分析的对象数值分析的作用数值分析的特点 1.2 数值计算的误差误差分类误差与有效数字数值运算的误差估计 1.3 算法举例秦九韶算法求多项式值开根号迭代算法牛顿切线加权平均的松弛技术 第一章 数值分…

【在时光的棋局中修行——论股市投资的诗意哲学】

在时光的棋局中修行——论股市投资的诗意哲学 引子&#xff1a;数字之海与星辰之约 在经纬交织的K线图里&#xff0c;我常看见银河倾泻的轨迹。那些跳动的数字如同繁星坠落&#xff0c;在午夜时分编织着财富的密码。炒股之道&#xff0c;是理性与诗意的交响&#xff0c;是数据…

线上项目报错OOM常见原因、排查方式、解决方案

概述 OutOfMemoryError&#xff08;OOM&#xff09;是 Java 应用程序中常见的问题&#xff0c;通常是由于应用程序占用的内存超过了 JVM 分配的最大内存限制。在 Spring Boot 项目中&#xff0c;OOM 问题可能由多种原因引起。 1. OOM 的常见原因 OOM 通常由以下几种情况引起&…

java练习(27)

ps&#xff1a;练习来自力扣 删除排序链表中的重复元素 给定一个已排序的链表的头 head &#xff0c; 删除所有重复的元素&#xff0c;使每个元素只出现一次 。返回 已排序的链表 。 代码来自官方题解 class Solution {public ListNode deleteDuplicates(ListNode head) {//…

Flutter:动态表单(在不确定字段的情况下,生成动态表单)

关于数据模型&#xff1a;模型就是一种规范约束&#xff0c;便于维护管理&#xff0c;在不确定表单内会出现什么数据时&#xff0c;就没有模型一说。 这时就要用到动态表单&#xff08;根据接口返回的字段&#xff0c;生成动态表单&#xff09; 1、观察数据格式&#xff0c;定义…

洛谷P8707 [蓝桥杯 2020 省 AB1] 走方格

#include <iostream> using namespace std; int f[31][31]; int main(){int n,m;scanf("%d%d",&n,&m);f[1][1]1;//边界&#xff1a;f(1,1)1for(int i1;i<n;i)for(int j1;j<m;j)if((i&1||j&1)&&(i!1||j!1))//i,j不均为偶数&#…

ASP.NET Core Web应用(.NET9.0)读取数据库表记录并显示到页面

1.创建ASP.NET Core Web应用 选择.NET9.0框架 安装SqlClient依赖包 2.实现数据库记录读取: 引用数据库操作类命名空间 创建查询记录结构类 查询数据并返回数据集合 3.前端遍历数据并动态生成表格显示 生成结果:

解决 Linux 中搜狗输入法导致系统崩溃的问题【fcitx 】【ibus】

在 Linux 系统中安装搜狗输入法时&#xff0c;有时会遇到一些令人头疼的问题。最近&#xff0c;我在安装搜狗输入法后&#xff0c;系统出现了崩溃的情况。具体表现为输入密码登录后&#xff0c;界面卡死&#xff0c;无法正常进入系统。经过一番排查和分析&#xff0c;我终于找到…

如何做好项目变更管理

项目变更管理是确保项目按时、按预算和按质量要求完成的关键环节之一。有效的项目变更管理包括&#xff1a;变更识别、变更评审、变更批准和变更实施。这些步骤确保项目在面对变化时能够高效应对&#xff0c;避免资源浪费、时间延误和预算超支。其中&#xff0c;变更评审和变更…