一个Linux下C线程池的实现(转)

1.线程池基本原理

  在传统服务器结构中, 常是 有一个总的 监听线程监听有没有新的用户连接服务器, 每当有一个新的 用户进入, 服务器就开启一个新的线程用户处理这 个用户的数据包。这个线程只服务于这个用户 , 当 用户与服务器端关闭连接以后, 服务器端销毁这个线程。然而频繁地开辟与销毁线程极大地占用了系统的资源。而且在大量用户的情况下, 系统为了开辟和销毁线程将浪费大量的时间和资源。线程池提供了一个解决外部大量用户与服务器有限资源的矛盾, 线程池和传统的一个用户对应一 个线程的处理方法不同, 它的基本思想就是在程序 开始时就在内存中开辟一些线程, 线程的数目是 固定的,他们独自形成一个类, 屏蔽了对外的操作, 而服务器只需要将数据包交给线程池就可以了。当有新的客户请求到达时 , 不是新创建一个线程为其服务 , 而是从“池子”中选择一个空闲的线程为新的客户请求服务 ,服务完毕后 , 线程进入空闲线程池中。如果没有线程空闲 的 话, 就 将 数 据 包 暂 时 积 累 , 等 待 线 程 池 内 有 线 程空闲以后再进行处理。通过对多个任务重用已经存在的线程对象 , 降低了对线程对象创建和销毁的开销。当客户请求 时 , 线程对象 已 经 存 在 , 可 以 提 高 请 求 的响应时间 , 从而整体地提高了系统服务的表现。

  一般来说实现一个线程池主要包括以下几个组成部分:

1)线程管理器:用于创建并管理线程池。

2)工作线程:线程池中实际执行任务的线程。在初始化线程时会预先创建好固定数目的线程在池中,这些初始化的线程一般处于空闲状态,一般不占用CPU,占用较小的内存空间。

3)任务接口:每个任务必须实现的接口,当线程池的任务队列中有可执行任务时,被空闲的工作线程调去执行(线程的闲与忙是通过互斥量实现的,跟前面文章中的设置标志位差不多),把任务抽象出来形成接口,可以做到线程池与具体的任务无关。

4)任务队列:用来存放没有处理的任务,提供一种缓冲机制,实现这种结构有好几种方法,常用的是队列,主要运用先进先出原理,另外一种是链表之类的数据结构,可以动态的为它分配内存空间,应用中比较灵活,下文中就是用到的链表。

下面的不在赘述百度《线程池技术在并发服务器中的应用》写的非常详细!

转自:http://blog.csdn.net/zouxinfox/article/details/3560891

  什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。

    下面是Linux系统下用C语言创建的一个线程池。线程池会维护一个任务链表(每个CThread_worker结构就是一个任务)。
    pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函数。该函数中

  1. while (pool->cur_queue_size == 0)
  2. {
  3.       pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock));
  4. }
表示如果任务链表中没有任务,则该线程出于阻塞等待状态。否则从队列中取出任务并执行。
    
    pool_add_worker()函数向线程池的任务链表中加入一个任务,加入后通过调用pthread_cond_signal (&(pool->queue_ready))唤醒一个出于阻塞状态的线程(如果有的话)。
    
    pool_destroy ()函数用于销毁线程池,线程池任务链表中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出。
    

[cpp] view plain copy
  1. #include <stdio.h>  
  2. #include <stdlib.h>  
  3. #include <unistd.h>  
  4. #include <sys/types.h>  
  5. #include <pthread.h>  
  6. #include <assert.h>  
  7.   
  8. /* 
  9. *线程池里所有运行和等待的任务都是一个CThread_worker 
  10. *由于所有任务都在链表里,所以是一个链表结构 
  11. */  
  12. typedef struct worker  
  13. {  
  14.     /*回调函数,任务运行时会调用此函数,注意也可声明成其它形式*/  
  15.     void *(*process) (void *arg);  
  16.     void *arg;/*回调函数的参数*/  
  17.     struct worker *next;  
  18.   
  19. } CThread_worker;  
  20.   
  21.   
  22.   
  23. /*线程池结构*/  
  24. typedef struct  
  25. {  
  26.     pthread_mutex_t queue_lock;  
  27.     pthread_cond_t queue_ready;  
  28.   
  29.     /*链表结构,线程池中所有等待任务*/  
  30.     CThread_worker *queue_head;  
  31.   
  32.     /*是否销毁线程池*/  
  33.     int shutdown;  
  34.     pthread_t *threadid;  
  35.     /*线程池中允许的活动线程数目*/  
  36.     int max_thread_num;  
  37.     /*当前等待队列的任务数目*/  
  38.     int cur_queue_size;  
  39.   
  40. } CThread_pool;  
  41.   
  42.   
  43.   
  44. int pool_add_worker (void *(*process) (void *arg), void *arg);  
  45. void *thread_routine (void *arg);  
  46.   
  47.   
  48. //share resource  
  49. static CThread_pool *pool = NULL;  
  50. void  
  51. pool_init (int max_thread_num)  
  52. {  
  53.     pool = (CThread_pool *) malloc (sizeof (CThread_pool));  
  54.   
  55.     pthread_mutex_init (&(pool->queue_lock), NULL);  
  56.     pthread_cond_init (&(pool->queue_ready), NULL);  
  57.   
  58.     pool->queue_head = NULL;  
  59.   
  60.     pool->max_thread_num = max_thread_num;  
  61.     pool->cur_queue_size = 0;  
  62.   
  63.     pool->shutdown = 0;  
  64.   
  65.     pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));  
  66.     int i = 0;  
  67.     for (i = 0; i < max_thread_num; i++)  
  68.     {   
  69.         pthread_create (&(pool->threadid[i]), NULL, thread_routine,NULL);  
  70.     }  
  71. }  
  72.   
  73.   
  74.   
  75. /*向线程池中加入任务*/  
  76. int  
  77. pool_add_worker (void *(*process) (void *arg), void *arg)  
  78. {  
  79.     /*构造一个新任务*/  
  80.     CThread_worker *newworker = (CThread_worker *) malloc (sizeof (CThread_worker));  
  81.     newworker->process = process;  
  82.     newworker->arg = arg;  
  83.     newworker->next = NULL;/*别忘置空*/  
  84.   
  85.     pthread_mutex_lock (&(pool->queue_lock));  
  86.     /*将任务加入到等待队列中*/  
  87.     CThread_worker *member = pool->queue_head;  
  88.     if (member != NULL)  
  89.     {  
  90.         while (member->next != NULL)  
  91.             member = member->next;  
  92.         member->next = newworker;  
  93.     }  
  94.     else  
  95.     {  
  96.         pool->queue_head = newworker;  
  97.     }  
  98.   
  99.     assert (pool->queue_head != NULL);  
  100.   
  101.     pool->cur_queue_size++;  
  102.     pthread_mutex_unlock (&(pool->queue_lock));  
  103.     /*好了,等待队列中有任务了,唤醒一个等待线程; 
  104.     注意如果所有线程都在忙碌,这句没有任何作用*/  
  105.     pthread_cond_signal (&(pool->queue_ready));  
  106.     return 0;  
  107. }  
  108.   
  109.   
  110.   
  111. /*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直 
  112. 把任务运行完后再退出*/  
  113. int  
  114. pool_destroy ()  
  115. {  
  116.     if (pool->shutdown)  
  117.         return -1;/*防止两次调用*/  
  118.     pool->shutdown = 1;  
  119.   
  120.     /*唤醒所有等待线程,线程池要销毁了*/  
  121.     pthread_cond_broadcast (&(pool->queue_ready));  
  122.   
  123.     /*阻塞等待线程退出,否则就成僵尸了*/  
  124.     int i;  
  125.     for (i = 0; i < pool->max_thread_num; i++)  
  126.         pthread_join (pool->threadid[i], NULL);  
  127.     free (pool->threadid);  
  128.   
  129.     /*销毁等待队列*/  
  130.     CThread_worker *head = NULL;  
  131.     while (pool->queue_head != NULL)  
  132.     {  
  133.         head = pool->queue_head;  
  134.         pool->queue_head = pool->queue_head->next;  
  135.         free (head);  
  136.     }  
  137.     /*条件变量和互斥量也别忘了销毁*/  
  138.     pthread_mutex_destroy(&(pool->queue_lock));  
  139.     pthread_cond_destroy(&(pool->queue_ready));  
  140.       
  141.     free (pool);  
  142.     /*销毁后指针置空是个好习惯*/  
  143.     pool=NULL;  
  144.     return 0;  
  145. }  
  146.   
  147.   
  148.   
  149. void *  
  150. thread_routine (void *arg)  
  151. {  
  152.     printf ("starting thread 0x%x\n", pthread_self ());  
  153.     while (1)  
  154.     {  
  155.         pthread_mutex_lock (&(pool->queue_lock));  
  156.         /*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意 
  157.         pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/  
  158.         while (pool->cur_queue_size == 0 && !pool->shutdown)  
  159.         {  
  160.             printf ("thread 0x%x is waiting\n", pthread_self ());  
  161.             pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));  
  162.         }  
  163.   
  164.         /*线程池要销毁了*/  
  165.         if (pool->shutdown)  
  166.         {  
  167.             /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/  
  168.             pthread_mutex_unlock (&(pool->queue_lock));  
  169.             printf ("thread 0x%x will exit\n", pthread_self ());  
  170.             pthread_exit (NULL);  
  171.         }  
  172.   
  173.         printf ("thread 0x%x is starting to work\n", pthread_self ());  
  174.   
  175.         /*assert是调试的好帮手*/  
  176.         assert (pool->cur_queue_size != 0);  
  177.         assert (pool->queue_head != NULL);  
  178.           
  179.         /*等待队列长度减去1,并取出链表中的头元素*/  
  180.         pool->cur_queue_size--;  
  181.         CThread_worker *worker = pool->queue_head;  
  182.         pool->queue_head = worker->next;  
  183.         pthread_mutex_unlock (&(pool->queue_lock));  
  184.   
  185.         /*调用回调函数,执行任务*/  
  186.         (*(worker->process)) (worker->arg);  
  187.         free (worker);  
  188.         worker = NULL;  
  189.     }  
  190.     /*这一句应该是不可达的*/  
  191.     pthread_exit (NULL);  
  192. }  
  193.   
  194. //    下面是测试代码  
  195.   
  196. void *  
  197. myprocess (void *arg)  
  198. {  
  199.     printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);  
  200.     sleep (1);/*休息一秒,延长任务的执行时间*/  
  201.     return NULL;  
  202. }  
  203.   
  204. int  
  205. main (int argc, char **argv)  
  206. {  
  207.     pool_init (3);/*线程池中最多三个活动线程*/  
  208.       
  209.     /*连续向池中投入10个任务*/  
  210.     int *workingnum = (int *) malloc (sizeof (int) * 10);  
  211.     int i;  
  212.     for (i = 0; i < 10; i++)  
  213.     {  
  214.         workingnum[i] = i;  
  215.         pool_add_worker (myprocess, &workingnum[i]);  
  216.     }  
  217.     /*等待所有任务完成*/  
  218.     sleep (5);  
  219.     /*销毁线程池*/  
  220.     pool_destroy ();  
  221.   
  222.     free (workingnum);  
  223.     return 0;  
  224. }  


将上述所有代码放入threadpool.c文件中,
在Linux输入编译命令
$ gcc -o threadpool threadpool.c -lpthread

以下是运行结果
starting thread 0xb7df6b90
thread 0xb7df6b90 is waiting
starting thread 0xb75f5b90
thread 0xb75f5b90 is waiting
starting thread 0xb6df4b90
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 0
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 1
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 2
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 3
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 4
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 5
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 6
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 7
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 8
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 9
thread 0xb75f5b90 is waiting
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is waiting
thread 0xb75f5b90 will exit
thread 0xb6df4b90 will exit
thread 0xb7df6b90 will exit

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/385048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

二维数组作为函数参数

#include<stdio.h> //#include<> //二位数组作为函数参数时&#xff0c;可以不指定第一个下标 void print_buf(int (*p)[3],int a,int b) //void print_buf(int p[][3],int a,int b) {int i,j;for(i 0 ; i < a; i){for(j 0; j < b; j){printf("p[%…

mystrcat

#include<stdio.h> //如果一个数组做为函数的形参传递&#xff0c;那么数组可以在被调用的函数中修改 //有时候不希望这个事发生&#xff0c;所以对形参采用const参数 //size_t strlen(const char *s); //strcpy(char* s1,const char* s2); void mystrcat(char *s1,cons…

关于非阻塞的recv的时候返回的处理

注意recv&#xff08;&#xff09;如果读到数据为0&#xff0c;那么就表示文件结束了&#xff0c;如果在读的过程中遇到了中断那么会返回-1&#xff0c;同时置errno为EINTR。 因此判断recv的条件&#xff1a; 如果read返回<0 如果0 表示文件结束&…

带参程序

windows环境 #include<stdio.h>int main(int argc, char *argv[]) {printf("argc %d\n", argc);for (int i 0; i < argc; i){printf("argv[%d] %s\n",i, argv[i]);}system("pause");return 0; } windows环境下&#xff0c;带参函数…

Ubuntu安装mysql步骤

1.打开终端&#xff0c;输入&#xff1a; sudo apt-get updata 输入root用户密码 2.更新完毕后&#xff0c;输入 sudo apt-get install mysql-server ubuntu14.04安装中间会让你设置密码&#xff0c;输入密码后点击确认(mysql123) 3.安装结束后&#xff0c;查看端口号是否开启 …

Pthread创建线程后必须使用join或detach释放线程资源

这两天在看Pthread 资料的时候&#xff0c;无意中看到这样一句话(man pthread_detach): Either pthread_join(3) or pthread_detach() should be called for each thread that an application creates, so that system resources for the thread can be released. …

二维数组求平均值(指针的使用)

#include<stdio.h>int main() {int buf[3][5] {{1,2,3,4,5},{4,5,6,7,8},{7,8,9,10,11}};int i;int j;//求行平均值 for(i 0; i < 3; i){int sum 0;for(j 0; j < 5; j){sum (*(*(buf i) j));}printf("sum %d\n",sum/5);}//求列平均值for(i 0; i …

linux终端关闭时为什么会导致在其上启动的进程退出?

现象 经常在linux下开发的人应该都有这样的经验&#xff0c;就是在终端上启动的程序&#xff0c;在关闭终端时&#xff0c;这个程序的进程也被一起关闭了。看下面这个程序&#xff0c;为了使进程永远运行&#xff0c;在输出helloworld后&#xff0c;循环调用sleep&#xff1a; …

二维数组做函数参数传递

#include<stdio.h> //#include<> //二位数组作为函数参数时&#xff0c;可以不指定第一个下标 void print_buf(int (*p)[3],int a,int b) //void print_buf(int p[][3],int a,int b) {int i,j;for(i 0 ; i < a; i){for(j 0; j < b; j){printf("p[%…

libevent源码深度剖析

第一章 1&#xff0c;前言 Libevent是一个轻量级的开源高性能网络库&#xff0c;使用者众多&#xff0c;研究者更甚&#xff0c;相关文章也不少。写这一系列文章的用意在于&#xff0c;一则分享心得&#xff1b;二则对libevent代码和设计思想做系统的、更深层次的分析&#xff…

函数返回指针类型(strchr函数)

#include<stdio.h> #include<string.h> char *mystrchr(char *s,char c) {while(*s){if(*s c){return s;}s;}return NULL; }int main() {char str[100] "hello world";//char* s strchr(str,a);char *s mystrchr(str,e);//返回ello world字符串 prin…

函数与指针

#include<stdio.h>int add(int a,int b) {return ab; }int main() {void *p(int,char *); //声明了一个函数 &#xff0c;函数名为p&#xff0c;函数返回值为void*,函数的 void (*p)(int,char *);//定义了一个指向参数为int和char*返回值为void的函数指针//定义一个参数为…

使用指针在函数中交换数值

#include<stdio.h>void swap(int* a,int *b) {/*int temp *a;*a * b;*b temp;*/*a *b;*b *a - *b;*a *a - *b; }int main() {int a 10;int b 20;swap(&a,&b);printf("a %d,b %d\n",a,b);} 转载于:https://www.cnblogs.com/wanghao-boke/p/1…

linux C 基于链表链的定时器

源码如下&#xff1a;util_timer.h#ifndef LST_TIMER#define LST_TIMER#include <time.h>#include <sys/time.h>#include <stdlib.h>#include <signal.h>#define BUFFER_SIZE 64struct util_timer;/*struct client_data{sockaddr_in address;int sockf…

libevent学习笔记 一、基础知识

一、libevent是什么libevent是一个轻量级的开源的高性能的事件触发的网络库&#xff0c;适用于windows、linux、bsd等多种平台&#xff0c;内部使用select、epoll、kqueue等系统调用管理事件机制。它被众多的开源项目使用&#xff0c;例如大名鼎鼎的memcached等。特点&#xff…

汉字逆置

在计算机中&#xff0c;一个汉字用无法用1个字节来表示 #include<stdio.h> int main() {char buf[256] "你好";int len 0;while(buf[len]);len--;printf("%d\n",len);// 4一个汉字两个字节 //printf("%p\n",buf);return 0; } 在windows下…

libevent项目分析(一) -- 准备阶段

项目的简介 我理解libevent是一个轻量级的&#xff0c;跨平台高效的&#xff08;C语言实现&#xff09;事件驱动库&#xff0c;类似于ACE项目中的ACE_Reactor&#xff0c;它实现了网络通讯套接口I/O事件&#xff0c;定时器事件&#xff0c;信号事件的监听和事件处理函数回调机制…

混合字符串字符数统计

因为汉字占一个以上字节&#xff0c;如何统计一个既有汉字又有字母的字符串呢&#xff1f; 汉字在计算机中的ASCII是以负数来与其他普通字符的ASCII区分的。 #include<stdio.h> int main() {char buf[256] "你好世界";printf("%d\n",buf[0]); //-60…

清除字符串空格

1.清除字符串中右边的空格 从字符串尾部开始&#xff0c;找到非空格处&#xff0c;将下一个字符置为0即可。 //清除右边空格 #include<stdio.h> int main() {char buf[] "hello world ";int len 0;//calculate the length of stringwhile(buf[len]);le…

浅谈auto_ptr智能指针

引入智能指针&#xff1a;智能指针的实现原理&#xff1a; 资源分配即初始化RAII(Resource Acquisition Is Initialization)&#xff1a; 定义一个类来封装资源的分配和释放&#xff0c;在构造函数完成资源的分配和初始化&#xff0c;在析构函数完成资源的清理&#xff0c;可…