线程池原理及C语言实现线程池

备注:该线程池源码参考自传直播客培训视频配套资料;
源码:https://pan.baidu.com/s/1zWuoE3q0KT5TUjmPKTb1lw 密码:pp42
引言:线程池是一种多线程处理形式,大多用于高并发服务器上,它能合理有效的利用高并发服务器上的线程资源;

在Unix网络编程中,线程与进程用于处理各项分支子功能,我们通常的操作是:接收消息 ==> 消息分类 ==> 线程创建 ==> 传递消息到子线程 ==> 线程分离 ==> 在子线程中执行任务 ==> 任务结束退出;
对大多数小型局域网的通信来说,上述方法足够满足需求;但当我们的通信范围扩大到广域网或大型局域网通信中时,我们将面临大量消息频繁请求服务器;在这种情况下,创建与销毁线程都已经成为一种奢侈的开销,特别对于嵌入式服务器来说更应保证内存资源的合理利用;
因此,线程池技术应运而生;线程池允许一个线程可以多次复用,且每次复用的线程内部的消息处理可以不相同,将创建与销毁的开销省去而不必来一个请求开一个线程;

结构讲解:
线程池是一个抽象的概念,其内部由任务队列,一堆线程,管理者线程组成;
这里写图片描述


我们将以上图为例,实现一个最基础的线程池,接下来将分部分依次讲解;讲解顺序为:1.线程池总体结构 2.线程数组 3.任务队列 4.管理者线程 5.使用线程池接口的例子
一、线程池总体结构

这里讲解线程池在逻辑上的结构体;看下方代码,该结构体threadpool_t中包含线程池状态信息,任务队列信息以及多线程操作中的互斥锁;在任务结构体中包含了一个可以放置多种不同任务函数的函数指针,一个传入该任务函数的void*类型的参数;
注意:在使用时需要将你的消息分类处理函数装入任务的(*function);然后放置到任务队列并通知空闲线程;

线程池状态信息:描述当前线程池的基本信息,如是否开启、最小线程数、最大线程数、存活线程数、忙线程数、待销毁线程数等… …
任务队列信息:描述当前任务队列基本信息,如最大任务数、队列不为满条件变量、队列不为空条件变量等… …
多线程互斥锁:保证在同一时间点上只有一个线程在任务队列中取任务并修改任务队列信息、修改线程池信息;
函数指针:在打包消息阶段,将分类后的消息处理函数放在(*function);
void*类型参数:用于传递消息处理函数需要的信息;

/*任务*/
typedef struct {
   void *(*function)(void *);
   void *arg;
} threadpool_task_t;

/*线程池管理*/
struct threadpool_t{
   pthread_mutex_t lock;                 /* 锁住整个结构体 */
   pthread_mutex_t thread_counter;       /* 用于使用忙线程数时的锁 */
   pthread_cond_t  queue_not_full;       /* 条件变量,任务队列不为满 */
   pthread_cond_t  queue_not_empty;      /* 任务队列不为空 */

   pthread_t *threads;                   /* 存放线程的tid,实际上就是管理了线 数组 */
   pthread_t admin_tid;                  /* 管理者线程tid */
   threadpool_task_t *task_queue;        /* 任务队列 */

   /*线程池信息*/
   int min_thr_num;                      /* 线程池中最小线程数 */
   int max_thr_num;                      /* 线程池中最大线程数 */
   int live_thr_num;                     /* 线程池中存活的线程数 */
   int busy_thr_num;                     /* 忙线程,正在工作的线程 */
   int wait_exit_thr_num;                /* 需要销毁的线程数 */

   /*任务队列信息*/
   int queue_front;                      /* 队头 */
   int queue_rear;                       /* 队尾 */
   int queue_size;

   /* 存在的任务数 */
   int queue_max_size;                   /* 队列能容纳的最大任务数 */
   /*线程池状态*/
   int shutdown;                         /* true为关闭 */
};

   

**/*创建线程池*/**
threadpool_t *
threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{               /*  最小线程数           最大线程数         最大任务数*/
   int i;
   threadpool_t *pool = NULL;
   do
   {
      /* 线程池空间开辟 */
      if ((pool=(threadpool_t *)malloc(sizeof(threadpool_t))) == NULL)
      {
        printf("malloc threadpool false; \n");
        break;   
      }
      /*信息初始化*/
      pool->min_thr_num = min_thr_num;
      pool->max_thr_num = max_thr_num;
      pool->busy_thr_num = 0;
      pool->live_thr_num = min_thr_num;
      pool->wait_exit_thr_num = 0;
      pool->queue_front = 0;
      pool->queue_rear = 0;
      pool->queue_size = 0;
      pool->queue_max_size = queue_max_size;
      pool->shutdown = false;

      /* 根据最大线程数,给工作线程数组开空间,清0 */
      pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
      if (pool->threads == NULL)
      {
         printf("malloc threads false;\n");
         break;
      }
      memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);

      /* 队列开空间 */
      pool->task_queue =
      (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
      if (pool->task_queue == NULL)
      {
         printf("malloc task queue false;\n");
         break;
      }

      /* 初始化互斥锁和条件变量 */
      if ( pthread_mutex_init(&(pool->lock), NULL) != 0           ||
           pthread_mutex_init(&(pool->thread_counter), NULL) !=0  ||
       pthread_cond_init(&(pool->queue_not_empty), NULL) !=0  ||
       pthread_cond_init(&(pool->queue_not_full), NULL) !=0)
      {
         printf("init lock or cond false;\n");
         break;
      }

      /* 启动min_thr_num个工作线程 */
      for (i=0; i<min_thr_num; i++)
      {
         /* pool指向当前线程池  threadpool_thread函数在后面讲解 */
         pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
         printf("start thread 0x%x... \n", (unsigned int)pool->threads[i]);
      }
      /* 管理者线程 admin_thread函数在后面讲解 */
      pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool);

      return pool;
   } while(0);

   /* 释放pool的空间 */
   threadpool_free(pool);
   return NULL;
}

   

二、线程数组

线程数组实际上是在线程池初始化时开辟的一段存放一堆线程tid的空间,在逻辑上形成一个池,里面放置着提前创建的线程;这段空间中包含了正在工作的线程,等待工作的线程(空闲线程),等待被销毁的线程,申明但没有初始化的线程空间;
这里写图片描述

/*工作线程*/
void *
threadpool_thread(void *threadpool)
{
  threadpool_t *pool = (threadpool_t *)threadpool;
  threadpool_task_t task;

  while (true)
  {
    pthread_mutex_lock(&(pool->lock));

    /* 无任务则阻塞在 “任务队列不为空” 上,有任务则跳出 */
    while ((pool->queue_size == 0) && (!pool->shutdown))
    {
       printf("thread 0x%x is waiting \n", (unsigned int)pthread_self());
       pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));

       /* 判断是否需要清除线程,自杀功能 */
       if (pool->wait_exit_thr_num > 0)
       {
          pool->wait_exit_thr_num--;
          /* 判断线程池中的线程数是否大于最小线程数,是则结束当前线程 */
          if (pool->live_thr_num > pool->min_thr_num)
          {
             printf("thread 0x%x is exiting \n", (unsigned int)pthread_self());
             pool->live_thr_num--;
             pthread_mutex_unlock(&(pool->lock));
             pthread_exit(NULL);//结束线程
          }
       }
    }

    /* 线程池开关状态 */
    if (pool->shutdown) //关闭线程池
    {
       pthread_mutex_unlock(&(pool->lock));
       printf("thread 0x%x is exiting \n", (unsigned int)pthread_self());
       pthread_exit(NULL); //线程自己结束自己
    }

    //否则该线程可以拿出任务
    task.function = pool->task_queue[pool->queue_front].function; //出队操作
    task.arg = pool->task_queue[pool->queue_front].arg;

    pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;  //环型结构
    pool->queue_size--;

    //通知可以添加新任务
    pthread_cond_broadcast(&(pool->queue_not_full));

    //释放线程锁
    pthread_mutex_unlock(&(pool->lock));

    //执行刚才取出的任务
    printf("thread 0x%x start working \n", (unsigned int)pthread_self());
    pthread_mutex_lock(&(pool->thread_counter));            //锁住忙线程变量
    pool->busy_thr_num++;
    pthread_mutex_unlock(&(pool->thread_counter));

    (*(task.function))(task.arg);                           //执行任务

    //任务结束处理
    printf("thread 0x%x end working \n", (unsigned int)pthread_self());
    pthread_mutex_lock(&(pool->thread_counter));
    pool->busy_thr_num--;
    pthread_mutex_unlock(&(pool->thread_counter));
  }

  pthread_exit(NULL);
}

   

三、任务队列

任务队列的存在形式与线程数组相似;在线程池初始化时根据传入的最大任务数开辟空间;当服务器前方后请求到来后,分类并打包消息成为任务,将任务放入任务队列并通知空闲线程来取;不同之处在于任务队列有明显的先后顺序,先进先出;而线程数组中的线程则是一个竞争关系去拿到互斥锁争取任务;
这里写图片描述

/*向线程池的任务队列中添加一个任务*/
int
threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg)
{
   pthread_mutex_lock(&(pool->lock));

   /*如果队列满了,调用wait阻塞*/
   while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
      pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));

   /*如果线程池处于关闭状态*/
   if (pool->shutdown)
   {
      pthread_mutex_unlock(&(pool->lock));
      return -1;
   }

   /*清空工作线程的回调函数的参数arg*/
   if (pool->task_queue[pool->queue_rear].arg != NULL)
   {
      free(pool->task_queue[pool->queue_rear].arg);
      pool->task_queue[pool->queue_rear].arg = NULL;
   }

   /*添加任务到任务队列*/
   pool->task_queue[pool->queue_rear].function = function;
   pool->task_queue[pool->queue_rear].arg = arg;
   pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;  /* 逻辑环  */
   pool->queue_size++;

   /*添加完任务后,队列就不为空了,唤醒线程池中的一个线程*/
   pthread_cond_signal(&(pool->queue_not_empty));
   pthread_mutex_unlock(&(pool->lock));

   return 0;
}

   

四、管理者线程

作为线程池的管理者,该线程的主要功能包括:检查线程池内线程的存活状态,工作状态;负责根据服务器当前的请求状态去动态的增加或删除线程,保证线程池中的线程数量维持在一个合理高效的平衡上;
说到底,它就是一个单独的线程,定时的去检查,根据我们的一个维持平衡算法去增删线程;

/*管理线程*/
void *
admin_thread(void *threadpool)
{
   int i;
   threadpool_t *pool = (threadpool_t *)threadpool;
   while (!pool->shutdown)
   {
      printf("admin -----------------\n");
      sleep(DEFAULT_TIME);                             /*隔一段时间再管理*/
      pthread_mutex_lock(&(pool->lock));               /*加锁*/
      int queue_size = pool->queue_size;               /*任务数*/
      int live_thr_num = pool->live_thr_num;           /*存活的线程数*/
      pthread_mutex_unlock(&(pool->lock));             /*解锁*/

      pthread_mutex_lock(&(pool->thread_counter));
      int busy_thr_num = pool->busy_thr_num;           /*忙线程数*/  
      pthread_mutex_unlock(&(pool->thread_counter));

      printf("admin busy live -%d--%d-\n", busy_thr_num, live_thr_num);
      /*创建新线程 实际任务数量大于 最小正在等待的任务数量,存活线程数小于最大线程数*/
      if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num <= pool->max_thr_num)
      {
         printf("admin add-----------\n");
         pthread_mutex_lock(&(pool->lock));
         int add=0;

         /*一次增加 DEFAULT_THREAD_NUM 个线程*/
         for (i=0; i<pool->max_thr_num && add<DEFAULT_THREAD_NUM
              && pool->live_thr_num < pool->max_thr_num; i++)
         {
            if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
           {
              pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
              add++;
              pool->live_thr_num++;
              printf("new thread -----------------------\n");
           }
         }

         pthread_mutex_unlock(&(pool->lock));
      }

      /*销毁多余的线程 忙线程x2 都小于 存活线程,并且存活的大于最小线程数*/
      if ((busy_thr_num*2) < live_thr_num  &&  live_thr_num > pool->min_thr_num)
      {
         // printf("admin busy --%d--%d----\n", busy_thr_num, live_thr_num);
         /*一次销毁DEFAULT_THREAD_NUM个线程*/
         pthread_mutex_lock(&(pool->lock));
         pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
         pthread_mutex_unlock(&(pool->lock));

         for (i=0; i<DEFAULT_THREAD_NUM; i++)
        {
           //通知正在处于空闲的线程,自杀
           pthread_cond_signal(&(pool->queue_not_empty));
           printf("admin cler --\n");
        }
      }

   }

   return NULL;


/*线程是否存活*/
int
is_thread_alive(pthread_t tid)
{
   int kill_rc = pthread_kill(tid, 0);     //发送0号信号,测试是否存活
   if (kill_rc == ESRCH)  //线程不存在
   {
      return false;
   }
   return true;
}

   

五、释放

/*释放线程池*/
int
threadpool_free(threadpool_t *pool)
{
   if (pool == NULL)
     return -1;
   if (pool->task_queue)
      free(pool->task_queue);
   if (pool->threads)
   {
      free(pool->threads);
      pthread_mutex_lock(&(pool->lock));               /*先锁住再销毁*/
      pthread_mutex_destroy(&(pool->lock));
      pthread_mutex_lock(&(pool->thread_counter));
      pthread_mutex_destroy(&(pool->thread_counter));
      pthread_cond_destroy(&(pool->queue_not_empty));
      pthread_cond_destroy(&(pool->queue_not_full));
   }
   free(pool);
   pool = NULL;

   return 0;
}

  

/*销毁线程池*/
int
threadpool_destroy(threadpool_t *pool)
{
   int i;
   if (pool == NULL)
   {
     return -1;
   }
   pool->shutdown = true;

   /*销毁管理者线程*/
   pthread_join(pool->admin_tid, NULL);

   //通知所有线程去自杀(在自己领任务的过程中)
   for (i=0; i<pool->live_thr_num; i++)
   {
     pthread_cond_broadcast(&(pool->queue_not_empty));
   }

   /*等待线程结束 先是pthread_exit 然后等待其结束*/
   for (i=0; i<pool->live_thr_num; i++)
   {
     pthread_join(pool->threads[i], NULL);
   }

   threadpool_free(pool);
   return 0;
}

  

六、接口

   /* 线程池初始化,其管理者线程及工作线程都会启动 */
    threadpool_t *thp = threadpool_create(10, 100, 100);
    printf("threadpool init ... ... \n");

   /* 接收到任务后添加 */
   threadpool_add_task(thp, do_work, (void *)p);

   // ... ...

   /* 销毁 */
   threadpool_destroy(thp);

  

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/384931.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

iptables 的mangle表

mangle表的主要功能是根据规则修改数据包的一些标志位&#xff0c;以便其他规则或程序可以利用这种标志对数据包进行过滤或策略路由。 内网的客户机通过Linux主机连入Internet&#xff0c;而Linux主机与Internet连接时有两条线路&#xff0c;它们的网关如图所示。现要求对内网进…

Linux常用命令(一)

history 查看历史命令 ctrlp 向上翻历史纪录 ctrln 向下翻历史纪录 ctrlb 光标向左移动 ctrlf 光标向右移动 ctrla 光标移动到行首 ctrle 光标移动到行尾 ctrlh 删除光标前一个 ctrld 删除光标后一个 ctrlu 删除光标前所有 ctrlL clear命令 清屏 tab键可以补全命令/填充路径…

ip route / ip rule /iptables 配置策略路由

Linux 使用 ip route , ip rule , iptables 配置策略路由 要求192.168.0.100以内的使用 10.0.0.1 网关上网&#xff0c;其他IP使用 20.0.0.1 上网。 首先要在网关服务器上添加一个默认路由&#xff0c;当然这个指向是绝大多数的IP的出口网关。 ip route add default gw 20.0.0.…

iptables:tproxy做透明代理

什么是透明代理 客户端向真实服务器发起连接&#xff0c;代理机冒充服务器与客户端建立连接&#xff0c;并以客户端ip与真实服务器建立连接进行代理转发。因此对于客户端与服务器来说&#xff0c;代理机都是透明的。 如何建立透明代理 本地socket捕获数据包 nat方式 iptables…

编译参数(-D)

程序中可以使用#ifdef来控制输出信息 #include<stdio.h> #define DEBUGint main() {int a 10;int b 20;int sum a b; #ifdef DEBUGprintf("%d %d %d\n",a,b,sum); #endifreturn 0; } 这样在有宏定义DEBGU的时候就会有信息输出 如果注销掉宏定义就不会有输…

libpcap讲解与API接口函数讲解

ibpcap&#xff08;Packet Capture Library&#xff09;&#xff0c;即数据包捕获函数库&#xff0c;是Unix/Linux平台下的网络数据包捕获函数库。它是一个独立于系统的用户层包捕获的API接口&#xff0c;为底层网络监测提供了一个可移植的框架。 一、libpcap工作原理 libpcap…

Linux常用命令(三)

man 查看帮助文档 alias ls : 查看命令是否被封装 echo &#xff1a; 显示字符串到屏幕终端 echo $PATH : 将环境变量打印出来 poweroff&#xff1a;关机 rebot&#xff1a;重启 需要管理员权限 vim是从vi发展过来的文本编辑器 命令模式&#xff1a;打开文件之后默认进入命令模…

浅谈iptables防SYN Flood攻击和CC攻击

何为syn flood攻击&#xff1a; SYN Flood是一种广为人知的DoS&#xff08;拒绝服务攻击&#xff09;是DDoS&#xff08;分布式拒绝服务攻击&#xff09;的方式之一&#xff0c;这是一种利用TCP协议缺陷&#xff0c;发送大量伪造的TCP连接请求&#xff0c;从而使得被攻击方资源…

Linux之静态库

命名规则&#xff1a; lib 库的名字 .a 制作步骤 生成对应.o文件 .c .o 将生成的.o文件打包 ar rcs 静态库的名字&#xff08;libMytest.a&#xff09; 生成的所有的.o 发布和使用静态库&#xff1a; 1&#xff09; 发布静态 2&#xff09; 头文件 文件如下图所示&…

iptables详解和练习

防火墙&#xff0c;其实说白了讲&#xff0c;就是用于实现Linux下访问控制的功能的&#xff0c;它分为硬件的或者软件的防火墙两种。无论是在哪个网络中&#xff0c;防火墙工作的地方一定是在网络的边缘。而我们的任务就是需要去定义到底防火墙如何工作&#xff0c;这就是防火墙…

Linux之动态库

命令规则 lib 名字 .so 制作步骤 1&#xff09;生成与位置无关的代码&#xff08;生成与位置无关的代码&#xff09; 2&#xff09;将.o打包成共享库&#xff08;动态库&#xff09; 发布和使用共享库 动态库运行原理&#xff1a; 生成动态库&#xff1a; gcc -fPIC -c *.c -…

linux下源码安装vsftpd-3.0.2

1&#xff09;在http://vsftpd.beasts.org/网站中查找并下载 vsftpd-3.0.2.tar.gz源码包 2)如果自己的机器上安装有yum可以用yum grouplist | less指令查看以下开发环境&#xff0c;当然这一步不做也行 3&#xff09;拆解源码包 4&#xff09;查看源码包 5&#xff09;编辑…

Linux之GDB调试命令

gdb启动 gdb 程序名 l 查看源代码&#xff08;默认显示十行&#xff09; l 文件名&#xff1a;行数 l 文件名&#xff1a;函数名 添加断点 break 行数 &#xff08;b 也行&#xff09; b 15 if i 15 条件断点 i b 查看断点信息 start 程序执行一步 n 单步调试 s 单步&#xf…

Gdb 调试core文件详解

一&#xff0c;什么是coredump 我们经常听到大家说到程序core掉了&#xff0c;需要定位解决&#xff0c;这里说的大部分是指对应程序由于各种异常或者bug导致在运行过程中异常退出或者中止&#xff0c;并且在满足一定条件下&#xff08;这里为什么说需要满足一定的条件呢&#…

Linux之GDB命令(二)

gdb命令&#xff1a; 前提条件&#xff1a;可执行文件必须包含调试信息 gcc -ggdb 文件名 –启动gdb调试查看代码命令 当前文件&#xff1a; list 行号&#xff08;函数名&#xff09; 指定文件&#xff1a; list 文件名&#xff1a;行号&#xff08;函数名&#x…

Windows下编译openssl库

1、概述 OpenSSL是一个开放源代码的软件库包&#xff0c;它实现了 SSL&#xff08;Secure SocketLayer&#xff09;和 TLS&#xff08;Transport Layer Security&#xff09;协议&#xff0c;所以应用程序可以使用这个包来进行安全通信&#xff0c;避免窃听&#xff0c;同时确…

Makefile规则介绍

Makefile 一个规则 三要素&#xff1a;目标&#xff0c;依赖&#xff0c;命令 目标&#xff1a;依赖命令 1、第一条规则是用来生成终极目标的规则 如果规则中的依赖不存在&#xff0c;向下寻找其他的规则 更新机制&#xff1a;比较的是目标文件和依赖文件的时间 两个函…

windows环境下C语言socket编程

最近由于实验需要&#xff0c;要求写一个c程序与java程序通信的软件&#xff0c;为了测试首先写了一个windows环境下c语言的socket&#xff08;tcp&#xff09;通信程序。 首先socket通信的步骤&#xff1a; 图一 socket通信步骤&#xff08;转载) 图二 三次握手协议&…

进程控制块(PCB)

进程控制块PCB 我们知道&#xff0c;每个进程在内核中都有一个进程控制块&#xff08;PCB&#xff09;来维护进程相关的信息&#xff0c;Linux内核的进程控制块是task_struct结构体。 /usr/src/linux-headers-3.16.0-30/include/linux/sched.h文件中可以查看struct task_struct…

网络层攻击防御

网络层攻击防御 网络层攻击防御主要分为以下三类&#xff1a; TCP类报文攻击防御 UDP类报文攻击防御 ICMP类报文攻击防御 TCP类报文攻击防御 TCP正常的交互过程&#xff1a; 图&#xff1a;TCP正常交互过程 在TCP/IP协议中&#xff0c;TCP协议提供可靠的连接服务&#xff0c…