服务器架构:libevent + 线程池
数据库:MySQL
有两张表:chat_user和chat_group,分别保存用户信息和群信息
在线用户和群的保存:
struct User
{std::string name;//账号(用户名struct bufferevent* bev;//客户端对应的事件
};//保存在线用户信息
std::list<User> *online_user;
//保存所有群的信息
std::map<std::string,std::list<std::string>> *group_info;
ChatServer的初始化
ChatServer::ChatServer()
{//初始化事件集合this->base =event_base_new();//初始化数据库对象db = new DataBase();//初始化数据库表(chat_user,chat_group)if(!db->database_init_table()){std::cout<<"init table failure"<<std::endl;exit(1);}//初始化数据结构对象info = new ChatInfo();//初始化群信息:把群信息从数据库里读出来,放入mapserver_update_group_info();//初始化线程池thread_num = 3;cur_thread = 0;pool = new ChatThread[thread_num];for(int i=0;i<thread_num;i++){pool[i].start(info,db);}
}
初始化事件集合,放入监听事件
ChatServer::ChatServer()
{this->base =event_base_new();
}
ChatServer::listen()
下载libevent源码并查看:
进入/xx/usr/share/doc/libevent-dev/examples,可查看libevent使用示例。
查看hello-world.c
去头文件查看evconnlistener_new_bind的用法。
头文件都在/usr/include/中。
进入listener.h
struct evconnlistener *evconnlistener_new_bind(struct event_base *base,//事件集合evconnlistener_cb cb,//一旦有客户端连接,就会触发回调函数void *ptr,unsigned flags,int backlog,//监听队列里的容量const struct sockaddr *sa,int socklen
);
unsigned flags:说明里flags Any number of LEV_OPT_*
flags
于是搜索LEV_OPT_
,得如下结果
写代码:
//创建监听对象
void ChatServer::listen(const char* ip, int port)
{struct sockaddr_in server_info;memset(&server_info,0,sizeof(server_info));//清空server_info.sin_family = AF_INET;server_info.sin_addr.s_addr = inet_addr(ip);server_info.sin_port = htons(port);struct evconnlistener *listener=evconnlistener_new_bind(base,listener_cb,this,LEV_OPT_CLOSE_ON_FREE,5,(struct sockaddr*)&server_info,sizeof(server_info));if(listener == NULL){std::cout<<"evconnlistener_new_bind error"<<std::endl;exit(1);}//监听集合event_base_dispatch(base);//死循环,如果集合没有事件,退出//释放对象evconnlistener_free(listener);event_base_free(base);
}
查看回调函数listenner_cb
的声明:
把光标放在这,按下shift+8(也就是*),然后next
listener_cb
在回调函数中打印客户端的ip和端口,方便调试。
关于listener_cb的this参数的说明
struct evconnlistener *listener=evconnlistener_new_bind(base,listener_cb, this ,LEV_OPT_CLOSE_ON_FREE,5,(struct sockaddr*)&server_info,sizeof(server_info));
因为listener_cb是个静态函数,而静态函数只能通过对象来调用普通成员函数,所以listener_cb不能直接调用server_alloc_enevt()。
所以传入this参数,通过this来调用。(因为参数是void*,所以记得先把this强转回来)
//回调函数,有客户端发起连接,会触发该函数
void ChatServer::listener_cb(struct evconnlistener *listener, evutil_socket_t fd,struct sockaddr *c, int socklen, void *arg)
{struct sockaddr_in *client_info = (struct sockaddr_in*)c;std::cout<<"[connection]";std::cout<<" client ip : " <<inet_ntoa(client_info->sin_addr);std::cout<<" port : " << client_info->sin_port<<std::endl;//创建事件,放入线程池ser->server_alloc_event(fd);
}
初始化数据库对象
查找头文件
包含头文件要把include后面都写上
#include<mysql/mysql.h>
class DataBase
{
private:MYSQL *mysql;std::mutex _mutex;
public:DataBase();~DataBase();bool database_connect();void database_disconnect();bool database_init_table();
}
构造函数不需要做什么。
DataBase::DataBase()
{
}
初始化数据库表
是否要一直打开数据库——取决于项目对数据库的使用频繁程度。
远程通信系统对数据库使用不频繁,所以不用时将数据库关闭。
连接数据库:
在命令行敲的所有命令都可以通过mysql_query()来执行
mysql_query()
如果查询成功,返回0。如果出现错误,返回非0值。
建议先在在mysql中测试语句的可执行性,再写入代码:
mysql -u root -p
登录mysql
set names utf8;
显示query 成功。
bool DataBase::database_connect()
{//初始化数据库句柄mysql = mysql_init(NULL);//分配堆内存//连接数据库mysql = mysql_real_connect(mysql, "localhost","root","root","chat_database",0,NULL,0);if(mysql==NULL){std::cout<<"mysql_real_connect error"<<std::endl;return false;}//设置编码格式 (防止中文乱码)if(mysql_query(mysql, "set names utf8;")!=0){std::cout<<"set names utf8 error"<<std::endl;return false;}return true;
}
断开数据库:
直接调用mysql_close()
函数
void DataBase::database_disconnect()
{mysql_close(mysql);
}
初始化数据库表:
创建chat_group的sql语句:
create table if not exists chat_group(
groupname varchar(128),
groupowner varchar(128),
groupmember varchar(4096)
)charset utf8;
创建chat_user的sql语句
create table if not exists chat_user(
username varchar(128),
password varchar(128),
friendlist varchar(4096),
grouplist varchar(4096)
)charset utf8;
初始化数据库表:
bool DataBase::database_init_table()
{database_connect();const char* g="create table if not exists chat_group(groupname varchar(128),groupowner varchar(128),groupmember varchar(4096))charset utf8;";if(mysql_query(mysql,g)!=0){return false;}const char* u="create table if not exists chat_user(username varchar(128),password varchar(128),friendlist varchar(4096),grouplist varchar(4096))charset utf8;";if(mysql_query(mysql,u)!=0){return false;}database_disconnect();return true;
}
初始化数据结构对象
struct User
{std::string name;//账号(用户名struct bufferevent* bev;//客户端对应的事件
};class ChatInfo
{
private://保存在线用户信息std::list<User> *online_user;//保存所有群的信息std::map<std::string,std::list<std::string>> *group_info;//访问在线用户的锁std::mutex list_mutex;//访问群信息的锁std::mutex map_mutex;
public:ChatInfo();~ChatInfo();void list_update_group(std::string* ,int);void list_print_group();
};
ChatInfo::ChatInfo()
{online_user = new std::list<User>;group_info = new std::map<std::string,std::list<std::string>>;
}
初始化群信息
server_update_group_info
逻辑:从数据库获取群信息,然后写入list。
所以获取群信息的函数是属于DataBase的,写入list的函数时属于ChatInfo的
void ChatServer::server_update_group_info()
{//连接数据库if(!db->database_connect()){exit(1);}std::string groupinfo[1024];//最多1024个群int num = db->database_get_group_info(groupinfo);std::cout<<"group num : "<<num<<std::endl;//断开数据库db->database_disconnect();info->list_update_group(groupinfo,num);//info->list_print_group();//测试用
}
database_get_group_info
mysql_store_result()
用于从服务器获取结果集并将其存储在客户端中以供检索和处理,返回值是MYSQL_RES*
。
mysql_fetch_row()
用于逐行获取查询结果集中的数据,返回值是MYSQL_ROW
。
MYSQL_ROW是个数组
用竖线|
间隔每个数据。
int DataBase::database_get_group_info(std::string *g)
{if(mysql_query(mysql,"select * from chat_group;")!=0){std::cout<<"select error"<<std::endl;return -1;}MYSQL_RES *res = mysql_store_result(mysql);if(res==NULL){std::cout<<"store result error"<<std::endl;return -1;}MYSQL_ROW r;int idx=0;while(r = mysql_fetch_row(res)){g[idx] += r[0];g[idx] +='|';g[idx] += r[2];//std::cout<<g[idx]<<std::endl;idx++;}mysql_free_result(res);return idx;
}
list_update_group()
按database_get_group_info中的格式,查找竖线|
,已得到每个数据。
void ChatInfo::list_update_group(std::string* g, int size)
{int idx=0,start =0;std::string groupname,membername;std::list<std::string> l;for(int i=0;i<size;i++){idx = g[i].find('|');groupname = g[i].substr(0,idx);//std::cout<<groupname<<std::endl;start = idx +1;while(1)//idx查找竖线,找不到是-1{idx = g[i].find('|',idx+1); //从idx开始查找if(idx==-1)break;membername = g[i].substr(start,idx-start);l.push_back(membername);start = idx +1;}membername = g[i].substr(start,idx - start);l.push_back(membername);this->group_info->insert(std::pair<std::string,std::list<std::string>>(groupname,l));l.clear();}
}
初始化线程池
class ChatThread
{
private:std::thread *_thread;std::thread::id _id;struct event_base *base;ChatInfo *info;DataBase *db;
public:ChatThread();~ChatThread();void start(ChatInfo *,DataBase *);void run();static void worker(ChatThread*);};
ChatThread()
ChatThread::ChatThread()
{_thread = new std::thread(worker,this);_id = _thread->get_id();//get_id()时线程标准库里的base = event_base_new();
}
回调函数worker
因为静态成员函数worker只能访问静态成员变量,不能访问普通成员变量,却可以通过对象调用普通成员函数,所以再写一个普通成员函数run,同时在构造时传入this参数以调用run。
void ChatThread::worker(ChatThread *t)
{t->run();
}
run()
因为event_base_dispatch()
当集合中无事件时自动退出,所以随便放一个事件进集合。
比如放一个定时器事件,查找示例:
查看main函数中的使用示例
写代码:
void ChatThread::run()
{//集合中放入一个定时器事件struct event timeout;struct timeval tv;//将事件与集合绑定//base是构造函数初始化的base//EV_PERSIST表示定时器永远都有用event_assign(&timeout, base, -1, EV_PERSIST, timeout_cb,this);evutil_timerclear(&tv);tv.tv_sec=3;event_add(&timeout,&tv);std::cout<<"--- thread "<<_id<<" start working ---"<<std::endl;event_base_dispatch(base);//死循环,当集合中没有事件的时候退出event_base_free(base);}
void ChatThread::timeout_cb(evutil_socket_t fd, short event, void *arg)
{ChatThread *t=(ChatThread *)arg;//std::cout<<"-- thread "<<t->thread_get_id()<<" is listening --"<<std::endl;
}