[集群聊天服务器]----(十一) 使用Redis实现发布订阅功能

接着上文,[集群聊天服务器]----(十)Nginx的tcp负载均衡配置–附带截图,我们配置nginx,使用了多台服务端来提高单机的并发量,接下来我们回到项目中,思考一下,各个服务端之间怎么进行通信呢?

配置Nginx以后,怎么保证跨服务器通信呢?

使用集群服务器,有多个服务器维护自己的用户列表。ChatServer1与ChatServer2的用户聊天,ChatServer1在自己服务器的用户表中找不到,但是可能用户在线,所以我们需要保证跨服务器间的通信!
但是如果让后端的服务器之间互相连接,让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。ChatServer维护了一个连接的用户表,每次向别的用户发消息都会从用户表中查看对端用户是否在线。然后再判断是直接发送,还是转为离线消息。这样的设计使得各个服务器之间耦合度太高 ,不利于系统扩展,并且会占用系统大量的socket资源,各服务器之间的带宽压力很大,不能够节省资源给更多的客户端提供服务,并且存在一个服务器瘫痪其余都崩溃的情况,不采用
在这里插入图片描述
所以引入中间件消息队列,解耦各个服务器, 使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,在集群分布式环境中,经常使用的中间件消息队列有ActiveMQ、RabbitMQ、Kafka等,都是应用场景广泛并且性能很好的消息队列,供集群服务器之间,分布式服务之间进行消息通信。但是本项目业务类型并不是非常复杂,对并发请求量也没有太高的要求,因此我们的中间件消息队列选型的是-基于发布-订阅模式的redis。有关于redis的安装,在我的另一篇博客中有详细的介绍,Linux下安装redis并配置开机自启保姆级教程-----附带每一步截图
在这里插入图片描述

Redis发布-订阅

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
Redis 客户端可以订阅任意数量的通道。当有新消息通过 publish 命令发送给通道 时, 这个消息就会被发送给订阅它的客户端。
需要注意的是:这里的subscribe是以阻塞的形式等待publish端发送消息的,publish是一有消息就发送的。

实现

重要成员变量

// hiredis同步上下文对象,负责publish消息
redisContext *_publish_context;// hiredis同步上下文对象,负责subscribe消息
redisContext *_subcribe_context;// 回调操作,收到订阅的消息,给service层上报
function<void(int, string)> _notify_message_handler;
  • redisContext为redis提供的类

重要成员函数

Redis();
~Redis();// 连接redis服务器 
bool connect();// 向redis指定的通道channel发布消息
bool publish(int channel, string message);// 向redis指定的通道subscribe订阅消息
bool subscribe(int channel);// 向redis指定的通道unsubscribe取消订阅消息
bool unsubscribe(int channel);// 在独立线程中接收订阅通道中的消息
void observer_channel_message();// 初始化向业务层上报通道消息的回调对象
void init_notify_handler(function<void(int, string)> fn);

构造与析构函数

Redis::Redis(): _publish_context(nullptr), _subcribe_context(nullptr)
{
}Redis::~Redis()
{//释放资源if (_publish_context != nullptr){redisFree(_publish_context);}if (_subcribe_context != nullptr){redisFree(_subcribe_context);}
}
  • 构造与析构函数重要完成对两个对象的初始化以及释放资源

连接函数

bool Redis::connect()
{_publish_context = redisConnect("127.0.0.1", 6379);if (nullptr == _publish_context){cerr << "connect redis failed!" << endl;return false;}// 负责subscribe订阅消息的上下文连接_subcribe_context = redisConnect("127.0.0.1", 6379);if (nullptr == _subcribe_context){cerr << "connect redis failed!" << endl;return false;}// 在单独的线程中,监听通道上的事件,有消息给业务层进行上报thread t([&](){ observer_channel_message(); });t.detach();cout << "connect redis-server success!" << endl;return true;
}
  • _publish_context 负责publish发布消息的上下文连接 6379 是 redis-server 监听的端口号
  • _subcribe_context负责subscribe订阅消息的上下文连接
  • 在单独的线程中,监听通道上的事件,有消息给业务层进行上报

发布消息

bool Redis::publish(int channel, string message)
{redisReply *reply = (redisReply *)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str());if (nullptr == reply){cerr << "publish command failed!" << endl;return false;}freeReplyObject(reply); //释放return true;
}
  • 主要完成向redis指定的通道channel发布消息
  • 值得注意的是: redisCommand相当于在redis中敲了一个命令 通道号和消息,先把要发送的命令 缓存到本地 调用了redisAppendCommand,然后调用了redisBufferWrite 把命令发送到redis-server上,最后调用redisGetReply 阻塞等待redis server响应消息,publish一执行马上就回复了,所以可以使用redisCommand
  • 注意释放资源

订阅消息

bool Redis::subscribe(int channel)
{if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)){cerr << "subscribe command failed!" << endl;return false;}// redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)int done = 0;while (!done){if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){cerr << "subscribe command failed!" << endl;return false;}}// redisGetReplyreturn true;
}
  • 主要完成向redis指定的通道subscribe订阅消息
  • 值得注意的是: 订阅消息不会向发布消息一样使用redisCommand命令。因为subscribe命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息,通道消息的接收专门在observer_channel_message函数中的独立线程中进行,只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源

取消订阅

bool Redis::unsubscribe(int channel)
{if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel)){cerr << "unsubscribe command failed!" << endl;return false;}// redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)int done = 0;while (!done){if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){cerr << "unsubscribe command failed!" << endl;return false;}}return true;
}
  • 主要完成向redis指定的通道unsubscribe取消订阅消息

在独立线程中接收订阅通道中的消息

void Redis::observer_channel_message()
{redisReply *reply = nullptr;while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply)){if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr){// 给业务层上报通道上发生的消息_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);}freeReplyObject(reply);}cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl;
}
  • 订阅收到的消息是一个带三元素的数组 element[2] 就是消息 element[1] 通道号

初始化向业务层上报通道消息的回调对象

void Redis::init_notify_handler(function<void(int, string)> fn)
{this->_notify_message_handler = fn;
}

怎么在项目中使用呢?

在前面的剖析中,我们多多少少也看到了redis的身影,主要是在业务模块使用了它,下面我们在具体看一下在那些部分使用到了redis。

  • 在ChatService类中,首先我们创建了一个redis的操作对象
Redis _redis;
  • 在用户登录成功以后,我们向redis订阅了通道,这里使用id作为通道号
_redis.subscribe(id);
  • 然后创建了一个函数从redis消息队列中获取订阅的消息
void handleRedisSubscribeMessage(int, string);
  • 利用这个操作对象,我们连接了服务器,并设置了上报消息的回调
if(_redis.connect())
{_redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1,_2));
}
  • 在查询到用户在线,但是不在同一个服务端的时候,我们就会调用redis的回调函数
if (user.getState() == "online"){_redis.publish(toid, js.dump());return;}
  • 具体实现如下:
void ChatService::handleRedisSubscribeMessage(int userid, string msg)
{lock_guard<mutex> lock(_connMutex);auto it = _userConnMap.find(userid);if (it != _userConnMap.end()){it->second->send(msg);return;}// 存储该用户的离线消息_offlineMsgModel.insert(userid, msg);
}
  • 根据userid寻找用户是否存在,存在就发送消息,不存在就存储他的离线消息
  • 用户注销,在redis中取消订阅通道
 _redis.unsubscribe(userid);

具体的使用就这么多,实现起来还是很简单的,完全足够本项目的开发。

项目测试

剖析到这里,整个项目就完结撒花了,接下来我们来做一个简单的测试,这里再次给出源码地址,在readme中,给出了详细的编译步骤,也给出了一键编译脚本,感兴趣的伙伴们可以拉下来试试。
在这里插入图片描述

  • 编译结束以后,我们启动两个服务端6000 6002 在nginx配置的两个
    在这里插入图片描述

  • 然后开启三个客户端,记得打开8000端口
    在这里插入图片描述
    在这里插入图片描述

  • 此时客户端,分配给了两个服务端
    在这里插入图片描述
    在这里插入图片描述

  • 进入一个终端,我们查看表里的内容
    在这里插入图片描述

  • 注册三个用户
    在这里插入图片描述

  • 表中数据,1 2是之前创建过的
    在这里插入图片描述

  • 登录
    在这里插入图片描述

  • 一对一聊天 3向4聊天,4不在线
    在这里插入图片描述

  • 查看离线消息
    在这里插入图片描述

  • 添加好友 3添加4
    在这里插入图片描述

  • 查看好友列表
    在这里插入图片描述

  • 1创建群
    在这里插入图片描述
    在这里插入图片描述

  • 4登录,显示离线消息
    在这里插入图片描述

  • 加入群
    在这里插入图片描述

  • 查看表
    在这里插入图片描述

  • 5登录,加入群

  • 群聊天
    在这里插入图片描述

  • 3与4聊天
    在这里插入图片描述

  • 3客户端退出
    在这里插入图片描述
    在这里插入图片描述

  • 服务端退出【ctrl+c】
    在这里插入图片描述

好了~ 关于集群聊天服务器的剖析就到此结束了,希望能够帮助到大家,也希望路过的大佬看到问题可以指出,感谢大家的支持,完结撒花~

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/17416.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Reactor模式Proactor模式

1.Reactor/Dispatcher模式 1.1 概述 Reactor模式下&#xff0c;服务端的构成为Reactor 处理资源池。其中&#xff0c;Reactor负责监听和分发事件&#xff0c;而处理资源池则负责处理事件。 该模式下的组合方案有下面几种(第三种几乎没有被实际应用)&#xff1a; 1 * Reacto…

文件上传漏洞:pikachu靶场中的文件上传漏洞通关

目录 1、文件上传漏洞介绍 2、pikachu-client check 3、pikachu-MIME type 4、pikachu-getimagesize 最近在学习文件上传漏洞&#xff0c;这里使用pikachu靶场来对文件上传漏洞进行一个复习练习 废话不多说&#xff0c;开整 1、文件上传漏洞介绍 pikachu靶场是这样介绍文…

APM2.8下载固件的方法(两种办法详解)

1.把APM飞控用安卓手机的USB线插入电脑。 选择COM口&#xff0c;不要选择auto&#xff0c;如果你没有COM口说明你驱动安装有问题。 波特率115200。点击相应的图标就可以下载固件到飞控板。 请注意&#xff1a;烧录APM必须选择INSTALL FIRMWARE LEAGACY,第一个是用于刷pixhawk的…

【软件设计师】网络安全

1.网络安全基础信息 网络安全的五个基本要素&#xff1a; 机密性&#xff1a;确保信息不暴露给未授权的实体或进程 完整性&#xff1a;只有得到允许的人才能修改数据&#xff0c;并且能判断出数据是否已被修改 可用性&#xff1a;得到授权的实体在需要时可以访问数据&#xff0…

Laravel和ThinkPHP框架比较

一、开发体验与易用性比较 1. 代码可读性&#xff1a; - Laravel以其优雅的语法和良好的代码结构著称&#xff0c;使得代码更加易读易懂。 - 相比之下&#xff0c;ThinkPHP的代码可读性较为一般&#xff0c;在一些复杂业务场景下&#xff0c;可能会稍显混乱。 让您能够一站式…

【动手学PaddleX】谁都能学会的基于迁移学习的老人摔倒目标检测

本项目使用PaddleX搭建目标检测模块&#xff0c;在一个精选的数据集上进行初步训练&#xff0c;并在另一个老年人跌倒检测的数据集上进行参数微调&#xff0c;实现了迁移学习的目标检测项目。 1.项目介绍 迁移学习是非常有用的方法&#xff0c;在实际生活中由于场景多样&…

Brewer Science将在CS Mantech进行展示

在风景如画的亚利桑那州图森市举办的CS Mantech盛会上&#xff08;2024年5月20日至23日&#xff09;&#xff0c;杰出化合物半导体材料企业Brewer Science&#xff0c;将带来一场名为“化合物半导体制造的创新材料解决方案”的演讲盛宴。这一演讲&#xff0c;定于五月二十一日星…

【Java面试】五、MySQL篇(下)

文章目录 1、事务的特性2、并发事务问题3、事务的隔离级别4、undo log 和 redo log4.1 底层结构4.2 redo log4.3 undo log 5、MVCC5.1 隐式字段5.2 undo log 版本链5.3 ReadView5.4 ReadView的匹配规则实现事务隔离 6、MySQL的主从同步原理7、分库分表7.1 垂直分库7.2 垂直分表…

stm32启动文件

启动文件由汇编编写&#xff0c;是系统上电复位后第一个执行的程序。主要做了以下工作&#xff1a; 初始化堆栈指针SP_initial_sp 初始化PC指针Reset_Handler 初始化中断向量表 配置系统时钟 调用C库函数_main初始化用户堆栈&#xff0c;从而最终调用main函数去到C的世界 …

linux下使用cmake-gui编译WXQT

一.编译环境 操作系统&#xff1a;Ubuntu 22.04.3 LTS wxWidgets源码&#xff1a;wxWidgets-3.1.5 编译工具&#xff1a;CMake-gui qt版本&#xff1a;5.13.2 二.编译步骤 1.将源码解压。 2.打开CMake-gui&#xff0c;并设置好源码目录和构建目录 3.点击configure 会弹出…

C++模板使用

文章目录 目录 文章目录 前言 一、交换函数(泛型编程) 二、函数模板 2.1 函数模板概念 2.2函数模板格式 2.3使用方法 2.4 函数模板的原理 2.4.1库中的swap 2.5 函数模板的实例化 2.6 模板参数的匹配原则 三、类模板 3.1 类模板的定义格式 3.2类模板声明和定义分离 前言 C语言阶…

数据仓库——分层原理

目录 一、什么是数据仓库 二、数仓建模的意义&#xff0c;为什么要对数据仓库分层&#xff1f; 三、ETL 四、技术架构 五、数仓分层架构 数仓逻辑分层 1、数据引入层&#xff08;ODS&#xff0c;Operational Data Store&#xff0c;又称数据基础层&#xff09;&#xff…

解决 WooCommerce 的分析报表失效问题

今天明月的一个境外电商客户反应网站的 WooCommerce 分析报表已经十多天没有更新了&#xff0c;明明每天都有订单交易可分析报表里的数据依旧是十多天前的&#xff0c;好像更新完全停滞了似的。明月也及时的查看了后台的所有设置&#xff0c;确认没有任何问题&#xff0c;WooCo…

Android刮刮卡自定义控件

效果图 刮刮卡自定义控件 import android.content.Context; import android.graphics.Bitmap; import android.graphics.Canvas; import android.graphics.Color; import android.graphics.Paint; import android.graphics.Path; import android.graphics.PorterDuff; import …

类和对象03

六、继承 我们发现&#xff0c;定义这些类时&#xff0c;下级别的成员除了拥有上一级的共性&#xff0c;还有自己的特性。 这个时候我们就可以考虑利用继承的技术&#xff0c;减少重复代码 6.1 继承的基础语法 例如我们看到很多网站中, 都有公共的头部&#xff0c;公共的底…

乡村振兴的乡村人才引进与培养:引进和培养乡村人才,激发乡村发展活力,为乡村振兴提供人才保障

目录 一、引言 二、乡村人才引进与培养的重要性 &#xff08;一&#xff09;人才是乡村振兴的核心动力 &#xff08;二&#xff09;人才是乡村文化传承的载体 &#xff08;三&#xff09;人才是乡村社会治理的基石 三、乡村人才引进与培养的现状 &#xff08;一&#xf…

备战秋招c++ 【持续更新】

T1 牛牛的快递 原题链接&#xff1a;牛牛的快递_牛客题霸_牛客网 (nowcoder.com) 题目类型&#xff1a;模拟 审题&确定思路&#xff1a; 1、超过1kg和不足1kg有两种不同收费方案 ---- 起步价问题 2、超出部分不足1kg的按1kg计算 ----- 向上取整 3、向上取整的实现思路…

移动端应用订阅SDK接入攻略

本文档介绍了联想应用联运移动端订阅SDK接入操作指南&#xff0c;您可在了解文档内容后&#xff0c;自行接入应用联运移动端订阅SDK。 接入前准备 1请先与联想商务达成合作意向。 2.联系联想运营&#xff0c;提供应用和公司信息&#xff0c;并获取商户id、app id、key&#…

谷歌开发者账号身份验证不通过?该怎么办?

我们都清楚&#xff0c;随着谷歌上架行业的快速发展&#xff0c;谷歌政策也在不断更新变化&#xff0c;对开发者账号的审核标准也在不断提升。其中一项要求就是&#xff0c;开发者账号需要进行身份验证才能发布应用。 Your identity couldnt be verified&#xff01;“我们无法…

词法与语法分析器介绍

概述 词法和语法可以使用正则表达式和BNF范式表达&#xff0c;而最终描述文法含义的事状态转换图 Lex与YACC 词法分析器Lex 词法分析词Lex&#xff0c;是一种生成词法分析的工具&#xff0c;描述器是识别文本中词汇模式的程序&#xff0c;这些词汇模式是在特殊的句子结构中…