概要:本文介绍mysql连接池的实现,要求读者了解线程池
一、为什么需要mysql连接池?
资源复用 :不使用连接池,每次数据库请求都新建一条连接,将耗费系 统资源。 流程如下:
- 通过三次握手建立 TCP 连接
- MySQL 认证
- SQL 执行
- 通过四次挥手断开 TCP 连接
更快的系统响应速度:
1.一次连接建立和销毁,可复用同一条连接多次执行 SQL 语句。
2.统一的连接管理,避免数据库连接泄露
二、mysql连接池运行原理
三、代码实现
1.结构体定义
typedef struct task_t {struct task_t *next; // 指向下一个任务节点int clientfd; // 客户端fdchar SQL[MAX_SQL_LENGTH]; // SQL语句缓冲区} task_t;typedef struct task_queue_t { // task队列task_t *head; // 指向队列的第一个task节点task_t *tail; // 指向队列的最后一个task节点int block; // 阻塞标志pthread_spinlock_t lock; // 自旋锁变量pthread_mutex_t mutex; // 互斥锁变量pthread_cond_t cond; // 条件变量} task_queue_t;typedef struct argc {MYSQL *mysql;task_queue_t *queue;
} argc;
2.资源创建
a.任务队列
task_queue_t *task_queue_create() { // 创建一个任务队列int ret;task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));if (queue) {ret = pthread_mutex_init(&queue->mutex, NULL);if (ret == 0) {ret = pthread_cond_init(&queue->cond, NULL);if (ret == 0) {pthread_spin_init(&queue->lock, 0);queue->head = NULL;queue->tail = NULL;queue->block = 1;return queue;}}free(queue);}return NULL;}
b.mysql连接句柄
void mysql_conn_init(MYSQL* mysql) {mysql_init(mysql); // 初始化mysql句柄// 连接到MySQL数据库mysql_real_connect(mysql, MYSQL_SERVER_IP, MYSQL_SERVER_USERNAME, MYSQL_SERVER_PASSWORD, MYSQL_SERVER_DEFAULT_DB, MYSQL_SERVER_PORT, NULL, 0);}
2.sql任务的添加、执行
a.push、pop
void add_task(task_queue_t *queue, task_t *task) { // 向任务队列中添加一个taskpthread_spin_lock(&queue->lock);if (!queue->tail) {queue->tail->next = task;queue->tail = task;}else {queue->head = task;queue->tail = task;}pthread_spin_unlock(&queue->lock);pthread_cond_signal(&queue->cond);
}void *pop_task(task_queue_t *queue) { // 从任务队列中取出一个任务pthread_spin_lock(&queue->lock);if (queue->head == NULL) {pthread_spin_unlock(&queue->lock);return NULL;}// 取出队列中第一个任务task_t *task;task = queue->head;queue->head = task->next;//判断队列是否为空if (queue->head == NULL) {queue->tail = queue->head;}pthread_spin_unlock(&queue->lock);return task;
}task_t *get_task(task_queue_t *queue) { // 原子地从队列中取出一个任务task_t *task;while ((task = pop_task(queue)) == NULL) {pthread_mutex_lock(&queue->mutex);if (queue->block == 0) {pthread_mutex_unlock(&queue->mutex);return NULL;}pthread_cond_wait(&queue->cond, &queue->mutex);pthread_mutex_unlock(&queue->mutex);}return task;
}
b.执行任务并将mysql服务器的回复信息转发给客户端
void *mysql_conn_thrd_worker(void *argc) {task_t *task;struct argc *arg = (struct argc*)argc;task_queue_t *queue = arg->queue;MYSQL *mysql = arg->mysql;while (!destroy_pool) {task = get_task(queue);if(!task) break;// 执行其中的SQL语句mysql_real_query(mysql, task->SQL, strlen(task->SQL)); // 注入sql语句MYSQL_RES *res = mysql_store_result(mysql); // 存储mysql返回信息char response[64];// 将mysql回复结果cpoy进responseif (res) {MYSQL_ROW row;row = mysql_fetch_row(res);if (row) {snprintf(response, sizeof(response), "%s", row[0]); // 假设结果为字符串类型,仅复制第一列数据} else {snprintf(response, sizeof(response), "No result found");}mysql_free_result(res); // 释放结果集} else {snprintf(response, sizeof(response), "Error retrieving result");}// 发送回复信息send(task->clientfd, response, 64, 0);// 销毁任务free(task);}
}
3.主线程接收客户端连接、sql请求
int tcp_server(task_queue_t *queue) {// 初始化服务器套接字int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0) {perror("create sockfd fail\n");return -1;}struct sockaddr_in addr;memset(&addr, 0, sizeof(struct sockaddr_in));addr.sin_family = AF_INET;addr.sin_port = htons(2024);addr.sin_addr.s_addr = htonl(INADDR_ANY);if (-1 == bind(sockfd, (struct sockaddr*)&addr, sizeof(addr))) {perror("bind fail\n");return -2;}if (-1 == listen(sockfd, 5)) {perror("listen fail\n");return -3;}//IO多路复用int epfd = epoll_create(1);struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = sockfd;epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);struct epoll_event events[1024] = {0}; while (1) {int ret = epoll_wait(epfd, events, 1024, -1);if (ret == -1) {perror("epoll_wait fail");break;}int i = 0;for (i = 0; i < ret; i++) {if (sockfd == events[i].data.fd) { struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);fcntl(clientfd, F_SETFL, SOCK_NONBLOCK);ev.events = EPOLLIN;ev.data.fd = clientfd;epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);} else if (events[i].events & EPOLLIN){while (1) {char buffer[256] = {0};int count = recv(events[i].data.fd, buffer, 10, 0);if (count < 0) {//读取完毕或当前没有数据可读或者出错if( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {//读取完毕printf("recv finished\n");break;}//recv出错close(events[i].data.fd);//关闭事件的套接字break;} else if (count == 0) {//对方发送fin断开连接epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);//移除该事件close(events[i].data.fd);//关闭事件的套接字break;}else {//接收到数据task_t *task; // 将clientfd和buffer包装进taskinit_task(task); // 初始化task的next指针strcpy(task->SQL, buffer); // 装入sql请求task->clientfd = events[i].data.fd; add_task(queue, task); // 将此task添加到任务队列} }}}}close(sockfd);return 0;
}
4.main函数
int main() {// 工作队列task_queue_t *queue = task_queue_create();if (!queue) exit(1);// 创建MySQL连接MYSQL mysqls[NUM_MYSQL_CONNECTION] = {0};int i;for (i = 0; i < NUM_MYSQL_CONNECTION; i++) {mysql_conn_init(&mysqls[i]);}// 创建工作线程pthread_t threadid[NUM_MYSQL_CONNECTION];for (i = 0; i < NUM_MYSQL_CONNECTION; i++) {struct argc *argc = (struct argc *)malloc(sizeof(struct argc));argc->mysql = &mysqls[i];argc->queue = queue;pthread_create(&threadid[i], NULL, mysql_conn_thrd_worker, argc);free(argc);}tcp_server(queue); // Tcp 服务器,接收客户端连接,包装SQL请求信息并添加到工作队列return 0;
}
推荐学习 https://xxetb.xetslk.com/s/p5Ibb