Linux高并发服务器开发(十)反应堆模型和线程池模型

文章目录

  • 1 epoll反应堆
  • 2 线程池
    • 流程
    • 代码
  • 3 复杂版本线程池
    • 代码


1 epoll反应堆

文件描述符 监听事件 回调函数 进行封装

  1. 创建socket
  2. 设置端口复用
  3. 绑定
  4. 监听
  5. 创建epoll树
  6. 将监听文件描述符lfd上epoll树,对应的事件节点包括:文件描述符,事件epollin,回调函数initAccept

initAccept函数
调用accpet函数接受新的连接,同时将cfd对应的事件节点上树,包括cfd,epollin和回调函数readData

readData函数
读数据:
若读数据异常或者客户端关闭了连接,则下树
若读完数据,则将时间节点中的对EPOLLIN修改为写 EPOLLOUT,同时设置写回调函数为sendData

sendData函数
发送数据给客户端,发送数据完毕后,将对应的数据节点由EPOLLOUT修改为EPOLLIN,回调函数设置为readData

在这里插入图片描述

在这里插入图片描述
创建事件驱动结构体,为事件总数 +1

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
代码

//��Ӧ�Ѽ򵥰�
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include "wrap.h"#define _BUF_LEN_  1024
#define _EVENT_SIZE_ 1024//ȫ��epoll���ĸ�
int gepfd = 0;//�¼������ṹ��
typedef struct xx_event{int fd;int events;void (*call_back)(int fd,int events,void *arg);void *arg;char buf[1024];int buflen;int epfd;
}xevent;xevent myevents[_EVENT_SIZE_+1];void readData(int fd,int events,void *arg);//�����¼�
//eventadd(lfd,EPOLLIN,initAccept,&myevents[_EVENT_SIZE_-1],&myevents[_EVENT_SIZE_-1]);
void eventadd(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)
{ev->fd = fd;ev->events = events;//ev->arg = arg;//�����ṹ���Լ�,����ͨ��arg�õ��ṹ���������Ϣev->call_back = call_back;struct epoll_event epv;epv.events = events;epv.data.ptr = ev;//����˼��epoll_ctl(gepfd,EPOLL_CTL_ADD,fd,&epv);//����
}//�޸��¼�
//eventset(fd,EPOLLOUT,senddata,arg,ev);
void eventset(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)
{ev->fd = fd;ev->events = events;//ev->arg = arg;ev->call_back = call_back;struct epoll_event epv;epv.events = events;epv.data.ptr = ev;epoll_ctl(gepfd,EPOLL_CTL_MOD,fd,&epv);//�޸�
}//ɾ���¼�
void eventdel(xevent *ev,int fd,int events)
{printf("begin call %s\n",__FUNCTION__);ev->fd = 0;ev->events = 0;ev->call_back = NULL;memset(ev->buf,0x00,sizeof(ev->buf));ev->buflen = 0;struct epoll_event epv;epv.data.ptr = NULL;epv.events = events;epoll_ctl(gepfd,EPOLL_CTL_DEL,fd,&epv);//����
}//��������
void senddata(int fd,int events,void *arg)
{printf("begin call %s\n",__FUNCTION__);xevent *ev = arg;Write(fd,ev->buf,ev->buflen);eventset(fd,EPOLLIN,readData,arg,ev);
}//������
void readData(int fd,int events,void *arg)
{printf("begin call %s\n",__FUNCTION__);xevent *ev = arg;ev->buflen = Read(fd,ev->buf,sizeof(ev->buf));if(ev->buflen>0) //��������{	//void eventset(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)eventset(fd,EPOLLOUT,senddata,arg,ev);}else if(ev->buflen==0) //�Է��ر�����{Close(fd);eventdel(ev,fd,EPOLLIN);}}
//�����Ӵ���
void initAccept(int fd,int events,void *arg)
{printf("begin call %s,gepfd =%d\n",__FUNCTION__,gepfd);//__FUNCTION__ ������int i;struct sockaddr_in addr;socklen_t len = sizeof(addr);int cfd = Accept(fd,(struct sockaddr*)&addr,&len);//�Ƿ��������//����myevents�����п��õ�λ��for(i = 0 ; i < _EVENT_SIZE_; i ++){if(myevents[i].fd==0){break;}}//���ö��¼�eventadd(cfd,EPOLLIN,readData,&myevents[i],&myevents[i]);
}int main(int argc,char *argv[])
{//����socketint lfd = Socket(AF_INET,SOCK_STREAM,0);//�˿ڸ���int opt = 1;setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));//��struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_port = htons(8888);servaddr.sin_addr.s_addr = htonl(INADDR_ANY);Bind(lfd,(struct sockaddr*)&servaddr,sizeof(servaddr));//����Listen(lfd,128);//����epoll�����ڵ�gepfd = epoll_create(1024);printf("gepfd === %d\n",gepfd);struct epoll_event events[1024];//�������ʼ�¼���������������������eventadd(lfd,EPOLLIN,initAccept,&myevents[_EVENT_SIZE_],&myevents[_EVENT_SIZE_]);//void eventadd(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)while(1){int nready = epoll_wait(gepfd,events,1024,-1);if(nready<0) //����epoll_waitʧ��{perr_exit("epoll_wait error");}else if(nready>0) //����epoll_wait�ɹ�,�������¼��������ļ��������ĸ���{int i = 0;for(i=0;i<nready; i++){xevent *xe = events[i].data.ptr;//ȡptrָ��ṹ���ַprintf("fd=%d\n",xe->fd);if(xe->events & events[i].events){xe->call_back(xe->fd,xe->events,xe);//�����¼���Ӧ�Ļص�}}}}//�رռ����ļ�������Close(lfd);return 0;
}

2 线程池

相关线程函数
pthread_create

pthread_detach 分离
pthread_attr_t attr;
pthread_attr_init
pthread_attr_setdetachstate

pthread_exit

涉及到共享资源(主线程和各个子线程共享任务池)
互斥锁
pthread_mutex_t
pthread_mutex_init
pthread_mutex_lock/unlock
pthread_mutex_destory
能够线程引起阻塞的函数
弱任务池已满,主线程应该阻塞等待子线程处理人物,此时主线程需要阻塞等待
弱任务池空了,子线程应该阻塞等待,等待主线程池往任务池中添加任务
pthread_cond_wait
pthread_cond_signal

在这里插入图片描述
子线程从任务池中获取任务,
任务池中有一个回调函数,子线程通过回调函数执行不同操作。

任务池中的任务结构体
在这里插入图片描述
线程池结构体
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

添加任务给子线程
在这里插入图片描述
任务完成后,通知生产者继续生产任务
在这里插入图片描述
退出

流程

1.初始化操作:线程数量、任务总数、malloc内存、互斥锁和条件变量初始化,创建指定数量的子线程

2.主线程:往线程池添加任务
先加锁 然后判断任务池中任务是否已满 若已满,则调用pthread_cond_wait阻塞等待,若未满,则往任务池中添加任务,添加完任务之后调用Pthread_cond_signal同志子线程去取任务
最后解锁

3.子线程:负责从任务池中获取任务并处理任务
先加锁,然后判断任务池中是否有任务,若任务池中没有任务,则调用pthread_cond_wait函数等待主线程添加任务。若任务池中有任务,则取任务并处理任务。
处理完任务后,通知主线程继续添加任务
如果shutdown为1,先解锁再自动退出:pthread_exit;
最后释放锁

在这里插入图片描述

代码

ThreadPool.h

#ifndef _THREADPOOL_H
#define _THREADPOOL_H#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>typedef struct _PoolTask
{int tasknum;//模拟任务编号void *arg;//回调函数参数void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;typedef struct _ThreadPool
{int max_job_num;//最大任务个数int job_num;//实际任务个数PoolTask *tasks;//任务队列数组int job_push;//入队位置int job_pop;// 出队位置int thr_num;//线程池内线程个数pthread_t *threads;//线程池内线程数组int shutdown;//是否关闭线程池pthread_mutex_t pool_lock;//线程池的锁pthread_cond_t empty_task;//任务队列为空的条件pthread_cond_t not_empty_task;//任务队列不为空的条件}ThreadPool;void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum  代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数#endif

ThreadPool.c

//简易版线程池
#include "threadpoolsimple.h"int beginnum = 1000;void *thrRun(void *arg)
{//printf("begin call %s-----\n",__FUNCTION__);ThreadPool *pool = (ThreadPool*)arg;int taskpos = 0;//任务位置PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));while(1){//获取任务,先要尝试加锁pthread_mutex_lock(&thrPool->pool_lock);//无任务并且线程池不是要摧毁while(thrPool->job_num <= 0 && !thrPool->shutdown ){//如果没有任务,线程会阻塞pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);}if(thrPool->job_num){//有任务需要处理taskpos = (thrPool->job_pop++)%thrPool->max_job_num;//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());//为什么要拷贝?避免任务被修改,生产者会添加任务memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));task->arg = task;thrPool->job_num--;//task = &thrPool->tasks[taskpos];pthread_cond_signal(&thrPool->empty_task);//通知生产者}if(thrPool->shutdown){//代表要摧毁线程池,此时线程退出即可//pthread_detach(pthread_self());//临死前分家pthread_mutex_unlock(&thrPool->pool_lock);free(task);pthread_exit(NULL);}//释放锁pthread_mutex_unlock(&thrPool->pool_lock);task->task_func(task->arg);//执行回调函数}//printf("end call %s-----\n",__FUNCTION__);
}//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{printf("begin call %s-----\n",__FUNCTION__);thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));thrPool->thr_num = thrnum;thrPool->max_job_num = maxtasknum;thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁thrPool->job_push = 0;//任务队列添加的位置thrPool->job_pop = 0;//任务队列出队的位置thrPool->job_num = 0;//初始化的任务个数为0thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列//初始化锁和条件变量pthread_mutex_init(&thrPool->pool_lock,NULL);pthread_cond_init(&thrPool->empty_task,NULL);pthread_cond_init(&thrPool->not_empty_task,NULL);int i = 0;thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for(i = 0;i < thrnum;i++){pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程}//printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{pool->shutdown = 1;//开始自爆pthread_cond_broadcast(&pool->not_empty_task);//诱杀 int i = 0;for(i = 0; i < pool->thr_num ; i++){pthread_join(pool->threads[i],NULL);}pthread_cond_destroy(&pool->not_empty_task);pthread_cond_destroy(&pool->empty_task);pthread_mutex_destroy(&pool->pool_lock);free(pool->tasks);free(pool->threads);free(pool);
}//添加任务到线程池
void addtask(ThreadPool *pool)
{//printf("begin call %s-----\n",__FUNCTION__);pthread_mutex_lock(&pool->pool_lock);//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)while(pool->max_job_num <= pool->job_num){pthread_cond_wait(&pool->empty_task,&pool->pool_lock);}int taskpos = (pool->job_push++)%pool->max_job_num;//printf("add task %d  tasknum===%d\n",taskpos,beginnum);pool->tasks[taskpos].tasknum = beginnum++;pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];pool->tasks[taskpos].task_func = taskRun;pool->job_num++;pthread_mutex_unlock(&pool->pool_lock);pthread_cond_signal(&pool->not_empty_task);//通知包身工//printf("end call %s-----\n",__FUNCTION__);
}//任务回调函数
void taskRun(void *arg)
{PoolTask *task = (PoolTask*)arg;int num = task->tasknum;printf("task %d is runing %lu\n",num,pthread_self());sleep(1);printf("task %d is done %lu\n",num,pthread_self());
}int main()
{create_threadpool(3,20);int i = 0;for(i = 0;i < 50 ; i++){addtask(thrPool);//模拟添加任务}sleep(20);destroy_threadpool(thrPool);return 0;
}

3 复杂版本线程池

线程池相关信息
在这里插入图片描述
在这里插入图片描述

代码

threadpool.h

#ifndef __THREADPOOL_H_
#define __THREADPOOL_H_typedef struct threadpool_t threadpool_t;/*** @function threadpool_create* @descCreates a threadpool_t object.* @param thr_num  thread num* @param max_thr_num  max thread size* @param queue_max_size   size of the queue.* @return a newly created thread pool or NULL*/
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);/*** @function threadpool_add* @desc add a new task in the queue of a thread pool* @param pool     Thread pool to which add the task.* @param function Pointer to the function that will perform the task.* @param argument Argument to be passed to the function.* @return 0 if all goes well,else -1*/
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);/*** @function threadpool_destroy* @desc Stops and destroys a thread pool.* @param pool  Thread pool to destroy.* @return 0 if destory success else -1*/
int threadpool_destroy(threadpool_t *pool);/*** @desc get the thread num* @pool pool threadpool* @return # of the thread*/
int threadpool_all_threadnum(threadpool_t *pool);/*** desc get the busy thread num* @param pool threadpool* return # of the busy thread*/
int threadpool_busy_threadnum(threadpool_t *pool);#endif

threadpool.c

#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "threadpool.h"#define DEFAULT_TIME 10                 /*10s检测一次*/
#define MIN_WAIT_TASK_NUM 10            /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 
#define DEFAULT_THREAD_VARY 10          /*每次创建和销毁线程的个数*/
#define true 1
#define false 0typedef struct 
{void *(*function)(void *);          /* 函数指针,回调函数 */void *arg;                          /* 上面函数的参数 */
} threadpool_task_t;                    /* 各子线程任务结构体 *//* 描述线程池相关信息 */
struct threadpool_t 
{pthread_mutex_t lock;               /* 用于锁住本结构体 */    pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid;               /* 存管理线程tid */threadpool_task_t *task_queue;      /* 任务队列(数组首地址) */int min_thr_num;                    /* 线程池最小线程数 */int max_thr_num;                    /* 线程池最大线程数 */int live_thr_num;                   /* 当前存活线程个数 */int busy_thr_num;                   /* 忙状态线程个数 */int wait_exit_thr_num;              /* 要销毁的线程个数 */int queue_front;                    /* task_queue队头下标 */int queue_rear;                     /* task_queue队尾下标 */int queue_size;                     /* task_queue队中实际任务数 */int queue_max_size;                 /* task_queue队列可容纳任务数上限 */int shutdown;                       /* 标志位,线程池使用状态,true或false */
};void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100);  
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool = NULL;do {if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  printf("malloc threadpool fail");break;                                      /*跳出do while*/}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */pool->wait_exit_thr_num = 0;pool->queue_size = 0;                           /* 有0个产品 */pool->queue_max_size = queue_max_size;pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false;                         /* 不关闭线程池 *//* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);/* 队列开辟空间 */pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail\n");break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail\n");break;}//启动工作线程pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for (i = 0; i < min_thr_num; i++) {pthread_create(&(pool->threads[i]), &attr, threadpool_thread, (void *)pool);/*pool指向当前线程池*/printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);}//创建管理者线程pthread_create(&(pool->adjust_tid), &attr, adjust_thread, (void *)pool);return pool;} while (0);/* 前面代码调用失败时,释放poll存储空间 */threadpool_free(pool);return NULL;
}/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 process: 小写---->大写*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock(&(pool->lock));/* ==为真,队列已经满, 调wait阻塞 */while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}if (pool->shutdown) {pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}/* 清空 工作线程 调用的回调函数 的参数arg */if (pool->task_queue[pool->queue_rear].arg != NULL) {pool->task_queue[pool->queue_rear].arg = NULL;}/*添加任务到任务队列里*/pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* 队尾指针移动, 模拟环形 */pool->queue_size++;/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/pthread_mutex_lock(&(pool->lock));/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/while ((pool->queue_size == 0) && (!pool->shutdown)) {  printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));//暂停到这/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/if (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--;/*如果线程池里线程个数大于最小值时可以结束当前线程*/if (pool->live_thr_num > pool->min_thr_num) {printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));//pthread_detach(pthread_self());pthread_exit(NULL);}}}/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/if (pool->shutdown) {pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());//pthread_detach(pthread_self());pthread_exit(NULL);     /* 线程自行结束 */}/*从任务队列里获取任务, 是一个出队操作*/task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */pool->queue_size--;/*通知可以有新的任务添加进来*/pthread_cond_broadcast(&(pool->queue_not_full));/*任务取出后,立即将 线程池琐 释放*/pthread_mutex_unlock(&(pool->lock));/*执行任务*/ printf("thread 0x%x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/pool->busy_thr_num++;                                                   /*忙状态线程数+1*/pthread_mutex_unlock(&(pool->thread_counter));(*(task.function))(task.arg);                                           /*执行回调函数任务*///task.function(task.arg);                                              /*执行回调函数任务*//*任务结束处理*/ printf("thread 0x%x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}/* 管理线程 */
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown) {sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size;                      /* 关注 任务数 */int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */pthread_mutex_unlock(&(pool->thread_counter));/* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {pthread_mutex_lock(&(pool->lock));  int add = 0;/*一次增加 DEFAULT_THREAD 个线程*/for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool->max_thr_num; i++) {if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;}}pthread_mutex_unlock(&(pool->lock));}/* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */pthread_mutex_unlock(&(pool->lock));for (i = 0; i < DEFAULT_THREAD_VARY; i++) {/* 通知处在空闲状态的线程, 他们会自行终止*/pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}int threadpool_destroy(threadpool_t *pool)
{int i;if (pool == NULL) {return -1;}pool->shutdown = true;/*先销毁管理线程*///pthread_join(pool->adjust_tid, NULL);for (i = 0; i < pool->live_thr_num; i++) {/*通知所有的空闲线程*/pthread_cond_broadcast(&(pool->queue_not_empty));}/*for (i = 0; i < pool->live_thr_num; i++) {pthread_join(pool->threads[i], NULL);}*/threadpool_free(pool);return 0;
}int threadpool_free(threadpool_t *pool)
{if (pool == NULL) {return -1;}if (pool->task_queue) {free(pool->task_queue);}if (pool->threads) {free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool = NULL;return 0;
}int threadpool_all_threadnum(threadpool_t *pool)
{int all_threadnum = -1;pthread_mutex_lock(&(pool->lock));all_threadnum = pool->live_thr_num;pthread_mutex_unlock(&(pool->lock));return all_threadnum;
}int threadpool_busy_threadnum(threadpool_t *pool)
{int busy_threadnum = -1;pthread_mutex_lock(&(pool->thread_counter));busy_threadnum = pool->busy_thr_num;pthread_mutex_unlock(&(pool->thread_counter));return busy_threadnum;
}int is_thread_alive(pthread_t tid)
{int kill_rc = pthread_kill(tid, 0);     //发0号信号,测试线程是否存活if (kill_rc == ESRCH) {return false;}return true;
}/*测试*/ #if 1
/* 线程池中的线程,模拟处理业务 */
void *process(void *arg)
{printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),*(int *)arg);sleep(1);printf("task %d is end\n", *(int *)arg);return NULL;
}int main(void)
{/*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/threadpool_t *thp = threadpool_create(3,100,100);   /*创建线程池,池里最小3个线程,最大100,队列最大100*/printf("pool inited");//int *num = (int *)malloc(sizeof(int)*20);int num[20], i;for (i = 0; i < 20; i++) {num[i]=i;printf("add task %d\n",i);threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 */}sleep(10);                                          /* 等子线程完成任务 */threadpool_destroy(thp);return 0;
}#endif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/39996.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Taogogo Taocms v3.0.2 远程代码执行漏洞(CVE-2022-25578)

前言 CVE-2022-25578 是一个存在于 Taogogo Taocms v3.0.2 中的代码注入漏洞。此漏洞允许攻击者通过任意编辑 .htaccess 文件来执行代码注入。 漏洞详情 漏洞描述&#xff1a;攻击者可以利用此漏洞上传一个 .htaccess 文件到网站&#xff0c;并在文件中注入恶意代码&#xf…

苹果手机怎么刷机?适合小白的刷机办法!

自己的苹果手机用时间长了&#xff0c;有些人想要为自己的手机重新刷新一下&#xff0c;但又不知道怎么刷机。不要慌现在就来给大家详细介绍一下苹果手机怎么刷机&#xff0c;希望可以帮助到大家。 iPhone常见的刷机方式&#xff0c;分为iTunes官方和第三方软件两种刷机方式。 …

【elementui】记录解决el-tree开启show-checkbox后,勾选一个叶结点后会自动折叠的现象

第一种解决方案&#xff1a;设置default-expand-keys的值为当前选中的key值即可 <el-treeref"tree"class"checkboxSelect-wrap":data"treeData"show-checkboxnode-key"id":expand-on-click-node"true":props"defau…

第T3周:天气识别

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 一、前期工作 本文将采用CNN实现多云、下雨、晴、日出四种天气状态的识别。较上篇文章&#xff0c;本文为了增加模型的泛化能力&#xff0c;新增了Dropout层并…

拉曼光谱入门:1.光谱的分类与散射光谱发展史

一、光谱是什么&#xff1f; 在一个宁静的午后&#xff0c;年轻的艾萨克牛顿坐在他母亲花园里的一棵苹果树下&#xff0c;手握一块精致的三棱镜。他沉思着光的奥秘&#xff0c;意识到光并非单一的白色&#xff0c;而是一种由多彩色组成的复杂结构。 他决心进行一次实验&#xf…

浅析C++函数重载

浅析C函数重载 C语言和C函数调用的不同 C语言会进行报错 C能成功运行并且自动识别类型 由此可以看出&#xff0c;C在函数调用时进行了调整&#xff0c;使其支持函数重载&#xff0c;那么我们就来看看进行了哪些调整吧&#x1f60e; 分析函数调用 首先我们要知道&#xff0c…

2024企业数据资产化及数据资产入表方案梳理

01 数据资产入表&#xff1a;是一个将组织的各类数据资产进行登记、分类、评估和管理的流程。 数据资产包括&#xff1a;客户信息、交易记录、产品数据、财务数据等。 做个比喻吧&#xff1a;数据资产入表就像是给公司的数据资产做“人口普查”—— ①找出公司有哪些数据找…

macos m2 百度paddleocr文字识别 python

创建了一个虚拟环境&#xff1a;conda create -n orc python3.11.7 进入虚拟环境后执行2条命令 pip install paddleocr -i https://pypi.tuna.tsinghua.edu.cn/simple pip install paddlepaddle -i https://pypi.tuna.tsinghua.edu.cn/simple​ ​ 安装好后&#xff0c;在网…

204.贪心算法:分发饼干(力扣)

以下来源于代码随想录 class Solution { public:int findContentChildren(vector<int>& g, vector<int>& s) {// 对孩子的胃口进行排序sort(g.begin(), g.end());// 对饼干的尺寸进行排序sort(s.begin(), s.end());int index s.size() - 1; // 从最大的饼…

严蔚敏数据结构(C语言版)吴伟民宁编著清华大学计算机系列教材+题集+配套题库+笔记+习题PDF电子版

今天分享的是 严蔚敏数据结构&#xff08;C语言版&#xff09;题集配套题库笔记习题PDF电子版 注&#xff1a;本资源搜集于网络&#xff0c;仅供学习交流&#xff0c;严禁用于商业用途 内容简介&#xff1a; “数据结构”是计算机程序设计的重要理论技术基础&#xff0c;它…

ArcTs布局入门04——相对布局 媒体查询

如果你也对鸿蒙开发感兴趣&#xff0c;加入“Harmony自习室”吧 扫描下面的二维码关注公众号。 本文将探讨相对布局与媒体查询&#xff0c;为啥把他们放到一起呢&#xff1f;主要是因为相对布局在响应式的场景下做得不太好&#xff0c;一般情况下和媒体查询&#xff08;不同尺…

MySQL之备份与恢复(二)

备份与恢复 定义恢复需求 如果一切正常&#xff0c;那么永远也不需要考虑恢复。但是&#xff0c;一旦需要恢复&#xff0c;只有世界上最好的备份系统是没用的&#xff0c;还需要一个强大的恢复系统。 不幸的是&#xff0c;让备份系统平滑工作比构造良好的恢复过程和工具更容易…

WebRtc实现1V1音视频通话

WebRtc实现1V1音视频通话 简介应用场景共享桌面的基本原理传统共享桌面WebRTC 共享桌面 相关API基本使用调用本地摄像头播放约束设置 媒体协商过程协议协议的交换与传输 WebRTC 通信过程ICE Candidate&#xff08;ICE 候选者&#xff09; 1V1视频通话 简介 WebRTC&#xff0c;名…

新版本发布丨昂辉科技EasySAR-Configurator V1.2.0再启航

昂辉科技新一代跨平台高性能AUTOSAR配置工具EasySAR-Configurator V1.2.0全新版本重磅发布&#xff01;产品基于Web架构前后端分离的方式开发&#xff0c;可提供SaaS部署&#xff0c;能够实现精准配置和最大限度的代码裁剪&#xff0c;且配备标准的约束限制、配置验证、代码生成…

jenkins 发布服务到linux服务器

1.环境准备 1.1 需要一台已经部署了jenkins的服务器&#xff0c;上面已经集成好了&#xff0c;jdk、maven、nodejs、git等基础的服务。 1.2 需要安装插件 pusblish over ssh 1.3 准备一台额外的linux服务器&#xff0c;安装好jdk 2.流程描述 2.1 配置jenkins&#xff0c;包括p…

如何将 Apifox 的自动化测试与 Jenkins 集成?

CI/CD &#xff08;持续集成/持续交付&#xff09; 在 API 测试 中的主要目的是为了自动化 API 的验证流程&#xff0c;确保 API 发布到生产环境前的可用性。通过持续集成&#xff0c;我们可以在 API 定义变更时自动执行功能测试&#xff0c;以及时发现潜在问题。 Apifox 支持…

二叉树的最近公共祖先-二叉树

236. 二叉树的最近公共祖先 - 力扣&#xff08;LeetCode&#xff09; ​ 递归 lson、rson左右子树&#xff1b; 深度优先遍历&#xff0c;遍历到p或者q就返回ture&#xff1b; class Solution { public:TreeNode* ans;bool dfs(TreeNode* root, TreeNode* p, TreeNode* q){i…

GoLand 2024 for Mac GO语言集成开发工具环境

Mac分享吧 文章目录 效果一、下载软件二、开始安装1、双击运行软件&#xff08;适合自己的M芯片版或Intel芯片版&#xff09;&#xff0c;将其从左侧拖入右侧文件夹中&#xff0c;等待安装完毕2、应用程序显示软件图标&#xff0c;表示安装成功3、打开访达&#xff0c;点击【文…

CAN学习笔记

学习链接&#xff1a;CAN学习笔记&#xff08;1&#xff09;_can sjw-CSDN博客 内容全部取自链接&#xff0c;非原创。用于自己学习和记录&#xff0c;如有错误请指正。如果侵权了&#xff0c;请联系我删掉。 CAN主要有两种物理层&#xff0c;1.闭环的ISO11898 2.开环的ISO1…

Hadoop3:NameNode和DataNode多目录配置(扩充磁盘的技术支持)

一、NameNode多目录 1、说明 NameNode多目录&#xff0c;需要在刚搭建Hadoop集群的时候&#xff0c;就配置好 因为&#xff0c;配置这个&#xff0c;需要格式化NameNode 所以&#xff0c;如果一开始没配置NameNode多目录&#xff0c;后面&#xff0c;就不要配置了。 2、配置…