Nginx线程池源码刨析

Nginx线程池源码刨析

相关的API

int     thread_mutex_create     (pthread_mutex_t *mtx);
int     thread_mutex_destroy    (pthread_mutex_t *mtx);
int     thread_mutex_lock       (pthread_mutex_t *mtx);
int     thread_mutex_unlock     (pthread_mutex_t *mtx);
int     thread_cond_create      (pthread_cond_t *cond);
int     thread_cond_destroy     (pthread_cond_t *cond);
int     thread_cond_signal      (pthread_cond_t *cond);
int     thread_cond_wait        (pthread_cond_t *cond, pthread_mutex_t *mtx);
thread_task_t	*thread_task_alloc(size_t size);//分配任务
int_t           thread_task_post(thread_pool_t *tp, thread_task_t *task);
thread_pool_t*  thread_pool_init();
void            thread_pool_destroy(thread_pool_t *tp);
//一些static函数
static void thread_pool_exit_handler(void *data);
static void *thread_pool_cycle(void *data);
static int_t thread_pool_init_default(thread_pool_t *tpp, char *name);

thread_mutex_destroy()

int thread_mutex_destroy(pthread_mutex_t *mtx)
{int  err;err = pthread_mutex_destroy(mtx);if (err != 0) {fprintf(stderr,"pthread_mutex_destroy() failed, reason: %s\n",strerror(errno));return ERROR;}return OK;
}

thread_mutex_create()

//创建锁且使用互斥量属性
int thread_mutex_create(pthread_mutex_t *mtx)
{int                     err;//pthread_mutexattr_t 是用于线程互斥量属性的数据类型//使用 pthread_mutexattr_t 可以更精确地控制互斥量的行为和特性。//例如,你可以通过设置互斥量属性来指定互斥量的类型(如普通互斥量、递归互斥量等)以及是否可跨进程共享。pthread_mutexattr_t     attr;//设置锁的属性err     =   pthread_mutexattr_init(&attr);if (err != 0) {fprintf(stderr, "pthread_mutexattr_init() failed, reason: %s\n",strerror(errno));return ERROR;}//PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK是一个宏,用于设置线程互斥锁的类型为错误检查锁。//线程互斥锁是一种用于保护共享资源的数据结构,可以防止多个线程同时访问同一个资源。//错误检查锁是一种特殊的互斥锁,它在尝试获取锁时,如果锁已经被其他线程持有,//那么它会立即返回错误,而不是等待。/*THREAD_MUTEX_TIME_UP 错误通常表示在等待互斥量时超时了。/这可能是因为互斥量一直处于被其他线程持有的状态,导致当前线程无法获取到互斥量的所有权。/超时发生后,系统会返回这个错误代码,以提示调用者当前无法获取互斥量,并需要适当处理该情况,/例如等待一段时间后重试,或者采取其他的处理方式。*/err     =   pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);//设置属性if (err != 0) {fprintf(stderr, "pthread_mutexattr_settype(PTHREAD_MUTEX_ERRORCHECK) failed, reason: %s\n",strerror(errno));return ERROR;}//初始化互斥量err = pthread_mutex_init(mtx, &attr);if (err != 0) {fprintf(stderr,"pthread_mutex_init() failed, reason: %s\n",strerror(errno));return ERROR;}//销毁互斥量属性err = pthread_mutexattr_destroy(&attr);if (err != 0) {fprintf(stderr,"pthread_mutexattr_destroy() failed, reason: %s\n",strerror(errno));}return OK;
}

thread_mutex_unlock()

int thread_mutex_unlock(pthread_mutex_t *mtx)
{int  err;err = pthread_mutex_unlock(mtx);//解锁#if 0ngx_time_update();
#endifif (err == 0) {return OK;}fprintf(stderr,"pthread_mutex_unlock() failed, reason: %s\n",strerror(errno));return ERROR;
}

thread_mutex_lock()

int thread_mutex_lock(pthread_mutex_t *mtx)
{int  err;err = pthread_mutex_lock(mtx);if (err == 0) {return OK;}fprintf(stderr,"pthread_mutex_lock() failed, reason: %s\n",strerror(errno));return ERROR;
}

thread_cond_wait()

int thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx)
{int  err;//条件变量err = pthread_cond_wait(cond, mtx);if (err == 0) {return OK;}fprintf(stderr, "pthread_cond_wait() failed, reason: %s\n",strerror(errno));return ERROR;
}

thread_cond_destroy()

int thread_cond_destroy(pthread_cond_t *cond)
{int  err;err = pthread_cond_destroy(cond);if (err == 0) {return OK;}fprintf(stderr, "pthread_cond_destroy() failed, reason: %s\n",strerror(errno));return ERROR;
}

thread_cond_signa()

int thread_cond_signal(pthread_cond_t *cond)
{int  err;err = pthread_cond_signal(cond);if (err == 0) {return OK;}fprintf(stderr, "pthread_cond_signal() failed, reason: %s\n",strerror(errno));return ERROR;
}

thread_cond_create()

int thread_cond_create(pthread_cond_t *cond)
{int  err;err = pthread_cond_init(cond, NULL);if (err == 0) {return OK;}fprintf(stderr, "pthread_cond_init() failed, reason: %s\n",strerror(errno));return ERROR;
}

thread_pool_cycle()

函数参数 data 是一个 void 指针,指向一个 thread_pool_t 结构体,该结构体包含了线程池的相关信息。

在函数中,将 data 强制类型转换为 thread_pool_t 结构体指针,并将其赋值给 tp 变量,以便后续操作可以使用线程池的信息。

函数进入一个无限循环 for (;😉 {},这表示线程会持续运行,直到被外部强制终止。

在每次循环迭代中,线程会首先尝试获取线程池的互斥锁 tp->mtx,以确保线程安全地访问线程池的资源。

然后,线程将 tp->waiting 减少,表示线程池中等待执行任务的线程数量减少一个。

接着,线程会进入一个循环,等待任务队列中有任务可执行。如果任务队列为空,线程会等待条件变量 tp->cond,直到有任务被添加到队列中。

一旦有任务可执行,线程会从任务队列中取出第一个任务,并将其移出队列。

然后,线程会释放线程池的互斥锁,允许其他线程访问线程池的资源。

线程会执行任务的处理函数 task->handler(task->ctx),该函数处理任务,并在完成后释放任务所占用的内存空间。

最后,线程会回到循环的开头,继续等待新的任务。


static void *thread_pool_cycle(void *data)
{thread_pool_t *tp = data;int                 err;thread_task_t       *task;//线程任务if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name);for ( ;; ) {//上锁if (thread_mutex_lock(&tp->mtx) != OK) {return NULL;}tp->waiting--;while (tp->queue.first == NULL) {if (thread_cond_wait(&tp->cond, &tp->mtx)!= OK){(void) thread_mutex_unlock(&tp->mtx);return NULL;}}task = tp->queue.first;tp->queue.first = task->next;if (tp->queue.first == NULL) {tp->queue.last = &tp->queue.first;}if (thread_mutex_unlock(&tp->mtx) != OK) {return NULL;}if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n",task->id, tp->name);task->handler(task->ctx);if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name);task->next = NULL;free(task);}
}

thread_pool_destroy()

void thread_pool_destroy(thread_pool_t *tp)
{uint_t           n;thread_task_t    task;volatile uint_t  lock;//volatile关键字用于修饰变量,表示该变量在多个线程之间是可见的memset(&task,'\0', sizeof(thread_task_t));task.handler = thread_pool_exit_handler;task.ctx = (void *) &lock;for (n = 0; n < tp->threads; n++) {lock = 1;if (thread_task_post(tp, &task) != OK) {return;}while (lock) {//使用sched_yield函数让出CPU,以便其他线程可以执行。sched_yield();}//task.event.active = 0;}(void) thread_cond_destroy(&tp->cond);(void) thread_mutex_destroy(&tp->mtx);free(tp);
}

thread_pool_init()

相关的解释全部在注释里了,这个函数的作用是初始化一个线程池,该函数返回一个thread_pool_t类型的指针

thread_pool_t* thread_pool_init()
{int             err;//错误代号pthread_t       tid;//线程ID/*使用pthread_attr_t可以更精确地控制线程的行为和特性。例如,你可以通过设置线程属性来指定线程的调度策略、优先级,以及分配给线程的栈大小等。*/pthread_attr_t  attr;   //线程属性thread_pool_t   *tp =   NULL;//线程结构体 //分配内存 且 初始化为0//sizeof(thread_pool_t) 表示 thread_pool_t 类型的大小,而 1 表示分配的内存块数量。tp = calloc(1,sizeof(thread_pool_t));//检查分配是否成功if(tp == NULL){fprintf(stderr, "thread_pool_init: calloc failed!\n");}thread_pool_init_default(tp, NULL);//初始化队列thread_pool_queue_init(&tp->queue);//初始化锁及其互斥量属性if (thread_mutex_create(&tp->mtx) != OK) {free(tp);return NULL;}//初始化条件变量if (thread_cond_create(&tp->cond) != OK) {(void) thread_mutex_destroy(&tp->mtx);free(tp);return NULL;}//初始化线程属性err = pthread_attr_init(&attr);if (err) {fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno));free(tp);return NULL;}// 设置线程属性为分离状态//PTHREAD_CREATE_DETACHED 在线程创建是其属性设置为分离状态(detached),主线程使用pthread_join 无法等待道结束的子线程err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);if (err) {fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno));free(tp);return NULL;}for (uint_t n = 0; n < tp->threads; n++) {//创建线程后执行函数thread_pool_cycle tp为该函数的参数err = pthread_create(&tid, &attr, thread_pool_cycle, tp);if (err) {fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno));free(tp);return NULL;}}(void) pthread_attr_destroy(&attr);return tp;
}

thread_pool_queue_init()

//初始化为空队列
#define thread_pool_queue_init(q)                                         \(q)->first = NULL;                                                    \(q)->last = &(q)->first

thread_pool_init_default()

static int_t thread_pool_init_default(thread_pool_t *tpp, char *name)
{if(tpp){tpp->threads    = DEFAULT_THREADS_NUM;//线程数设为defaulttpp->max_queue  = DEFAULT_QUEUE_NUM;//队列数设为default -65535(猜猜why 65535//strdup() 函数会在堆上为新字符串分配内存,并将原始字符串的内容复制到这块内存中。//这样做的好处是,你可以在不担心内存越界或释放问题的情况下,安全地操作新字符串。需要注意的是,使用完 strdup() 函数分配的			内存后,你需要在不再需要时手动释放这块内存,以免造成内存泄漏。tpp->name = strdup(name?name:"default"); if(debug)fprintf(stderr,"thread_pool_init, name: %s ,threads: %lu max_queue: %ld\n",pp->name, tpp->threads, tpp->max_queue);return OK;}return ERROR;
}

thread_task_alloc()

一段很香的代码

thread_task_t * thread_task_alloc(size_t size)
{thread_task_t  *task;task = calloc(1,sizeof(thread_task_t) + size);if (task == NULL) {return NULL;}task->ctx = task + 1;//使用柔性数组 在结构体的尾部分批size大小的内存给ctx上下文作为return task;
}

thread_task_post()

/*** @brief 将任务添加到线程池的任务队列中* * @param tp 线程池指针* @param task 待添加的任务指针* @return int_t 成功返回 OK,失败返回 ERROR*/
int_t thread_task_post(thread_pool_t *tp, thread_task_t *task)
{// 获取线程池的互斥锁if (thread_mutex_lock(&tp->mtx) != OK) {return ERROR;}// 检查任务队列是否已满if (tp->waiting >= tp->max_queue) {// 如果队列已满,释放互斥锁并返回错误(void) thread_mutex_unlock(&tp->mtx);fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n",tp->name, tp->waiting);return ERROR;}// 为任务分配唯一的IDtask->id = thread_pool_task_id++;task->next = NULL;// 发送信号唤醒等待任务的线程if (thread_cond_signal(&tp->cond) != OK) {(void) thread_mutex_unlock(&tp->mtx);return ERROR;}// 将任务添加到队列尾部*tp->queue.last = task;tp->queue.last = &task->next;// 增加等待执行任务的数量tp->waiting++;// 释放线程池的互斥锁(void) thread_mutex_unlock(&tp->mtx);// 打印调试信息if(debug) fprintf(stderr,"task #%lu added to thread pool \"%s\"\n",task->id, tp->name);// 返回操作成功return OK;
}

相关的结构体

typedef unsigned long         atomic_uint_t;
typedef intptr_t       int_t;
typedef uintptr_t      uint_t;
//线程池结构体
typedef struct thread_pool_s_ {pthread_mutex_t        mtx;//锁thread_pool_queue_t   queue;//队列int_t                 waiting;//正在等待的任务数量pthread_cond_t         cond;//condition 信号char                  *name;//线程池的名字uint_t                threads;//线程数量int_t                 max_queue;//最大的队列数量
}thread_pool_t,thread_pool_s_ ;
//任务结构体
typedef struct thread_task_s_ {thread_task_t       *next;//下一个任务的指针uint_t               id;//任务IDvoid                *ctx;//任务上下文 也就是线程执行函数的参数。使用动态分配内存(柔性数组)void               (*handler)(void *data);//线程执行的函数
}thread_task_s,thread_task_t;
//队列结构体
typedef struct thread_pool_queue_t_{thread_task_t        *first;thread_task_t        **last;
} thread_pool_queue_t;

相关宏

#define DEFAULT_THREADS_NUM 4 //默认线程数量
#define DEFAULT_QUEUE_NUM  65535//默认队列数量#define OK             	0
#define ERROR          -1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/11289.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

「PHP系列」PHP AJAX RSS 阅读器

文章目录 一、AJAX RSS 阅读器1. HTML结构 (index.html)2. PHP处理RSS (rss_fetcher.php)注意事项&#xff1a; 二、AJAX RSS 阅读器运用步骤 1: 设置HTML页面步骤 2: 编写PHP脚本 (rss_fetcher.php)步骤 3: 配置服务器步骤 4: 测试额外提示&#xff1a; 三、相关链接 一、AJAX…

python微信小程序 django+uniapp民宿房屋租赁短租系统

本课题主要基于微信小程序的民宿短租系统的设计&#xff0c;实现了在微信小程序里的民宿房屋的管理系统&#xff0c;系统将房屋信息发布&#xff0c;房屋租赁等功能集于一身&#xff0c;为热爱旅游的用户提供了多种多样的房屋租赁业务&#xff0c;同时也方便了房屋的拥有者发布…

问题解决记录 | 内存溢出

报错截图&#xff1a; 处理方式&#xff1a; 增大PDI工具的内存 打开Spoon.bat配置文件 修改配置

第六节笔记及作业----Lagent AgentLego 智能体应用搭建

关于 Agent 的相关理论 大语言模型存在一些局限性&#xff0c;比如会出现幻觉问题、有时效性问题以及可靠性问题。智能体的定义是具备感知、决策和行动能力的实体。智能体主要由感知部分、大脑部分和动作部分组成。智能体有多种类型&#xff0c;如 ReAct 类型&#xff08;侧重…

落地领域大模型应知必会 (1) :主要微调方法总览

在如今高速发展的人工智能领域&#xff0c;高效地利用大语言模型&#xff08;LLMs&#xff09;已经变得越来越重要。但是&#xff0c;利用大语言模型的方式太多了&#xff0c;如果你才刚刚开始接触它&#xff0c;可能会感到不知所措。 实质上&#xff0c;我们可以通过两种主要…

Github 2024-05-09 Go开源项目日报 Top10

根据Github Trendings的统计,今日(2024-05-09统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Go项目10Gin Web框架:高性能的Go HTTP框架 创建周期:3496 天开发语言:Go协议类型:MIT LicenseStar数量:73548 个Fork数量:7831 次关注人数…

项目经理必须要学会使用原型图工具或者别的必要工具吗

项目经理不一定必须学会使用原型图工具或其他特定技术工具&#xff0c;但熟悉和掌握一些关键工具和技术无疑会提高他们的工作效率和项目管理能力。以下是关于项目经理是否需要学习使用原型图工具或其他必要工具的几点考虑&#xff1a; 项目需求&#xff1a; 如果项目涉及产品设…

信创应用软件之国产邮箱

信创应用软件之国产邮箱 文章目录 信创应用软件之国产邮箱采用信创邮箱的必要性信创邮箱采购需求国产邮箱业务形态国产邮箱代表性品牌CoremailRichmail安宁eyouUMail拓波 邮件安全的发展阶段 采用信创邮箱的必要性 邮箱是天然的数据存储空间&#xff0c;党政和央国企客户在使用…

软件3班20240513

java.util.PropertyResourceBundle4554617c package com.yanyu;import java.sql.*; import java.util.ResourceBundle;public class JDBCTest01 {public static void main(String[] args) throws SQLException { // 获取属性配置文件ResourceBundle bundle Res…

高效文件管理:一键批量修改文件名,并统一转换为大写扩展名

在现代社会中&#xff0c;无论是个人还是企业&#xff0c;我们都需要处理大量的文件。有效的文件管理不仅能提高我们的工作效率&#xff0c;还能确保数据的完整性和安全性。其中&#xff0c;批量修改文件名和扩展名是一种常用的文件管理方式&#xff0c;本文将详细介绍云炫文件…

双亲委派模型的重要性与作用

1、安全性 双亲委派模型确保了Java核心类库的类型安全。因为所有的类加载请求最终都会传递给顶层的启动类加载器&#xff08;Bootstrap ClassLoader&#xff09;&#xff0c;它负责加载Java的核心类库。这意味着任何自定义的类加载器都不可能加载一个与Java核心类库中的类同名…

[已解决]Linux挂载新硬盘到已有目录下(CentOS7)

首先总结下用到的命令&#xff1a; fdisk -l mount / unmountdf -hT / df -lhmkfs -t ext4 / mkfs -t ext3cp -amv aaa bbb 具体步骤&#xff1a; 挂载物理硬盘/虚拟硬盘到linux主机或者linux虚拟主机上&#xff1b;fdisk -l查看是否成功挂载&#xff1b; fdisk /dev/sdc 初…

牛客周赛 Round 38VP

1.签到&#xff1a;https://ac.nowcoder.com/acm/contest/78292/A AC代码&#xff1a; #include<bits/stdc.h> using namespace std; int x; int main(){cin>>x;int wx%10;if(w0) cout<<0;else cout<<10-w; } 2.签到&#xff1a;https://ac.nowcode…

通过flask搭建,简单的网站,实现注册登录效果,初步了解搭建网页的基本架构。

网站架构了解 通过flask搭建&#xff0c;简单的网站&#xff0c;实现注册登录效果&#xff0c;初步了解搭建网页的基本架构。 前提准备 html在开发中最主要的一些标签知识flask中自带的接收信息给后台的语法 1&#xff09;html基础标签的使用 <h1>用于强调文本内容&…

【Qt】之【CMake】Error : The source.. does not match the soused

QT中cmak编译出现CMake Error: The source… does not match the soused 分析 前提是该项目是从另一个路径的项目复制过来的&#xff0c;编写代码时发现无论怎样修改代码&#xff0c;运行后都没有任何变化&#xff0c;以为是qtbug&#xff0c;重构重启都没用&#xff0c;最后…

websevere服务器从零搭建到上线(三)|IO多路复用小总结和服务器的基础框架

文章目录 epollselect和poll的优缺点epoll的原理以及优势epoll 好的网络服务器设计Reactor模型图解Reactor muduo库的Multiple Reactors模型 epoll select和poll的优缺点 1、单个进程能够监视的文件描述符的数量存在最大限制&#xff0c;通常是1024&#xff0c;当然可以更改数…

Spring底层入门(十一)

1、条件装配 在上一篇中&#xff0c;我们介绍了Spring&#xff0c;Spring MVC常见类的自动装配&#xff0c;在源码中可见许多以Conditional...开头的注解&#xff1a; Conditional 注解是Spring 框架提供的一种条件化装配的机制&#xff0c;它可以根据特定的条件来控制 Bean 的…

【Golang】 在 Gin 框架中添加网关中间件

在 Gin 框架中添加网关中间件是一种常见的做法&#xff0c;可以用于实现请求的预处理、身份验证、日志记录等功能。下面是一个简单的示例&#xff0c;演示如何在 Gin 框架中添加一个简单的网关中间件。 package mainimport ("fmt""net/http""time&qu…

什么是分库分表

读写分离主要应对的是数据库读并发&#xff0c;没有解决数据库存储问题。试想一下&#xff1a;如果 MySQL 一张表的数据量过大怎么办? 答案当然是分库分表 什么是分库&#xff1f; 分库 就是将数据库中的数据分散到不同的数据库上&#xff0c;可以垂直分库&#xff0c;也可…

windows通过sftp对Linux服务器进行上传下载

前言 通过简单高效的方式可以在没有远程连接软件的情况下对服务器进行上传下载。 方法 Windows下打开cmd命令行&#xff0c;输入sftp 用户名IP 上传下载命令 #上传文件&#xff0c;如果需要上传文件夹则 put -r 文件路径 上传到路径 sftp> put E:\clash-verge_1.6.2_a…