文章目录
- 前置知识
- 关键技术点
- 项目背景
- 连接池功能点介绍
- MySQL Server参数介绍
- 功能设计
- 连接池功能点介绍
- 开发平台选型
- 关于MySQL数据库编程
- MySQL接口介绍
- 测试表设计
- Connection设计
- 数据库配置文件mysql.conf
- 日志文件log.hpp
- ConnectionPool设计
- 压力测试
- 源码链接:
前置知识
关键技术点
MySQL数据库编程、单例模式、queue队列容器、C++11多线程编程、线程互斥、线程同步通信和unique_lock
、基于CAS的原子整形、智能指针shared_ptr、lambda表达式、生产者-消费者线程模型
项目背景
MySQL是一个基于C/S设计的关系型数据库管理系统,一条SQL的执行需要通过mysql client发起一个连接,经过TCP三次握手完成TCP连接,然后再对客户端进行身份验证,验证成功后再把SQL发给mysql server(RDBMS)执行SQL(一般会涉及磁盘IO),然后给客户端返回执行结果,执行结束后,进行四次挥手断开连接
如果想要提高MySQL数据库(基于C/S设计)的访问瓶颈:
- 在服务器端增加缓存服务器,用户缓存常用的数据(例如redis),减少磁盘IO的次数
- 还可以使用连接池,来提高MySQL Server的访问效率
在高并发情况下:大量的TCP三次握手建立MySQL Server连接认证 以及 TCP四次挥手 MySQL Server关闭连接回收资源所耗费的性能时间也是很明显的,使用连接池就是为了减少这一部分的性能损耗
连接池功能点介绍
该项目是基于C++语言实现的连接池,主要也是实现几个所有连接池都支持的通用基础功能,连接池一般包含了以下内容:
- 数据库连接所用的ip地址
- port端口号
- 用户名和密码,连接哪个库
- 以及连接池的性能参数,例如 初始连接量,最大连接量,最大空闲时间、连接超时时间等,
1.初始连接量( i n i t S i z e initSize initSize):表示连接池会预先和MySQL Server创建 i n i t S i z e initSize initSize个连接,当用户发起MySQL访问时,不需要创建新的连接,直接从连接池中获取一个可用的连接就可以,使用完成后,并不去释放该连接,而是把当前连接重新归还到连接池当中
2.最大连接量( m a x S i z e maxSize maxSize):当并发访问MySQL Server的请求增多时,初始连接量已经不够使用的时候,此时会根据新的请求数量去创建更多的连接给用户去使用,但是新创建的连接数量上限是 m a x S i z e maxSize maxSize,不能无限制的创建连接。
- 因为每一个连接都会占用一个 s o c k e t socket socket资源,一般连接池和服务器程序是部署在一台主机上的,如果连接池占用过多的 s o c k e t socket socket资源,那么服务器就不能接收太多的客户端请求了。当这些连接使用完成后,再次归还到连接池当中来维护
3.最大空闲时间( m a x I d l e T i m e maxIdleTime maxIdleTime):当访问MySQL Server的并发请求多了以后,连接池里面的连接数量会动态增加,上限是 m a x S i z e maxSize maxSize个,当这些连接用完再次归还到连接池当中。如果在指定的 m a x I d l e T i m e maxIdleTime maxIdleTime里面,这些新增加的连接都没有被再次使用过,那么新增加的这些连接资源就要被回收掉,只需要保持初始连接量 i n i t S i z e initSize initSize个连接就可以了
4.连接超时时间( c o n n e c t i o n T i m e o u t connectionTimeout connectionTimeout):当MySQL Server的并发请求量过大,连接池中的连接数量已经到达 m a x S i z e maxSize maxSize了,而此时没有空闲的连接可供使用,那么此时用户无法从连接池获取连接,它通过"阻塞"的方式获取连接的时间如果超过 c o n n e c t i o n T i m e o u t connectionTimeout connectionTimeout,那么获取连接失败,无法访问数据库
MySQL Server参数介绍
show variables like 'max_connections'; #查看MySQL Server所支持的最大连接个数
超过 m a x _ c o n n e c t i o n s max\_connections max_connections数量的连接,MySQL Server会直接拒绝访问,所以在使用连接池增加连接数量的时候,MySQL Server的 m a x c o n n e c t i o n s max_connections maxconnections参数也要适当的进行调整,以适配连接池的连接上限
功能设计
文件 | 功能 |
---|---|
$ConnectionPool.cpp $ 和 C o n n e c t i o n P o o l . h ConnectionPool.h ConnectionPool.h | 实现连接池 |
C o n n e c t i o n . c p p Connection.cpp Connection.cpp 和 C o n n e c t i o n . h Connection.h Connection.h | 描述每一条建立的连接:数据库操作代码、增删改查代码实现 |
连接池功能点介绍
1.连接池只需要一个实例,所以采用单例模式进行设计
2.服务器一般是多线程,可能多个线程都发起了对这个Mysql Server的操作请求,都需要从队列当中获取连接空闲连接,而 C o n n e c t i o n Connection Connection全部维护在一个连接队列中 => 所以需要保证连接队列的线程安全,需要使用互斥锁保证队列的线程安全
3.如果连接队列为空,还需要再获取连接,此时需要动态创建连接,上限数量是 m a x S i z e maxSize maxSize
4.连接队列中空闲连接时间超过 m a x I d l e T i m e maxIdleTime maxIdleTime的就要被释放掉,只保留初始的 i n i t S i z e initSize initSize个连接就可以了,这个功能点肯定需要放在独立的线程中去做
5.如果连接队列为空,而此时连接的数量已达上限 m a x S i z e maxSize maxSize,客户**“阻塞”**等待 c o n n e c t i o n T i m e o u t connectionTimeout connectionTimeout时间之后还没有获取到空闲的连接,那么获取连接失败
- 此处"阻塞"从Connection队列获取空闲连接,可以使用带超时时间的mutex互斥锁来实现连接超时时间
- 假设连接超时时间为100ms => 并不是直接睡眠100ms,而是在这100ms时间内,不断判断是否连接队列当中是否有空闲连接,如果有,那么直接获取队头的连接,否则连接池队列为空,就获取失败
6.用户获取的连接需要用shared_ptr
智能指针来管理,需要定制删除器 定制 连接释放的功能(此处并不是真正释放连接,而是把连接归还到连接池中)
7.新连接的生产线程 和 获取连接的线程 采用生产者-消费者线程模型来设计,所以需要使用线程间的同步通信机制来保证 => 条件变量和互斥锁
具体流程
假设我们的服务器给客户提供服务,客户端发起请求需要数据库操作时,Server需要到连接池管理的队列中获取一个连接,然后连接池给Server返回一个智能指针维护的连接,Server只管使用这条连接,无需关心这条连接的释放,然后使用这条连接去访问MySQL Server
开发平台选型
有关MySQL数据库编程、多线程编程、线程互斥和同步通信操作、智能指针、设计模式、容器等等这些技术在C++语言层面都可以直接实现,因此该项目选择直接在windows平台上进行开发
当然,因为采用的都是语言级别的接口,没有强依赖系统的接口,所以跨平台性比较好:放在Linux平台下用g++也可以直接编译运行,但是需要解决一些库的依赖问题
关于MySQL数据库编程
MySQL的windows安装文件云盘地址如下(下载development开发版:包含mysql头文件和libmysql库文件):
链接:https://pan.baidu.com/s/1Y1l7qvpdR2clW5OCdOTwrQ 提取码:95de
MySQL数据库编程直接采用oracle公司提供的MySQL C/C++客户端开发包,在VS上需要进行相应的头文件和库文件的配置,如下
- 1.右键项目 - C/C++ - 常规 - 附加包含目录,填写下载好的mysql.h头文件在当前电脑的路径
- 2.右键项目 - 链接器 - 常规 - 附加库目录,填写libmysql.lib的路径
- 3.右键项目 - 链接器 - 输入 - 附加依赖项,填写libmysql.lib库的名字
- 4.把 l i b m y s q l . d l l libmysql.dll libmysql.dll动态链接库(Linux下后缀名是.so库)放在工程目录下
坑点:如果MySQL装的是64位版本的,所以动态库什么的都是64位生成的,所以项目先选成64位的
MySQL接口介绍
初始化:
mysql_init()
,
要使用库,必须先进行初始化 由此也可以看出MySQL其实是网络服务,句柄就是文件描述符
MYSQL *my = mysql_init(nullptr);
链接数据库
mysql_real_connect
() :
初始化完毕之后,必须先链接数据库,再进行后续操作,因为mysql网络部分是基于TCP/IP的
MYSQL *mysql_real_connect(MYSQL *mysql, const char *host, const char *user, const char *passwd,const char *db,unsigned int port,const char *unix_socket, unsigned long clientflag);
- 第一个参数 :mysql_init的返回值,句柄
- 第二个参数:主机地址
- 第三个参数:用户名
- 第四个参数:登录密码
- 第五个参数:要链接的数据库
- 第六个参数:端口号
- 第七个参数:为null时,表明不使用socket或管道机制
- 第八个参数:通常设置为0
返回值:连接成功,返回MYSQL*连接句柄。如果连接失败,返回NULL
更改编码格式
建立好链接之后,获取英文没有问题,如果获取中文是乱码,因为原始默认字符集是latin1,我们要手动设置链接的默认字符集是utf8
mysql_set_character_set(my, "utf8"); //第一个参数是mysql_init的返回值
下发mysql命令
mysql_query()
int mysql_query(MYSQL *mysql, const char *q);
- 第一个参数:mysql_init的返回值
- 第二个参数:要执行的sql语句,不需要加
;
作为sql语句的结尾
返回值:如果是0:表示执行成功,非0:表示执行失败
注意:sql执行完以后,如果是select查询,还要读取数据,如果update,insert等语句只需要看下函数是否执行成功
查询select的结果:
MYSQL_RES *mysql_store_result(MYSQL *mysql);
依靠句柄获取最新一次查询结果,返回到MYSQL_RES
结构体中,同时该函数malloc了一块内存空间来存储查询过来的数据,所以一定要释放对应空间不然会造成内存泄漏
- 获取结果行数
mysql_num_rows
my_ulonglong mysql_num_rows(MYSQL_RES *res);
- 获取结果列数
mysql_num_fifields
unsigned int mysql_num_fields(MYSQL_RES *res);
- 获取列名(字段名称)
mysql_fetch_fifields
MYSQL_FIELD *mysql_fetch_fields(MYSQL_RES *res);
- 获取结果内容
mysql_fetch_row
(获取一行完整的记录)
MYSQL_ROW mysql_fetch_row(MYSQL_RES *result); //MYSQL_ROW本质就是char**类型,当成一个二维数组来用
关闭mysql链接
mysql_close
void mysql_close(MYSQL *sock);
使用例子
const std::string host = "175.178.18.99"; //主机号
const std::string user = "test";//用户
const std::string passwd = "C++Mango...";//密码
const std::string db = "db_test"; //要使用的库
const int port = 3306; //端口号int main()
{// 相当于给我们创建了一个mysql句柄MYSQL *my = mysql_init(nullptr); // 链接数据库if( nullptr == mysql_real_connect(my, host.c_str(), user.c_str(), passwd.c_str(), db.c_str(), port, nullptr, 0)){std::cerr << "mysql 连接失败!" << std::endl;return 1;}// 设置连接的编码也是utf8mysql_set_character_set(my, "utf8");std::cout << "mysql 连接成功!" << std::endl;//增删改 这个三个是最简单的,因为只要sql执行完毕,就完成了//插入的时候同样会受主键约束之类的//std::string sql = "insert into account values (3,\'赵六\',30 )";//std::string sql = "update account set blance = 300 where id=3";std::string sql = "delete from account where id=3";int r = mysql_query(my,sql.c_str());//执行成功 res为0,失败的话res非0if(r != 0){std::cout << "execute: " << sql.c_str() << " failed!" << std::endl;return 2;}// select语句执行完,只是第一步,还需要对数据进一步解析!// 1. 执行查询语句std::string select_sql = "select name from account"; //后序只需要更改这个查找,后面的逻辑都不需要变if (0 != mysql_query(my, select_sql.c_str())){std::cout << "execute: " << select_sql << " failed!" << std::endl;return 3;}std::cout << "execute: [" << select_sql << " ] success" << std::endl;// 2. 获取查询结果数据MYSQL_RES *res = mysql_store_result(my);//获取执行结果//3.解析数据 -- 获取行数和列数int rows = mysql_num_rows(res); // 获取行数int cols = mysql_num_fields(res);// 获取列数std::cout << "行数: " << rows << ", 列数: " << cols << std::endl;// 4.解析数据 -- 获取表中列名 -- 一般不用,仅仅是为了测试代码的完整性MYSQL_FIELD *fields = mysql_fetch_field(res);for (int i = 0; i < cols; i++){ std::cout << fields[i].name << "\t"; // 输出列名,通常没什么用}std::cout << std::endl;//5. 解析数据 -- 获取表中的数据 -- 重要for(int i = 0; i < rows; i++){//获取完整的一行记录[可能包含了多列]MYSQL_ROW line = mysql_fetch_row(res);//MYSQL_ROW本质就是二级指针char**for(int j = 0; j < cols; j++) {std::cout << line[j] << "\t";//将记录内部的多列字符串依次打印!}std::cout << std::endl;}//最后,关闭连接free(res); //不要忘记释放存储select查询结果的空间mysql_close(my);std::cout << "关闭mysql连接" << std::endl;return 0;
}
调试sql语句错误:
int mysql_errno(MYSQL*) 返回上次调用的MySQL函数的错误编号。
const char* mysql_error(MYSQL*) 返回上次调用的MySQL函数的错误消息。
测试表设计
Connection设计
描述每一条建立的连接:数据库操作代码、增删改查代码实现
主要成员:
- 1.与MySQL进行连接的句柄 _ c o n n e c t i o n \_connection _connection
- 2.记录处于空闲时间时候的此条连接的存活时间(放入连接队列的起始时间 _ a l i v e t i m e \_alivetime _alivetime)
- 当访问MySQL Server的并发请求多了以后,连接池里面的连接数量会动态增加,上限是 m a x S i z e maxSize maxSize个,当这些连接用完再次归还到连接池当中~~
- 如果在指定的空闲时间里面,这些新增加的连接都没有被再次使用过,那么新增加的这些连接资源就要被回收掉
成员函数:
1.构造函数:初始化数据库连接 => 本质是调用 m y s q l _ i n i t mysql\_init mysql_init创建MySQL连接
2.析构函数:释放数据库连接资源,本质是调用 m y s q l _ c l o s e mysql\_close mysql_close释放MySQL连接
3.连接数据库:需要传入:IP地址,端口号,用户名,密码,要连接的库名,本质是调用 m y s q l _ r e a l _ c o n n e c t mysql\_real\_connect mysql_real_connect函数
4.更新数据:主要是用于执行更新数据的sql语句,比如 i n s e r t , u p d a t e , d e l e t e insert,update,delete insert,update,delete等操作
5.查询数据:主要是用于执行 s e l e c t , d e s c select,desc select,desc等查询操作
6.返回当前这条连接的存活时间(空闲时间):用于定时清理空闲的连接,保证数据库访问不多的时候,连接池的连接个数保持在 i n i t S i z e initSize initSize
7.刷新这条连接的起始空闲时间:比如新创建一条连接放入到连接队列的时候需要更新当前的起始空闲时间 || 客户端操作完数据库之后,服务器将连接重新回收到连接队列当中 => 即:将连接放入到队列的时候需要调用该函数更新 _ a l i v e t i m e \_alivetime _alivetime时间
Connection.h文件
#include <mysql.h>
#include <string>
#include <ctime>
using namespace std;class Connection
{
public:Connection();// 初始化数据库连接 ~Connection();// 释放数据库连接资源 bool connect(string ip,unsigned short port,string user,string password,string dbname);// 连接数据库 bool update(string sql);// 执行sql更新操作 insert、delete、update bool query(string sql);//执行sql查询操作 select descvoid refreshAliveTime(); //刷新当前连接的起始空闲时间clock_t getAliveTime() const;//返回当前连接的空闲时间
private:MYSQL* _connection; //数据库连接的句柄,表示和MySQL Server的一条连接 clock_t _alivetime; //记录当前连接的空闲时间
};
前置知识
ctime库当中的clock
函数: clock_t clock(void);
作用: 返回从开始程序进程到调用clock()
之间的CPU时钟计时单元(clock tick)数。返回的单位是毫秒
Connection.cpp
Connection::Connection()
{_connection = mysql_init(nullptr);if (_connection == nullptr){LOG(WARMING, "MySQL连接建立失败");}
}Connection::~Connection()
{if (_connection != nullptr)mysql_close(_connection);
}void Connection::refreshAliveTime()
{_alivetime = clock();
}clock_t Connection::getAliveTime() const
{return clock() - _alivetime; //当前时间-处于空闲状态时候的起始时间 就是当前连接的存活时间
}bool Connection::connect(string ip, unsigned short port, string user, string password, string dbname)
{return nullptr != mysql_real_connect(_connection, ip.c_str(), user.c_str(), \password.c_str(), dbname.c_str(),port, nullptr, 0);
}bool Connection::update(string sql)
{if (mysql_query(_connection, sql.c_str()) != 0){LOG(ERROR, "sql语句执行失败" + sql);return false;}LOG(NORMAL, "sql语句执行成功" + sql);return true;
}bool Connection::query(string sql)
{if (mysql_query(_connection, sql.c_str()) != 0){LOG(ERROR, "sql语句执行失败" + sql);return false;}MYSQL_RES* res = mysql_store_result(_connection);//获取执行结果//解析数据 -- 获取行数和列数int rows = mysql_num_rows(res); // 获取行数int cols = mysql_num_fields(res);// 获取列数// 解析数据 -- 获取表中列名 -- 一般不用,仅仅是为了测试代码的完整性MYSQL_FIELD* fields = mysql_fetch_field(res);for (int i = 0; i < cols; i++){std::cout << fields[i].name << "\t"; // 输出列名,通常没什么用}std::cout << std::endl;//解析数据 -- 获取表中的数据 -- 重要for (int i = 0; i < rows; i++){//获取完整的一行记录[可能包含了多列]MYSQL_ROW line = mysql_fetch_row(res);//MYSQL_ROW本质就是二级指针char**for (int j = 0; j < cols; j++){std::cout << line[j] << "\t";//将记录内部的多列字符串依次打印!}std::cout << std::endl;}LOG(NORMAL, "SQL语句执行成功" + sql);return true;
}
数据库配置文件mysql.conf
将后续数据库连接池的配置内容写到一个文件当中,包括:
- IP地址,端口号,用户名,登录密码,要连接的数据库,初始连接量,最大连接量,最大空闲时间,连接超时时间
#数据库连接池的配置文件
ip=127.0.0.1
port=3306
username=用户名
password=登录密码
initSize=10
maxSize=1024
dbname=要连接的数据库
#最大空闲时间 默认为秒
maxIdleTime=60
#连接超时时间 单位是毫秒
connectionTimeOut=100
注意:每一项我们都是以 k e y = v a l u e key=value key=value的形式,以换行来进行区分,注意每一项的最后面不要有空格!
- 后续就可以一次读取一行,通过 = = =来区分,获取每一个配置项, 并且需要注意将末尾的KaTeX parse error: Undefined control sequence: \n at position 1: \̲n̲ 去掉
日志文件log.hpp
用于打印日志信息,主要包含:日志等级,在哪个文件输出的日志(方便问题定位),在第几行输出的日志,输出日志的时间,日志消息
#include<iostream>
using namespace std;enum
{NORMAL,WARNING,ERROR,FATAL
};#define LOG(level,message) cout <<"[" << #level << "] " <<"[" << __FILE__ << "] " \<<"[" << __LINE__ << "] " <<"[" << __TIMESTAMP__ << "] " <<"[" << message << "] "
ConnectionPool设计
成员设计:
- MySQL的IP地址,端口号,用户名,登陆密码,要连接的数据库名称
- 连接池的初始连接量,最大连接量,最大空闲时间(默认为 秒),从连接池等待获取连接的最大超时时间
- 连接队列:所有的连接都放到连接队列当中
- 互斥锁:由于可能有多个执行流同时访问连接队列,所以需要使用锁来保证连接队列的线程安全
- 一个原子类型的变量:记录当前创建连接的个数=》 i n i t S i z e < = c o u n t < = m a x S i z e initSize <= count <= maxSize initSize<=count<=maxSize
- 条件变量:用于协调 生产连接的线程 和 消费连接的线程的同步
成员函数设计:
1.构造函数:
- 从配置文件当中读取每一项配置项,然后初始化上述的部分成员
- 创建初始数量为 i n i t S i z e initSize initSize的连接
- 创建一条新连接=>连接数据库=>刷新该条连接的空闲时间=>将该条连接放入到连接队列当中 (由于此时是程序启动之前,连接池启动的时候,此时还没有其它线程操作该队列,所以暂时不用考虑线程安全问题) => 连接数量原子性++
- 启动一个生产连接线程:专门用于生产连接,当发现队列为空的时候就需要生产新连接,但是前提是:连接个数没有超过最大连接数
- 启动一个定时清理线程:用于扫描超过最大空闲时间的空闲连接,进行连接回收 => 将连接释放,让服务器压力不大的时候,连接队列的连接个数保持在 i n i t S i z e initSize initSize
2.提供一个静态函数获取全局唯一的连接池单例对象
- 这里采用基于局部静态变量的懒汉模式=>在C++11以后,局部静态变量的初始化是线程安全的,其底层是通过加锁保证安全
对于static静态局部变量的初始化,编译器会自动对它的初始化进行加锁和解锁控制,使静态局部变量的初始化成为线程安全的操作,不用担心多个线程都会初始化静态局部变量,因此上面的懒汉单例模式是线程安全的单例模式
3.连接生产线程函数:死循环不断检测:首先先进行加锁 => 判断连接队列是否为空=>如果不为空,那么就在条件变量下等待,如果连接队列为空 => 判断此时连接数量是否超过 m a x S i z e maxSize maxSize,如果连接数量没有到达上限,继续创建新的连接 => 通知消费者线程进行消费(从连接队列获取连接)
4.定时清理线程函数:扫描超过maxIdleTime时间的空闲连接,进行连接回收
- 死循环不断扫描=>通过sleep模拟定时效果=>先进行加锁=>判断当前连接数量是否超过起始连接数量,如果没超过就不处理=>拿出连接队列当中队头的连接,判断该连接的空闲时间是否超过最大空闲时间,如果超过了,那么就需要释放这条连接
注意:由于队列是先进先出,如果队头连接的空闲时间没有超过最大空闲时间 _ m a x I d l e T i m e \_maxIdleTime _maxIdleTime,那么后续的连接肯定没有超过,不需要继续处理
将线程函数写成类的成员方法:最大的好处就是可以非常方便访问当前对象的成员变量
5.给外部提供接口,从连接池中获取一个可用的空闲连接:
- 注意:不应该直接给外部返回该连接对应的地址(指针),如果要返回指针,那么外部用完之后,还需要专门提供一个函数把连接归还到连接池。只需要给外部返回一个管理该连接的智能指针即可,因为智能指针出作用域之后会自动析构,默认的析构方式是调用delete释放该连接,所以我们需要自定义删除器,不是把连接释放掉,而是把连接归还到连接池,通过智能指针自动管理外部用完这个连接的"释放"工作
- 步骤:进行加锁=>如果连接队列为空,那么消费者需要在条件变量下等待 最大超时时间 _ c o n n e c t i o n T i m e o u t 毫秒 最大超时时间\_connectionTimeout毫秒 最大超时时间_connectionTimeout毫秒(注意:并不是直接sleep睡眠,而是在等待超时时间以内,如果被唤醒之后能获取连接,那么就把连接拿走,否则获取失败)=>如果超时醒来,连接队列仍未空,获取连接失败=>否则获取连接成功,给用户返回队头的连接,并且通过智能指针自定义该连接的"释放"方式 => 消费完连接以后,通知生产者线程检查一下,如果队列为空了,赶紧生产连接
ConnectionPool.h
class ConnectionPool
{
public:static ConnectionPool* GetInstance(); //获取全局唯一的连接池对象bool LoadConfigFile();//读取配置文件bool CreateNewConnect();//创建新连接void produceConnectionTask(); //用于生产连接的线程函数void scanConnectionTask(); //用于定时清理空闲连接的线程函数private:ConnectionPool();~ConnectionPool();//单例模式:将构造私有化 拷贝构造和赋值重载禁用,防止外部拷贝生成对象ConnectionPool(const ConnectionPool&) = delete;ConnectionPool& operator=(const ConnectionPool&) = delete;private:string _ip; // mysql的ip地址unsigned short _port; //端口号 默认3306string _username; //用户名string _password; //登录密码string _dbname; //要连接的数据库名称int _initSize; //连接池的初始连接量int _maxSize; //连接池的最大连接量int _maxIdleTime; //连接池的最大空闲时间 int _connectionTimeout; //从连接池获取连接的最大超时时间queue<Connection*> _connectionQueue;//连接队列atomic<int> _connectionCount; //当前创建的连接数condition_variable _cv;//条件变量:用于生产连接的线程和消费连接的线程之间的通信
};
ConnectionPool.cpp
#define _CRT_SECURE_NO_WARNINGS 1
#pragma once
#include<thread>
#include<functional>
#include"ConnectionPool.h"ConnectionPool::ConnectionPool()
{//1.加载配置文件if (!LoadConfigFile()){return;}//2.创建个数为initSize个连接for (int i = 0; i < _initSize; i++){//由于此时是系统启动,连接池启动的时候,此时还没有其它线程操作该队列,所以不需要加锁保证该队列的线程安全CreateNewConnect();}LOG(NORMAL, "初始连接量创建成功");//3.创建一个专门负责生产连接的线程thread produceThread(&ConnectionPool::produceConnectionTask, this);//类的非静态成员函数的第一个参数为this指针produceThread.detach();//4.创建一个专门负责清理空闲连接的线程thread scanThread(&ConnectionPool::scanConnectionTask, this); scanThread.detach();LOG(NORMAL,"数据库连接池启动成功");
}ConnectionPool::~ConnectionPool()
{while (!_connectionQueue.empty()){Connection* front = _connectionQueue.front();_connectionQueue.pop();delete front;//调用Connection类的析构函数 =>释放连接资源front = nullptr;}
}ConnectionPool* ConnectionPool::GetInstance() //基于局部静态变量的懒汉模式
{static ConnectionPool inst;return &inst;
}bool ConnectionPool::CreateNewConnect() //用于生产连接的函数 => 需要将新连接放入到队列当中=>线程安全
{Connection* p = new Connection();if (p != nullptr){p->connect(_ip, _port, _username, _password, _dbname); //连接数据库_connectionQueue.push(p); //将该条连接插入到队列当中p->refreshAliveTime();//刷新当前连接的空闲时间_connectionCount++;//创建的连接个数++return true;}return false;
}//检测是否需要创建新连接的独立线程:当连接队列为空的时候就需要生产新连接,前提是:连接个数没有超过最大连接数
void ConnectionPool::produceConnectionTask()
{while (1){unique_lock<mutex> ul(_mtx);while (!_connectionQueue.empty()) //如果连接队列不为空,那么就在条件变量下等待{_cv.wait(ul);}//此时连接队列为空,判断是否需要生产新连接if (_connectionCount < _maxSize){CreateNewConnect();}_cv.notify_one(); //唤醒消费者(用户)消费该连接}
}//清理超过最大空闲时间连接的独立线程
void ConnectionPool::scanConnectionTask()
{while (1){this_thread::sleep_for(chrono::seconds(_maxIdleTime));//通过sleep模拟定时效果unique_lock<mutex> ul(_mtx);while (_connectionCount > _initSize) //当前连接数比起始连接数多 才需要进行清理{Connection* front = _connectionQueue.front();//先拿出队头的连接//最大超时时间:_maxIdleTime的单位是s 而getAliveTime函数返回的单位是msif (front->getAliveTime() >= (_maxIdleTime * 1000)) //当前队头的连接的空闲时间大于最大空闲时间,需要被清理{_connectionQueue.pop();delete front; //会调用Connection对象的析构函数 => 释放该连接front = nullptr;_connectionCount--; //当前连接数--}else //因为队列是先进先出, 队头的连接的空闲时间是最大的,如果队头的连接都不需要被清理,后续的连接也不需要被清理{break;}}}
}shared_ptr<Connection> ConnectionPool::GetConnection()
{unique_lock<mutex> ul(_mtx);while (_connectionQueue.empty()) //如果连接队列当中没有连接,那么需要进行等待{ //在条件变量下等待最大超时时间:case1:超时醒来 case2:有连接了被唤醒if (cv_status::timeout == _cv.wait_for(ul, chrono::milliseconds(_connectionTimeout))){if (_connectionQueue.empty())//此时是超时醒来,如果连接队列仍未空,那么获取连接失败{LOG(ERROR, "等待最大超时时间(s):" + to_string(_connectionTimeout) + "秒后,连接队列仍为空");return nullptr;}}//此时被唤醒,但是连接队列仍为空,此时又会重新等待}//拿出队头的连接,定制该连接的管理方式,当用户使用完成之后,将该连接回收到连接队列当中Connection* front = _connectionQueue.front();_connectionQueue.pop();//注意:不需要对当前连接数进行++/-- 只有当新建连接和销毁连接才需要对该变量进行操作auto lambda = [&](Connection* p) {unique_lock<mutex> ul(_mtx);//由于要操作队列,所以要加锁!!_connectionQueue.push(p);p->refreshAliveTime();//刷新当前连接的空闲时间};shared_ptr<Connection> sp(front, lambda);_cv.notify_one();//唤醒生产者检查连接队列,是否需要生产连接return sp;
}bool ConnectionPool::LoadConfigFile() //加载配置文件,读取配置项,初始化成员变量
{FILE* pf = fopen("./mysql.conf", "r");if (pf == nullptr){LOG(ERROR, "MySQL配置文件打开失败");return false;}LOG(ERROR, "MySQL配置文件打开成功");while (!feof(pf)) //没有读取到文件结尾就继续读{char buf[1024] = { 0 };fgets(buf, sizeof(buf) - 1, pf); //一次读取一行配置信息 => port=3306\nbuf[strlen(buf) - 1] = '\0';//将最后的换行符替换成\0string ret = buf;//以等号(=)区分每一个配置项的key和valuesize_t index = ret.find('=');if (index == std::string::npos) //说明当前配置项无效=>可能是注释{continue;}//index位置为空格 [0,index) => key [index+1,...] =>valuestring key = ret.substr(0, index);string value = ret.substr(index + 1);if (key == "ip"){_ip = value;}else if (key == "port"){_port = atoi(value.c_str());}else if (key == "username"){_username = value;}else if (key == "password"){_password = value;}else if (key == "dbname"){_dbname = value;}else if (key == "initSize"){_initSize = atoi(value.c_str());}else if (key == "maxSize"){_maxSize = atoi(value.c_str());}else if (key == "maxIdleTime"){_maxIdleTime = atoi(value.c_str());}else if (key == "connectionTimeout"){_connectionTimeout = atoi(value.c_str());}}LOG(NORMAL, "MySQL配置项读取完毕");return true;
}
压力测试
测试的条件是本地的虚拟机,mysql也是在本地的。可以看到性能基本上是有一倍的提升的。
#include"Connection.h"
using namespace std;
#include"ConnectionPool.h"
void SigleWithConnection()
{time_t begin = clock();for (int i = 0; i < 10000; ++i){ConnectionPool* cp = ConnectionPool::GetInstance();shared_ptr<Connection> sp = cp->GetConnection();char sql[1024] = { 0 };sprintf(sql, "insert into user(name,age,sex) values('%s','%d','%s')","zhangsan", 20, "male");sp->update(sql);}time_t end = clock();cout << end - begin << endl;
}
void SigleWithConnection()
{time_t begin = clock();for (int i = 0; i < 10000; ++i){ConnectionPool* cp = ConnectionPool::GetInstance();shared_ptr<Connection> sp = cp->GetConnection();char sql[1024] = { 0 };sprintf(sql, "insert into user(name,age,sex) values('%s','%d','%s')","zhangsan", 20, "male");sp->update(sql);/*string sql = "select * from user";sp->query(sql);*/}time_t end = clock();cout << end - begin << endl;
}void MutiWithConnection()
{time_t begin = clock();thread t1([]() {for (int i = 0; i < 2500; ++i){ConnectionPool* cp = ConnectionPool::GetInstance();shared_ptr<Connection> sp = cp->GetConnection();char sql[1024] = { 0 };sprintf(sql, "insert into user(name,age,sex) values('%s','%d','%s')","zhangsan", 20, "male");if (sp == nullptr){cout << "sp is empty" << endl;continue;}sp->update(sql);}});thread t2([]() {for (int i = 0; i < 2500; ++i){ConnectionPool* cp = ConnectionPool::GetInstance();shared_ptr<Connection> sp = cp->GetConnection();char sql[1024] = { 0 };sprintf(sql, "insert into user(name,age,sex) values('%s','%d','%s')","zhangsan", 20, "male");if (sp == nullptr){cout << "sp is empty" << endl;continue;}sp->update(sql);}});thread t3([]() {for (int i = 0; i < 2500; ++i){ConnectionPool* cp = ConnectionPool::GetInstance();shared_ptr<Connection> sp = cp->GetConnection();char sql[1024] = { 0 };sprintf(sql, "insert into user(name,age,sex) values('%s','%d','%s')","zhangsan", 20, "male");if (sp == nullptr){cout << "sp is empty" << endl;continue;}sp->update(sql);}});thread t4([]() {for (int i = 0; i < 2500; ++i){ConnectionPool* cp = ConnectionPool::GetInstance();shared_ptr<Connection> sp = cp->GetConnection();char sql[1024] = { 0 };sprintf(sql, "insert into user(name,age,sex) values('%s','%d','%s')","zhangsan", 20, "male");if (sp == nullptr){cout << "sp is empty" << endl;continue;}sp->update(sql);}});t1.join();t2.join();t3.join();t4.join();time_t end = clock();cout << end - begin << endl;
}
int main()
{//MutiWithConnection();SigleWithConnection();return 0;
}
测试的条件是本地的虚拟机,mysql也是在本地的。可以看到性能基本上是有一倍的提升的
源码链接:
https://gitee.com/maple-that-never-stays-up-late/database-connection-pool