接着上文,[集群聊天服务器]----(十)Nginx的tcp负载均衡配置–附带截图,我们配置nginx,使用了多台服务端来提高单机的并发量,接下来我们回到项目中,思考一下,各个服务端之间怎么进行通信呢?
配置Nginx以后,怎么保证跨服务器通信呢?
使用集群服务器,有多个服务器维护自己的用户列表。ChatServer1与ChatServer2的用户聊天,ChatServer1在自己服务器的用户表中找不到,但是可能用户在线,所以我们需要保证跨服务器间的通信!
但是如果让后端的服务器之间互相连接,让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。ChatServer维护了一个连接的用户表,每次向别的用户发消息都会从用户表中查看对端用户是否在线。然后再判断是直接发送,还是转为离线消息。这样的设计使得各个服务器之间耦合度太高 ,不利于系统扩展,并且会占用系统大量的socket资源,各服务器之间的带宽压力很大,不能够节省资源给更多的客户端提供服务,并且存在一个服务器瘫痪其余都崩溃的情况,不采用。
所以引入中间件消息队列,解耦各个服务器, 使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,在集群分布式环境中,经常使用的中间件消息队列有ActiveMQ、RabbitMQ、Kafka等,都是应用场景广泛并且性能很好的消息队列,供集群服务器之间,分布式服务之间进行消息通信。但是本项目业务类型并不是非常复杂,对并发请求量也没有太高的要求,因此我们的中间件消息队列选型的是-基于发布-订阅模式的redis。有关于redis的安装,在我的另一篇博客中有详细的介绍,Linux下安装redis并配置开机自启保姆级教程-----附带每一步截图
Redis发布-订阅
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
Redis 客户端可以订阅任意数量的通道。当有新消息通过 publish 命令发送给通道 时, 这个消息就会被发送给订阅它的客户端。
需要注意的是:这里的subscribe是以阻塞的形式等待publish端发送消息的,publish是一有消息就发送的。
实现
重要成员变量
// hiredis同步上下文对象,负责publish消息
redisContext *_publish_context;// hiredis同步上下文对象,负责subscribe消息
redisContext *_subcribe_context;// 回调操作,收到订阅的消息,给service层上报
function<void(int, string)> _notify_message_handler;
- redisContext为redis提供的类
重要成员函数
Redis();
~Redis();// 连接redis服务器
bool connect();// 向redis指定的通道channel发布消息
bool publish(int channel, string message);// 向redis指定的通道subscribe订阅消息
bool subscribe(int channel);// 向redis指定的通道unsubscribe取消订阅消息
bool unsubscribe(int channel);// 在独立线程中接收订阅通道中的消息
void observer_channel_message();// 初始化向业务层上报通道消息的回调对象
void init_notify_handler(function<void(int, string)> fn);
构造与析构函数
Redis::Redis(): _publish_context(nullptr), _subcribe_context(nullptr)
{
}Redis::~Redis()
{//释放资源if (_publish_context != nullptr){redisFree(_publish_context);}if (_subcribe_context != nullptr){redisFree(_subcribe_context);}
}
- 构造与析构函数重要完成对两个对象的初始化以及释放资源
连接函数
bool Redis::connect()
{_publish_context = redisConnect("127.0.0.1", 6379);if (nullptr == _publish_context){cerr << "connect redis failed!" << endl;return false;}// 负责subscribe订阅消息的上下文连接_subcribe_context = redisConnect("127.0.0.1", 6379);if (nullptr == _subcribe_context){cerr << "connect redis failed!" << endl;return false;}// 在单独的线程中,监听通道上的事件,有消息给业务层进行上报thread t([&](){ observer_channel_message(); });t.detach();cout << "connect redis-server success!" << endl;return true;
}
_publish_context
负责publish发布消息的上下文连接 6379 是 redis-server 监听的端口号_subcribe_context
负责subscribe订阅消息的上下文连接- 在单独的线程中,监听通道上的事件,有消息给业务层进行上报
发布消息
bool Redis::publish(int channel, string message)
{redisReply *reply = (redisReply *)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str());if (nullptr == reply){cerr << "publish command failed!" << endl;return false;}freeReplyObject(reply); //释放return true;
}
- 主要完成向redis指定的通道channel发布消息
- 值得注意的是: redisCommand相当于在redis中敲了一个命令 通道号和消息,先把要发送的命令 缓存到本地 调用了redisAppendCommand,然后调用了redisBufferWrite 把命令发送到redis-server上,最后调用redisGetReply 阻塞等待redis server响应消息,publish一执行马上就回复了,所以可以使用redisCommand
- 注意释放资源
订阅消息
bool Redis::subscribe(int channel)
{if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)){cerr << "subscribe command failed!" << endl;return false;}// redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)int done = 0;while (!done){if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){cerr << "subscribe command failed!" << endl;return false;}}// redisGetReplyreturn true;
}
- 主要完成向redis指定的通道subscribe订阅消息
- 值得注意的是: 订阅消息不会向发布消息一样使用
redisCommand
命令。因为subscribe命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息,通道消息的接收专门在observer_channel_message函数中的独立线程中进行,只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源
取消订阅
bool Redis::unsubscribe(int channel)
{if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel)){cerr << "unsubscribe command failed!" << endl;return false;}// redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)int done = 0;while (!done){if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){cerr << "unsubscribe command failed!" << endl;return false;}}return true;
}
- 主要完成向redis指定的通道unsubscribe取消订阅消息
在独立线程中接收订阅通道中的消息
void Redis::observer_channel_message()
{redisReply *reply = nullptr;while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply)){if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr){// 给业务层上报通道上发生的消息_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);}freeReplyObject(reply);}cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl;
}
- 订阅收到的消息是一个带三元素的数组 element[2] 就是消息 element[1] 通道号
初始化向业务层上报通道消息的回调对象
void Redis::init_notify_handler(function<void(int, string)> fn)
{this->_notify_message_handler = fn;
}
怎么在项目中使用呢?
在前面的剖析中,我们多多少少也看到了redis的身影,主要是在业务模块使用了它,下面我们在具体看一下在那些部分使用到了redis。
- 在ChatService类中,首先我们创建了一个redis的操作对象
Redis _redis;
- 在用户登录成功以后,我们向redis订阅了通道,这里使用id作为通道号
_redis.subscribe(id);
- 然后创建了一个函数从redis消息队列中获取订阅的消息
void handleRedisSubscribeMessage(int, string);
- 利用这个操作对象,我们连接了服务器,并设置了上报消息的回调
if(_redis.connect())
{_redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1,_2));
}
- 在查询到用户在线,但是不在同一个服务端的时候,我们就会调用redis的回调函数
if (user.getState() == "online"){_redis.publish(toid, js.dump());return;}
- 具体实现如下:
void ChatService::handleRedisSubscribeMessage(int userid, string msg)
{lock_guard<mutex> lock(_connMutex);auto it = _userConnMap.find(userid);if (it != _userConnMap.end()){it->second->send(msg);return;}// 存储该用户的离线消息_offlineMsgModel.insert(userid, msg);
}
- 根据userid寻找用户是否存在,存在就发送消息,不存在就存储他的离线消息
- 用户注销,在redis中取消订阅通道
_redis.unsubscribe(userid);
具体的使用就这么多,实现起来还是很简单的,完全足够本项目的开发。
项目测试
剖析到这里,整个项目就完结撒花了,接下来我们来做一个简单的测试,这里再次给出源码地址,在readme中,给出了详细的编译步骤,也给出了一键编译脚本,感兴趣的伙伴们可以拉下来试试。
-
编译结束以后,我们启动两个服务端6000 6002 在nginx配置的两个 。
-
然后开启三个客户端,记得打开8000端口
-
此时客户端,分配给了两个服务端
-
进入一个终端,我们查看表里的内容
-
注册三个用户
-
表中数据,1 2是之前创建过的
-
登录
-
一对一聊天 3向4聊天,4不在线
-
查看离线消息
-
添加好友 3添加4
-
查看好友列表
-
1创建群
-
4登录,显示离线消息
-
加入群
-
查看表
-
5登录,加入群
-
群聊天
-
3与4聊天
-
3客户端退出
-
服务端退出【ctrl+c】