前文实现了单服务器与多客户端之间的通信以及聊天业务,同时为了增大并发量利用nginx实现多服务器的集群负载均衡,但是一个关键的问题是要实现多服务器之间的通信,这里采用Redis的发布订阅消息队列实现。
不同客户端可能连接在不同服务器上,服务器可以向Redis发起订阅和发布请求,当有消息需要发送时以发布的形式写入Redis消息队列,当有接收的消息时,在订阅的前提下Redis负责将消息通知给服务器。
同步redis发布-订阅封装的代码主要提供以下方法:
1.connect:连接redis服务器生成了两个redisContext对象,一个用于发布消息(_publish_context),一个用于订阅通道(_subcribe_context)。
2.publish:在_publish_context上发布消息,id+msg,阻塞接收redis server的响应。
3.subscribe:在_subcribe_context上订阅通道,id,不接收redis server的响应。
4.unsubscribe:在_subcribe_context上取消订阅通道,id,不接收redis server的响应。
5.由于订阅/取消订阅接收响应都是阻塞型的,所以单独开辟线程thread:
通过observer_channel_message函数调用redisGetReply循环阻塞方式接收订阅通道中的消息,回调通知上层应用(id+msg)。
实现源代码如下:
#include"redis.hpp"
#include<iostream>
#include<thread>
using namespace std;//构造函数,将订阅和发布的两个对象指针置空
Redis::Redis():_publish_context(nullptr),_subcribe_context(nullptr)
{}
//析构函数,释放资源
Redis::~Redis(){if(_publish_context!=nullptr){redisFree(_publish_context);}if(_subcribe_context!=nullptr){redisFree(_subcribe_context);}
}
//连接redis服务器
bool Redis::connect(){//两个对象连接redis服务器,redis默认ip+port=127.0.0.1:6379_publish_context = redisConnect("127.0.0.1", 6379);if(nullptr == _publish_context){cerr<<"connect redis failed!"<<endl;return false;}_subcribe_context = redisConnect("127.0.0.1", 6379);if(nullptr == _subcribe_context){cerr<<"connect redis failed!"<<endl;return false;}//在单独的线程中,监听通道上的事件,有消息给业务层进行上报//因为上报和订阅都是阻塞的,需要单独开辟一个线程进行消息上报,否则服务器无法进行其他业务thread t([&](){observer_channel_message();});t.detach();cout<<"connect redis-server success!"<<endl;return true;
}//向redis指定的通道channel发布消息
bool Redis::publish(int channel, string message){//相当于命令行 publish channel message//redisCommand是同步操作,阻塞,相当于redisAppendCommand+redisBufferWrite+redisGetReply,pubilsh是一致性马上回复,所以可以阻塞等待//redisAppendCommand是将命令组装好后放到本地缓存//redisBufferWrite是将本地缓存的命令发送到redis服务器//redisGetReply是从redis服务器获取返回的结果(阻塞型)redisReply* reply = (redisReply*)redisCommand(this->_publish_context, "PUBLISH %d %s", channel, message.c_str());if(nullptr == reply){cerr<<"publish message failed!"<<endl;return false;}freeReplyObject(reply);return true;
}
//向redis指定的通道subscribe订阅消息
bool Redis::subscribe(int channel){//subscribe命令本身会造成线程阻塞等待通道里发生消息,这里之作订阅通道,不接受通道消息//通道消息的接收专门在observer_channel_message函数中的独立线程中进行//只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源if(REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)){cerr<<"subscribe channel failed!"<<endl;return false;}//redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)int done = 0;while(!done){if(REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){cerr<<"subscribe channel failed!"<<endl;return false;}}return true;
}//向redis指定的通道unsubscribe取消订阅消息(用户下线,无需订阅)
bool Redis::unsubscribe(int channel){if(REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel)){cerr<<"subscribe channel failed!"<<endl;return false;}//redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)int done = 0;while(!done){if(REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){cerr<<"subscribe channel failed!"<<endl;return false;}}return true;
}//在独立线程中接收订阅通道中的消息
void Redis::observer_channel_message(){redisReply* reply = nullptr;//以循环阻塞的方式等待通道发生的消息while(REDIS_OK == redisGetReply(this->_subcribe_context, (void**)&reply)){//订阅收到的消息是一个redisReply对象,根据不同的消息类型进行处理//订阅收到的消息是一个带三元素的数组if(reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr){//回调通知上层应用,收到订阅的消息_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);}//释放redisReply对象freeReplyObject(reply);}cerr<<"exit observer_channel_message"<<endl;
}//初始化向业务层上报通道消息的回调对象
void Redis::init_notify_handler(function<void(int, string)> fn)
{this->_notify_message_handler = fn;
}/*同步redis发布-订阅封装的代码主要提供以下方法:
1.connect:连接redis服务器生成了两个redisContext对象,一个用于发布消息(_publish_context),一个用于订阅通道(_subcribe_context)。
2.publish:在_publish_context上发布消息,id+msg,阻塞接收redis server的响应。
3.subscribe:在_subcribe_context上订阅通道,id,不接收redis server的响应。
4.unsubscribe:在_subcribe_context上取消订阅通道,id,不接收redis server的响应。
5.由于订阅/取消订阅接收响应都是阻塞型的,所以单独开辟线程thread:通过observer_channel_message函数调用redisGetReply循环阻塞方式接收订阅通道中的消息,回调通知上层应用(id+msg)。
*/
在业务层面,由单机模式变为集群模式,因此逻辑需要发生变化:
1. 登录成功后,向Redis订阅channel(id)
//id用户登录成功后,向redis订阅channel(id)_redis.subscribe(id)tuichu
2. 退出登录或者客户端异常退出后,取消订阅Redis channel
//用户下线,取消订阅redis channel_redis.unsubscribe(user.getId());
3. 一对一聊天或者群聊业务时:
(1)先检查本服务器是否存在目标用户(检查userConnMap),若存在则直接转发消息
(2)如果userConnMap没有找到,则检查User数据库用户是否在线,若在线则说明在其他服务器,需要向该channel(id)发布消息
(3)若不在线,则直接写入离线消息数据库即可
//群组聊天业务
void ChatService::groupChat(const TcpConnectionPtr &conn,json &js,Timestamp time)
{int userid = js["id"].get<int>();int groupid = js["groupid"].get<int>();//获取群组中所有其他用户idvector<int> useridVec = _groupModel.queryGroupUsers(userid,groupid);//加锁,防止_userConnMap中的用户在发送消息时候上线或者下线,C++中map的操作本身是无法保证线程安全的lock_guard<mutex> lock(_connMutex);for(int id:useridVec){auto it = _userConnMap.find(id);//用户在线,并且在本台服务器,直接转发消息if(it!=_userConnMap.end()){it->second->send(js.dump());}//用户在其他服务器上或者不在线else{//查询其他用户是否在线,查询数据库User user = _userModel.query(id);if(user.getState()=="online"){//用户在线,但是不在本服务器上,转发消息到redis消息队列_redis.publish(id,js.dump());}else{//存储离线消息_offlineMsgModel.insert(id,js.dump());}}}
}
Redis连接后开辟单独的线程监听订阅的消息,当收到发布的消息后回调通知上层的应用。上层服务器收到通知后执行回调操作进行消息转发。
redis.cpp 监听通道中的消息:
//在独立线程中接收订阅通道中的消息
void Redis::observer_channel_message(){redisReply* reply = nullptr;//以循环阻塞的方式等待通道发生的消息while(REDIS_OK == redisGetReply(this->_subcribe_context, (void**)&reply)){//订阅收到的消息是一个redisReply对象,根据不同的消息类型进行处理//订阅收到的消息是一个带三元素的数组if(reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr){//回调通知上层应用,收到订阅的消息_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);}//释放redisReply对象freeReplyObject(reply);}cerr<<"exit observer_channel_message"<<endl;
}
chatservice.cpp 执行回调操作,转发通道消息:
//连接redis服务器
if(_redis.connect()){//设置上报消息的回调_redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage,this,_1,_2));
}//从redis消息队列中获取订阅的消息,将msg转发给对应的userid
void ChatService::handleRedisSubscribeMessage(int userid,string msg)
{lock_guard<mutex> lock(_connMutex);auto it = _userConnMap.find(userid);if(it!=_userConnMap.end()){it->second->send(msg);return;}//存储离线消息,这里主要考虑在上报和调用回调过程中用户突然下线的情况_offlineMsgModel.insert(userid,msg);
}
测试结果:
两台客户端分别连接两台服务器,并且可以实现通信。