前言
本文主要介绍多线程基础知识,以及使用多线程技术进行并发编程;最后会介绍生产者消费者模型;
一、线程基本认识
1、什么是线程
如果你是科班出生,你肯定听过线程相关概念;但是你可能没有真正搞懂什么是线程;在认识线程之前,你得知道什么是进程,进程我们已经前面介绍过了,这里只是简单阐述,进程是操作系统分配资源的基本单位,我们通常将进程相关内核数据结构 + 内存中该程序的代码称作进程,也有好多课本说,进程是程序运行时某一时刻的快照,我们可以理解为进程在运行过程中,其内核数据会不断发生变化的;关于进程相关内核数据,前面我们已经提及,如PCB控制块、页表、进程地址空间、描述当前进程已经打开的文件结构体 files_struct 等;这些都是我们进程的内核数据,代码就更好理解了,这里的代码是我们写的程序经过编译后形成的二进制机器指令;
有了上面概念的铺垫,我们对线程的理解就很简单了,首先我们来看看课本上的概念,线程是处理机调度的基本单位;线程是进程的一部分;听了这些概念肯定还是模模糊糊,这是因为我们无法通过这些生硬的概念来理解;下面我用几张图带着大家逐步理解我们的线程;
上图是我们之前将的进程相关内核数据(Linux),我们学习完进程和文件应该脑海有上面这样的一个整体结构;
关于线程,我们是否也要拿一种数据结构把他描述起来呢?当然是肯定的,我们可能需要保存线程的编号,线程的私有上下文等等信息;我们计算机中当然也不可能只有一个线程,肯定有大量的线程,既然有大量的线程,我们的操作系统是否需要将其组织起来呢?当然也是肯定的,既然要组织起来,肯定要拿某种数据结构将其组织起来;我们的window就是采用上述这种方案,为线程设计一种独立的结构体,然后将其组织起来,而我们的Linux则提出一种更巧妙的思路;
Linux系统下,并没有单独的线程结构体,那么线程又是如何描述组织起来的呢?Linux下,每当我们在当前进程下创建一个线程时,操作系统会为当前进程创建一个PCB控制块,且并不会申请新的虚拟地址空间等内核数据,而是与原来的进程共用一个虚拟地址空间、页表、已打开文件结构体等内核数据结构,如下图所示;
Linux下并不称这些一个又一个新的执行流为线程,而是称作轻量级进程;因此操作系统也不会提供线程相关接口,而是提供轻量级进程相关接口;那我们平常在操作系统课本学的都是线程啊!可并没有什么轻量级进程相关概念,那我们也不会使用这些轻量级进程相关接口啊,我们也不懂怎么使用;此时Linux为我们提供了一个第三方库,这个库中就是对轻量级进程进行包装,包装成我们今天看到的线程;
我们再从概念上理解线程,首先概念上说,线程是处理机调度的基本单位,我们要马上反应过来,我们的CPU是如何看待一个进程的,我们CPU并不关心进程内部是怎么样的,它仅仅是通过PCB控制块来决定待会儿执行什么指令;因此在我们CPU看来,并不认识什么线程、还是进程,它只是通过读取PCB控制块数据,来找到我们的虚拟地址空间、页表等数据,然后找到代码起始地址,然后逐句执行,故我们的线程是处理机调度的基本单位;
线程在进程内,这句话就更好理解了,我们线程只是进程的一部分,它与整个进程共享资源;
2、再次理解进程
很多人就对我们进程的理解就很模糊了,上面一个又一个执行流是进程,而我们之前理解的进程呢?首先,我们之前理解的进程为内部只有一个执行流的进程,今天,我们知道了,进程内部可能有很多执行流,每一个执行流都可称作一个单独的线程,而这些所有线程 + 进程内核数据结构 + 代码 才被称作一个进程,今天我们学习的线程是对进程概念的进一步补充,并不违背我们之前学过的进程概念;
我们还讲过进程是资源分配的基本单位,我们向操作系统申请资源是以进程为基本单位的,当我们创建一个进程时,会创建虚拟地址空间、页表等内核数据,这就是操作系统进行的资源分配,而我们的线程并不会创建这些内核数据,只是创建一个PCB控制块,与我们进程共享这些内核数据罢了;
综上所述,我们不难理解,上面我们的线程是内核给我们提供的轻量级进程的封装,我们在用户层将这个轻量级进程封装起来,这时我们才成这个是线程,那所谓的轻量级,轻量在哪里呢?首先我们进行线程切换时,不需要切换内核数据缓存,比如我们虚拟地址空间、页表等内核数据都会在CPU里的寄存器中缓存起来;而我们的进程切换不同,这些数据肯定是不同的,因此需要进行切换;
3、线程的优缺点
(1)优点
- 创建一个新线程的代价要比创建一个新进程的代价小得到
- 线程切换比进程切换所需操作系统做的事要少很多
- 线程占用资源比进程少
- 提高并发度
(2)缺点
- 缺乏访问控制,某个线程调用某些系统调用可能对整个进程造成影响
- 编写难度提高,调试不易
4、线程的异常
在多线程程序中,若有一个线程因为除零、野指针等问题崩溃会造成整个进程崩溃,也就会引起所有线程退出;
5、线程的数据
关于线程有自己独立数据,也有与其他线程共享数据;线程间通信相比进程容易很多;因为线程共享同一进程地址空间;下面来介绍线程私有数据与共享数据;
私有数据:
线程ID
一组寄存器(寄存器中数据值)
栈
errno
信号屏蔽字
调度优先级
注意:其中一组寄存器强调寄存器里的数据,也就是上下文数据;
共享数据:
文件描述符表
信号处理动作
当前工作目录
用户id与组id
堆
代码段数据
二、线程相关接口
1、线程控制
前面我们提过,Linux并未为我们提供线程相关接口,只是为我们提供了轻量级进程接口,故Linux又为我们用户提供了第三方库的形式来使用线程;这个线程库叫 pthread ,通过我们Linux会自带这个库文件,既然是第三方库文件,我们在编译时必须告诉 gcc/g++ 编译器我们要用的库名称,这里所以要加上 -l 库名 这个选项;
(1)创建线程
关于创建一个线程,接口声明如下所示;
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
参数一:类型为 pthread_t ,是一种自定义类型,我们称这个为线程id,是一个输出型参数;
参数二:此参数为我们线程的属性,我们填NULL表示我们使用默认属性即可;
参数三:此参数为函数指针,线程启动后执行的函数;
参数四:此参数为我们为参数三函数指针传入的参数,无参可填写NULL;
返回值:调用成功返回0,失败返回错误码;
注意:
1、这里返回值特别注意,一般系统调用成功返回0,失败返回-1,错误码被设置,而多线程系类函数不同,后面所有函数返回值没有特别提及都是这样;
2、关于线程参数一返回的线程id我们后面会做详细讲解,这里当作一个线程句柄即可;
3、pthread系类都需要带额外编译选项,-l pthread,因为这是第三方库;
(2)获得自己的线程id
pthread_t pthread_self(void);
该函数无参数,返回值为当前线程的id,该函数总是调用成功的;
(3)线程终止
关于线程终止,我们有三种方案;如下所示;
方案一:调用return语句直接返回主函数,也就是线程终止;
方案二:调用 pthread_exit 函数终止当前线程,具体声明如下所示;
void pthread_exit(void *retval);
参数一:该参数为返回给主线程的返回值,就与子进程返回给父进程的退出码类似;
注意:
1、线程返回的值必须是全局的,或者是malloc等调用在堆上申请的空间,若是栈上的,当线程退出后,线程的栈上的值也随之失效;
2、不可用exit系列函数替代,因为exit是退出进程的函数,一旦调用,该进程下的其他线程也会跟着退出;
方案三:调用 pthread_cancel 函数取消一个正在执行的线程;具体声明如下所示;
int pthread_cancel(pthread_t thread);
参数一:该参数就是我们之前创建线程输出型参数返回的线程id;
注意:该函数是取消一个执行中的线程,调用要确保线程执行中;
(4)线程等待
首先,我们要搞清楚一个问题,我们为什么要进行线程等待;不等待不可以吗?这个问题的答案与我们前面讲解的进程等待的原因是类似的;
1、已经退出的线程,会处于一种类似僵尸状态,我们必须进行回收,不然会造成资源泄漏;
2、我们可以通过线程等待获取线程退出值;
关于线程等待相关接口,声明如下所示;
int pthread_join(pthread_t thread, void **retval);
参数一:参数一为进程的id;
参数二:参数二为一个二级指针,主要接收线程返回的一级指针;、
注意:关于参数二的返回值,有以下几种可能
1、线程调用return语句正常返回,则该参数的值为返回值的地址;
2、线程是因为别的线程调用pthread_cancel而退出的,此时该参数指向的单元存放的是常数PTHREAD_ CANCELED;
3、线程是因为自己调用 pthread_exit 而退出的,此时该参数指向的单元存放的是 pthread_exit 的参数;
4、若对线程的返回值不关心参数二可以直接传入NULL;
(5)线程分离
通常情况下,一个线程默认是joinable的,线程退出后,若不join则会造成资源泄漏;但是我们有时不想关心这个线程,不想join,我们可以将该线程进行分离操作,线程退出后无需等待,自动释放线程资源;具体接口如下所示;
int pthread_detach(pthread_t thread);
该函数只有一个参数,为线程的id,除了其他线程可以调用将该线程分离,自己也可以调用自己我们可以执行以下语句,进行自己分离;
pthread_detach(pthread_self());
2、测试代码
下面为一段多线程测试代码,初步了解多线程使用;(不要跳过,这里讲解线程id)
#include <iostream>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>#define NUM 3void * threadRoutine(void* args)
{int count = 5;while(count--){printf("分支线程执行中, tid: %u[%p], pid: %d\n", pthread_self(), pthread_self(), getpid());sleep(1);}return nullptr;
}int main()
{// 定义线程句柄pthread_t tid[NUM];// 创建线程for(int i = 0; i < NUM; i++){pthread_create(tid + i, nullptr, threadRoutine, nullptr);}int count = 3;while(count--){std::cout << "main thread, pid: " << getpid() << std::endl;sleep(1); }// 线程等待for(int i = 0; i < NUM; i++)pthread_join(tid[i], nullptr); // 对线程返回结果不关心return 0;
}
注意:编译时一定要加 -l 库文件选项,不然会有以下报错;
当我们加上编译时加上参数后,可以正常编译,如下所示;
运行结果如下图所示;
观察上图,我们不难发现,主进程的pid和分支线程的pid相同,说明子线程在进程内部;运行时,我们还可以在命令行使用 ps -aL | head -1 && ps -aL | grep ./main
确实有四个进程在跑,其中LWP就是线程号,并不是我们之前的 线程id,其中主线程的pid与lwp相同;那么我们前面代码中的线程id是什么呢?
仔细观察上图,我们打印tid更像一个地址;实际上,他就是地址,当我们创建一个线程时,pthread库会在用户层替我们维护一个线程结构;该结构保存每个线程的线程栈,以及线程相关数据;每创建一个线程,就会申请一个线程结构,而这个线程id就是这个结构的起始地址;
三、线程互斥
1、概念补充
临界资源:多线程执行流共享的资源
临界区:访问临界资源的代码
互斥:任何时刻,都只能有一个执行流访问临界资源
原子性:不可被打断,对于一种操作只有两种结果,要么没做,要么已经完成了;我们称这种操作具有原子性;、
可重入函数:多个线程访问这个函数的情况下,不会产生数据不一致等结果;下面这个图来解释关于数据不一致;
稍微解释一下上面四张图,其中图一和图二表示线程一调用链表头插函数,而线程一调用头插函数还没调用完时,此时时间片到了,然后线程二继续执行链表头插函数,线程二很幸运执行完了头插函数,若我们此时线程一继续执行后面的代码,会造成newnode2丢失的场景,我们称这种会造成数据不一致的函数为不可重入函数,而对于不会造成数据不一致的为可重入函数;
2、代码引入
我们使用接下来这段代码来演示多线程下,并发访问共享资源带来的数据不一致问题;
#include <iostream>
#include <pthread.h>
#include <unistd.h>// 定义抢票线程个数
#define NUM 3
// 定义全局变量, 表示票的总数
int tickets = 10000;// 抢票线程执行函数
void* threadRoutine(void* args)
{int num = *(int*)args;while(true){if(tickets > 0){usleep(1000);printf("线程%d get ticket %d\n", num, tickets);tickets--;}else{break;}}
}int main()
{// 1、创建NUM个抢票线程pthread_t tids[NUM];for(int i = 0; i < NUM; i++){pthread_create(tids + i, nullptr, threadRoutine, (void*)(&i));usleep(1000);}// 2、线程等待for(int i = 0; i < NUM; i++){pthread_join(tids[i], nullptr);}return 0;
}
我们用多线程进行抢票操作,可结果却不如我们所料,每次运行的结果都不一样,且居然还出现了负数的票;
为什么会造成上面的现象呢?就是由于我们的抢票函数是一个不可重入函数,而我们用多个线程同时调用,当然有可能产生数据不一致的问题;下面我来演示一下是如何产生数据不一致的问题的;
假设t1线程先得到了调度,将我们的数据进行了一次减减,也就是如上的三个小步骤;接着当我们t1刚将 9999 放入CPU寄存器中并进行-1计算后,发生了调度,t1线程进行现场保护后,被调出了内存,换t2线程执行;
t2开始执行自己得代码,也是先将自己得ticket放入CPU寄存器,然后减减,然后放回内存;如下所示;
此时其实已经就出现问题了,已经有两张9998得票了;接着继续,t2线程比较幸运,将我们得9998减到了5000,当t2线程将5000从内存取到寄存器,并完成减减操作后,突然又发生了调度,t2要进行现场保护,将自己得4999也要保存起来;如下所示;
t1运行后,将自己上下文数据放回寄存器中,从上次中断处的下一行代码接着执行,将自己的数据放回内存中,如下所示;
好家伙,t2线程好不容易将数据减到5000,结果t1线程一下就改到了9998,类似这种问题引发了上面我们看到的现象,因此为了防止这种现象,我们得使这些访问共享资源的进程互斥的进行访问;
3、接口介绍
关于线程互斥,我们是通过锁来完成的,也称互斥量,我们最常用的锁就是互斥锁;互斥锁就像一把钥匙,我们在访问临界资源前,也就是进入临界区前,我们为临界区加锁,出临界区前我们要解锁;用下面这个例子来介绍互斥锁;
你们学习有一个VIP影院,这个影院呢,只有一个座位,因此每次只能进去一个人,钥匙放在影院门口,每个人想要进入这个VIP影院得拿到钥匙,规定必须有钥匙才可以进入影院,如果出影院前,必须要把钥匙挂到门口墙上;因此有很多人一大早就来抢钥匙,没有抢到的就在门口待着;
上述故事模型中,影院座位我们就可以看作临界资源,而门口那个钥匙便是我们得互斥量,在上述规则体系下,任何时刻都只能有一个人能使用VIP影院,也就是只有一个线程能访问临界资源,且出临界区前必须归还钥匙,也就是归还互斥量;我们把进临界区那个动作称为加锁,出临界区那个动作称为解锁;
(1)互斥量初始化与销毁
互斥量的类型为 pthread_mutex_t,我们要对其初始化有两种方案;
1、若互斥量为全局变量
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
2、若为局部变量
pthread_mutex_t mtx;
pthread_mutex_init(&mtx, nullptr);
其中初始化函数有两个参数,第一个参数是互斥量(锁)的地址,第二个变量是所得属性,我们通常设置成NULL,表示默认属性;(返回值还是依照上面pthread系列返回值规则)
通常我们局部变量的锁我们在使用完毕后,我们要对其进行销毁动作;我们需要调用上图中,pthread_mutex_destroy接口;其中参数为锁的地址;
(2)互斥量的加锁与解锁
如下图所示,其中前两个是加锁函数,后面那个是解锁函数,三个函数的参数都为锁的地址;
这里我们介绍一下,第二个 pthread_mutex_trylock ,这个函数若没有申请到锁资源则立即返回一个非零错误码,若申请成功则返回0;而我们的 pthread_mutex_lock 函数,若没有申请到锁则会阻塞,直至能申请到锁资源为止;
4、代码纠正
有了互斥量,我们就可以通过互斥量的方案来实现线程互斥;来保证不会出现数据不一致的问题;如下所示;
#include <iostream>
#include <pthread.h>
#include <unistd.h>// 定义抢票线程个数
#define NUM 3
// 定义全局变量, 表示票的总数
int tickets = 10000;// 定义全局变量锁
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;// 抢票线程执行函数
void* threadRoutine(void* args)
{int num = *(int*)args;while(true){// 加锁pthread_mutex_lock(&mtx);if(tickets > 0){usleep(1000);printf("线程%d get ticket %d\n", num, tickets);tickets--;pthread_mutex_unlock(&mtx); // 解锁}else{pthread_mutex_unlock(&mtx); // 解锁break;}}
}int main()
{// 1、创建NUM个抢票线程pthread_t tids[NUM];for(int i = 0; i < NUM; i++){pthread_create(tids + i, nullptr, threadRoutine, (void*)(&i));usleep(1000);}// 2、线程等待for(int i = 0; i < NUM; i++){pthread_join(tids[i], nullptr);}return 0;
}
注意,上面代码中,我们写了两个解锁,这是因为确保我们使用完临济资源后一定要进行解锁;而在上述代码中,执行流可能执行if语句,也可能执行else语句,所以这里两个语句块里都必须要有解锁动作;运行结果如下;
这时有小伙伴又有疑问了,那为什么不直接放到while循环外面,这样不就不用搞这么麻烦了吗?
我们首先要搞清楚,要是我们放在了while循环外面,我们一旦进入临界区,就是整个while循环结束才会出来,而出while循环的条件是票被抢完了,那么这么做就只有一个线程抢票了,其他线程都申请不到所资源,这样显然是不合理的!
但实际上,我们这代码还是有一点不合理的地方,我们发现有时候总是只有一个线程申请得到锁,关于这个问题后面我们介绍线程同步时再讲解;
5、互斥量原理
互斥量(锁)的实现原理主要是利用一条汇编指令来实现的,首先我们要清楚一条汇编指令一定是原子的;其伪代码如下图;
假设此时有两个线程,我们分别称为a与b,在访问临界资源前都需要加锁;我们假设一种场景;a先执行lock第一句汇编,接着被切走,运行b,b执行前两条,接着又被切走,运行a,此时是什么样的呢?
因此无论如何切换,只有一个1,只有持有1的线程归还,将mutex值设置为1,此时才算归还锁资源,别的线程才可以继续竞争,当然自己也可以在继续竞争;
总结:这种由于多个线程共同访问共享资源而导致数据不一致的问题,我们也称线程安全问题,我们通常使用锁来解决,当然,还有信号量也可以解决,后面会介绍;
6、死锁
这是由于我们上面加锁方案繁衍出的一种问题,当我们多个执行流相互等待对方资源释放而可能会产生死锁问题;以下面为例;
如上图,线程A先使用共享资源B,再使用共享资源A,线程B反之;要使用共享资源,为了确保数据安全,我们可采用加锁方案,上述有两种共享资源,因此我们访问资源A得申请锁A,申请资源B得申请锁B;若两个线程都顺序执行,线程A得到共享资源B得使用权,并加锁,然后轮到线程B调度,接着线程B也得到共享资源A的使用权,正准备申请共享资源B时,发现此时无论如何申请也不可能成功,因为此时资源B的使用权在线程A身上,而接着调度回线程A时,线程A也阻塞了,因为资源B的使用权在线程B身上,因此两个线程都无法继续向后推进运行,这样便造成了死锁的现象;
死锁的四个必要条件:
1、互斥条件(一个共享资源只能被一个执行流访问)
2、请求与保持(每个执行流都不愿意释放自己锁资源,而请求其他锁资源)
3、不剥夺条件(当前执行流无法剥夺其他执行流的锁资源)
4、循环等待条件(执行流循环等待对方锁资源)
只要破坏上述条件中任意一个都可以解除死锁问题;
四、线程同步
1、概念引入
在上述买票代码运行的结果中,我们发现,经常会产生一个线程买很多票的情况;如下图所示;
面对这种情况,显然时很不合理的;首先面对上面这种现象,我们要搞懂以下几个问题;
为什么会出现这样的现象呢?
首先,我们要清楚的是我们无法确保线程调度的顺序,这是由CPU调度器决定的;而当我们一个线程加锁后,访问临界资源,然后出临界区,释放锁资源,在这之后有没有可能又申请到锁资源了呢?以上面VIP电影院的例子为例;我有没有可能在刚出电影院,然后将锁放在门旁边的时候,突然发现门外这么多人排队,然后我又继续持有锁,打开门,继续进入电影院的观影的情况呢?因为我离钥匙最近,因此我最有概率得到这把钥匙;完全有可能我又再次进入VIP影院继续观看电影;上面我们写的代码也是如此;这种代码并没有错误,只是不合理!
为了解决上述问题,我们引入线程同步的概念,我们让这些执行流相互协调配合,有序的推进执行,而不会出现由于某个执行流因其他进程反复申请释放而导致的饥饿问题;
2、接口介绍
关于实现线程同步,我们通常采用条件变量方案;下面正是介绍条件变量相关接口;
(1)条件变量初始化与销毁
条件变量的类型是 pthread_cond_t ,初始化同样也有两种方案,与我们互斥锁类似;
1、若条件变量为全局变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
2、若条件变量为局部变量
pthread_cond_t cond;
pthread_cond_init(&cond, NULL);
此时我们需要对条件变量进行销毁,接口如下;
int pthread_cond_destroy(pthread_cond_t *cond);
其中初始化函数的参数一为条件变量的地址,参数二为条件变量的属性,这里我们常常设置成默认属性,直接填写NULL即可;
销毁条件变量的函数的参数使条件变量的地址;
(2)条件变量的等待
线程等待主要的作用是使当前线程在某个条件变量去排队等待,等待下一次被唤醒;
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
其中参数一为条件变量的地址,参数二为所访问临界资源的互斥锁;可能听到这就有点蒙圈了,不要急,看完后面条件变量的使用会清晰很多,后面我们会加入条件变量的应用场景;
(3)条件变量的唤醒
前面的接口可以使一个线程到指定的条件变量下排队,接下来介绍的函数就是唤醒条件变量下的线程;主要有如下两个接口;
int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒条件变量下所有进程
int pthread_cond_signal(pthread_cond_t *cond); // 唤醒条件变量下的任一线程
上面函数中参数均为条件变量的地址;
3、代码引入
接下来,我们用两段代码来演示条件变量接口的使用;
#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>// 定义线程数量
#define NUM 3// 定义线程数据
class ThreadDate
{
public:ThreadDate(){}ThreadDate(std::string name, pthread_mutex_t* pmtx, pthread_cond_t* pcond):_name(name),_pmtx(pmtx),_pcond(pcond){}
public:std::string _name;pthread_mutex_t* _pmtx;pthread_cond_t* _pcond;
};// 线程运行函数
void* threadRoutine(void* args)
{ThreadDate* pd = (ThreadDate*)args;while(true){// 加锁,准备进入临界区pthread_mutex_lock(pd->_pmtx);pthread_cond_wait(pd->_pcond, pd->_pmtx);std::cout << pd->_name << "正在运行....." << std::endl;pthread_mutex_unlock(pd->_pmtx);}// 记得delete掉数据,不然会造成内存泄漏delete pd;return nullptr;
}int main()
{// 初始化条件变量pthread_cond_t cond;pthread_cond_init(&cond, nullptr);// 初始化互斥锁pthread_mutex_t mtx;pthread_mutex_init(&mtx, nullptr);// 创建线程pthread_t tids[NUM];for(int i = 0; i < NUM; i++){// 初始化线程数据char name[128];sprintf(name, "thread-%d", i + 1);ThreadDate* pd = new ThreadDate(name, &mtx, &cond);pthread_create(tids + i, nullptr, threadRoutine, pd);}// 唤醒任意线程while(true){// 唤醒该条件变量下的一个线程pthread_cond_signal(&cond);sleep(1);}// 进程等待for(int i = 0; i < NUM; i++){pthread_join(tids[i], nullptr);}// 销毁互斥锁和条件变量pthread_mutex_destroy(&mtx);pthread_cond_destroy(&cond);return 0;
}
我们在主线程随机唤醒等待在该条件变量队列中的任意线程;这有点像我们后面要实现的线程池代码;这里只是带着大家一起使用熟悉一下条件变量,接下来的生产者消费者模型会带着大家一起应用条件变量;
五、生产者消费者模型
1、模型介绍
我以我们日常生活来介绍生产者消费者模型,想必大家平常都有线下购物的经历吧;通常我们去超市进行购物,可以说超市是我们的购物场所,我们各位去购物都有一个身份,那就是消费者,那么是谁给我们提供的商品呢?是超市吗?并不是,是生产商给我们提供的商品,生产商也有一个身份,就是生产者;结合我们生活的例子,我们发现对于一个超市来说,肯定也不止一个生产者,也不止一个消费者;如下图所示;
对于上述模型,我们首先要记住三点;
三个种关系:生产者与消费者、生产者与生产者、消费者与消费者
二种角色:生产者、消费者
一个交易场所:超市
有了如上三点,我们便能实现一个生产者消费者模型;我们着重介绍三种关系;如下
消费者与消费者:互斥关系(竞争)
有时候超市经常会给我们做促销,促销商品一般都是限量的;因此,我们的消费者可能会竞争同一种商品;此时我们便要提供一种策略来限制非法竞争的消费者;因此消费者之间是互斥关系;
生产者与生产者:互斥关系(竞争)
对于超市来说,场所的大小是有限的,且有的为止可能更容易让客户看到,因此生产者们都想要一个好的位置,让自己商品的销量更多,因此我们生产者之间也存在着竞争,既然存在我们就要进行管理,防止恶意竞争,所以我们生产者之间也是互斥的;
生产者与消费者:互斥与同步
对于超市每一个位置来说,商品的生产与消费不是同时,比如我们正在挑选某个位置的商品,生产者就不能往我准备购买的商品的位置填充新的商品,只有我拿走以后,生产者才可以继续填充,这就是两者之间的互斥关系;
假如我们某一天特别想要超市的指定某样商品,而我们去超市的时候,发现卖光了,结果我们第二天来,发现还是每补货,接着第三天、第四天、第五天以此类推,我每天都来,发现都没有,这样显然是不合理的,不仅浪费消费者时间,也占用超市资源,因此我们最好是给超市销售留下电话,等生产者补充货物以后,让销售给我们打电话,通知我们来购物,这样才是一种合理的方案,同样反观生产者,若其生产的货物还没有被消费,也需要等消费者消费以后,然后超市工作人员打电话给生产者,让生产者提供商品,这才是一种有效方案;这也就是两者之间的同步关系(没错,但不合理);
2、基于阻塞队列的生产者消费者模型
我们可以设计一种阻塞队列作为我们的交易场所,我们消费者在消息队列里拿数据,生产者在消息队列里放数据;如下所示;
#include <iostream>
#include <string>
#include <time.h>
#include <unistd.h>
#include "BlockQueue.hpp"// 生产者线程数量
#define PRO_NUM 3
// 消费者线程数量
#define CON_NUM 3// 线程所传参数数据类
class ThreadDate
{
public:ThreadDate(std::string name, BQ::BlockQueue<int>* bq):_name(name),_bq(bq){}
public:std::string _name;BQ::BlockQueue<int>* _bq;
};void* productorRoutine(void* args)
{ThreadDate* pd = (ThreadDate*)args;while(true){// 模拟生产数据int num = rand() % 10; pd->_bq->push(num, pd->_name);//sleep(1);}// 释放堆上申请空间delete pd;return nullptr;
}void* consumerRoutine(void* args)
{ThreadDate* pd = (ThreadDate*)args;while(true){int num; // 接受要消费的数据pd->_bq->pop(&num, pd->_name);sleep(1);}// 释放堆上申请空间delete pd;return nullptr;
}int main()
{// 种下随机数种子srand((unsigned int)time(nullptr));// 初始化阻塞队列BQ::BlockQueue<int>* bq = new BQ::BlockQueue<int>();// 初始化生产者消费者线程pthread_t tids_pro[PRO_NUM];pthread_t tids_con[CON_NUM];for(int i = 0; i < PRO_NUM; i++){// 线程名char name[128];sprintf(name, "thread-productor-%d", i + 1);// 线程所需数据ThreadDate* pd = new ThreadDate(name, bq);pthread_create(tids_pro + i, nullptr, productorRoutine, pd);}for(int i = 0; i < CON_NUM; i++){// 线程名char name[128];sprintf(name, "thread-consumer-%d", i + 1);// 线程所需数据ThreadDate* pd = new ThreadDate(name, bq);pthread_create(tids_con, nullptr, consumerRoutine, pd);}// 等待线程for(int i = 0; i < PRO_NUM; i++){pthread_join(tids_pro[i], nullptr);}for(int i = 0; i < CON_NUM; i++){pthread_join(tids_con[i], nullptr);}// 释放堆上申请内存delete pd;return 0;
}
// BlockQueue.hpp文件
#pragma once
#include <queue>
#include <pthread.h>namespace BQ
{// 阻塞队列默认长度const int default_num = 5;// 阻塞队列template<class T>class BlockQueue{public:BlockQueue(int capacity = default_num):_capacity(capacity){// 初始化互斥锁与条件变量pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_con, nullptr);pthread_cond_init(&_pro, nullptr);}~BlockQueue(){// 销毁锁与条件变量pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_con);pthread_cond_destroy(&_pro);}// 判断阻塞队列是否为空bool isEmpty(){return _bq.size() == 0;}// 判断阻塞队列是否为空bool isFull(){return _bq.size() == _capacity;}// 往阻塞队列放数据void push(const T& data, const std::string& name){// 加锁pthread_mutex_lock(&_mtx);// 如果阻塞队列为满,则无法放数据while(isFull()){// 线程等待,等待唤醒pthread_cond_wait(&_pro, &_mtx);}// 运行到这里阻塞队列一定有存数据的位置_bq.push(data);std::cout << name << " pruduct data -> " << data << std::endl;// 唤醒在消费者线程条件变量的一个线程来消费pthread_cond_signal(&_con);// 解锁pthread_mutex_unlock(&_mtx);}// 从阻塞队列拿数据void pop(T* data, const std::string& name){// 加锁pthread_mutex_lock(&_mtx);while(isEmpty()){// 线程等待,等待唤醒pthread_cond_wait(&_con, &_mtx);}// 运行到这里阻塞队列里一定有数据可以消费*data = _bq.front();_bq.pop();std::cout << name << " consume data -> " << *data << std::endl;// 唤醒一个线程来生产数据pthread_cond_signal(&_pro);// 解锁pthread_mutex_unlock(&_mtx);}private:std::queue<T> _bq; // 阻塞队列int _capacity; // 队列长度pthread_mutex_t _mtx; // 维护互斥关系的锁pthread_cond_t _con; // 消费者条件变量pthread_cond_t _pro; // 生产者条件变量};
}
上述代码中,我们的阻塞队列里放的是整型,我们设计的是模板,因此我们还可以替换成函数指针,表示任务,可以执行一项又一项的任务;代码都有注释,不懂还可以评论区留言;
六、信号量
1、基本概念
信号量:信号量本质就是一个计数器,是实现进程同步与互斥的一种手段;
我们通过初始化给信号量赋予初始值,这个值通常为一个大于等于0的整型,我们每次申请临界资源前,我们首先申请信号量,也就是执行P操作,若信号量大于0,此时不会阻塞,并将信号量的值减一,这个减一是原子的;若我们使用完临界资源,要出临界区,则我们就执行V操作,返还信号量的值,也就是对信号量的值加一,这个操作也是原子的;
2、接口介绍
这里介绍POSIX版本信号量,以及给我们提供的接口;
(1)类型与初始化
同样,我们使用信号量之前需要引入头文件 semaphore.h ,信号量的类型为sem_t,我们使用前需要对其初始化,初始化函数声明如下;
int sem_init(sem_t *sem, int pshared, unsigned int value);
注意:同样,我们需要编译时需要指定库名 pthread;
参数一:上面我们定义信号量的地址,与上面互斥锁、条件变量类似,这是一个输出型参数;
参数二:0表示线程间共享,1表示进程间共享
参数三:信号量的初始值,就是我们上面说的数字,后续会对这个数字加加或减减;
返回值:调用成功返回0,失败返回-1,错误码被设置
(2)销毁信号量
int sem_destroy(sem_t *sem);
参数:信号量的地址
返回值:调用成功返回0,失败返回-1,错误码被设置
(3)信号量的等待
该操作为申请信号量,也就是判断信号量的值是否大于0,若大于另则减减,否则阻塞等待;
int sem_wait(sem_t *sem);
参数:信号量的地址
返回值:调用成功返回0,失败返回-1,错误码被设置
(4)信号量的发布
所谓信号量的发布,就是信号量的归还,使用完临界资源后,返还信号量,对信号量进行加加操作;
int sem_post(sem_t *sem);
参数:信号量的地址
返回值:调用成功返回0,失败返回-1,错误码被设置
3、基于环形队列的生产者消费者模型
环形队列想必大家都不陌生,可以通过顺序表实现,我们仅需维护一个数组数据起始下标和结束下表即可,若对下标进行加加时,我们注意模上数组大小即可;
// Main.cc 文件
#include <iostream>
#include <string>
#include <time.h>
#include <pthread.h>
#include <unistd.h>
#include "Ringqueue.hpp"// 定义消费者线程数量
#define CON_NUM 3
// 定义生产者线程数量
#define PRO_NUM 3// 线程参数
class ThreadDate
{
public:ThreadDate(const std::string name, RQ::RingQueue<int>* rq):_name(name),_rq(rq){}
public:std::string _name;RQ::RingQueue<int>* _rq;};// 生产者执行线程函数
void* productorRoutine(void* args)
{ThreadDate* td = (ThreadDate*)args;while(true){int num = rand() % 10;td->_rq->push(num, td->_name);//sleep(1);}delete td;return nullptr;
}
// 消费者执行线程函数
void* consumerRoutine(void* args)
{ThreadDate* td = (ThreadDate*)args;while(true){int num;td->_rq->pop(&num, td->_name);sleep(1);}delete td;return nullptr;
}int main()
{// 种下随机数种子srand((unsigned int)time(nullptr));// 创建循环队列RQ::RingQueue<int>* rq = new RQ::RingQueue<int>(5);// 创建线程pthread_t tids_pro[PRO_NUM];pthread_t tids_con[CON_NUM];for(int i = 0; i < PRO_NUM; i++){char name[128];sprintf(name, "thread-productor-%d", i + 1);ThreadDate* td = new ThreadDate(name, rq);pthread_create(tids_pro + i, nullptr, productorRoutine, td);}for(int i = 0; i < CON_NUM; i++){char name[128];sprintf(name, "thread-consumer-%d", i + 1);ThreadDate* td = new ThreadDate(name, rq);pthread_create(tids_con + i, nullptr, consumerRoutine, td);}// 线程等待for(int i = 0; i < PRO_NUM; i++){pthread_join(tids_pro[i], nullptr);}for(int i = 0; i < CON_NUM; i++){pthread_join(tids_con[i], nullptr);}// 销毁堆上申请资源delete rq;return 0;
}
// Ringqueue.hpp 文件
#pragma once
#include <vector>
#include <string>
#include <semaphore.h>namespace RQ
{const int default_num = 5;template<class T>class RingQueue{public:RingQueue(int capacity = default_num):_capacity(capacity){// 初始化环形队列_rq.resize(capacity);// 初始化信号量sem_init(&_data_sem, 0, 0); // 刚开始队列里没有数据,故初始化为0sem_init(&_space_sem, 0, _capacity); // 刚开始队列为空,故初始化为capacity// 初始化下标值_start = _end = 0;// 初始化锁pthread_mutex_init(&_c_mtx, nullptr);pthread_mutex_init(&_p_mtx, nullptr);}~RingQueue(){// 销毁信号量sem_destroy(&_data_sem);sem_destroy(&_space_sem);// 销毁锁pthread_mutex_destroy(&_c_mtx);pthread_mutex_destroy(&_p_mtx);}void push(const T& data, const std::string name){// 申请空间信号量P(&_space_sem);// 加锁pthread_mutex_lock(&_p_mtx);_rq[_end++] = data;_end %= _capacity;std::cout << name << " pruduct data -> " << data << std::endl;// 解锁pthread_mutex_unlock(&_p_mtx);// 增加数据信号量V(&_data_sem);}void pop(T* data, const std::string name){// 申请数据信号量P(&_data_sem);// 加锁pthread_mutex_lock(&_c_mtx);// 取走数据*data = _rq[_start++];_start %= _capacity;std::cout << name << " pruduct data -> " << *data << std::endl;// 解锁pthread_mutex_unlock(&_c_mtx);// 增加空间信号量V(&_space_sem);}private:void P(sem_t* sem){sem_wait(sem);}void V(sem_t* sem){sem_post(sem);}private:std::vector<T> _rq; // 环形队列int _capacity;int _start; // 起始下标int _end; // 末尾下标sem_t _data_sem; // 数据信号量sem_t _space_sem; // 空间信号量pthread_mutex_t _c_mtx; // 消费者之间的互斥锁pthread_mutex_t _p_mtx; // 生产者之间的互斥锁};
}
上面所有代码均有注释,若还是不大理解可私信小编,看到会及时回复;
七、线程池
可能大家多少都听说过线程池,不知道也没有关系,所谓线程池,就是利用池化技术来减少我们进程向OS申请空间的次数,以达到提高性能的效果;关于池化技术,我们可以这么理解,比如我们以前在大学问父母讨要生活费时,我们父母可能是一个月给你一次生活费,但也有一种父母,让你在每次需要花钱时,给父母发消息,然后父母在进行转账,而这种转账效率太低了,因此诞生了池化技术,我们一次性让父母给一个月的生活费,也可以理解为我们提前向父母透支这个月的生活费;同样,我们可以把这样的技术用到编码中,我们提前向操作系统申请资源,等我们需要该资源时,我们直接拿着用就行,而无需向操作系统申请;
接下来的线程池也是这种技术,我们有如下需求,我们想要提前创建N个线程,并用一种数据结构组织起来,这里选择顺序结构,也就是C++中的vector,然后我们用主线程模拟派发任务,然后往任务队列放任务,而我们每放一个任务就唤醒一个线程来处理这个任务,如下图所示;
具体代码如下所示,可结合注释观看代码;
// main.cc文件 主线程,用于不断派发任务
#include <iostream>
#include <unistd.h>
#include "Task.hpp"
#include "ThreadPool.hpp"int main()
{// 生成随机数种子srand((unsigned int)time(nullptr));// 初始化线程池ThreadPool<Task>* tp = new ThreadPool<Task>(3);tp->run();// 生产任务while(true){int x = rand() % 10;int y = rand() % 10;Task t(x, y, [](int x, int y){return x + y;});tp->push(t);std::cout << x << "+" << y << "= ?" << std::endl;sleep(1);}return 0;
}
// lockGuard.hpp 文件,采用RAII技术,实现自动解锁(出作用域)
#pragma once
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t* mtx):_mtx(mtx){}void lock(){pthread_mutex_lock(_mtx);}void unlock(){pthread_mutex_unlock(_mtx);}
private:pthread_mutex_t* _mtx;
};class LockGuard
{
public:LockGuard(pthread_mutex_t* mtx):_mtx(mtx){_mtx.lock();}~LockGuard(){_mtx.unlock();}
private:Mutex _mtx;
};
// Task.hpp 文件,任务类,模拟任务
#pragma once
#include <iostream>
#include <functional>
using call_back = std::function<int(int, int)>;
class Task
{
public:Task(){}Task(int x, int y, call_back cb):_x(x),_y(y),_func(cb){}void operator()(){int ret = _func(_x, _y);std::cout << _x << "+" << _y << "=" << ret << std::endl;}
private:int _x;int _y;call_back _func;
};
// Thread.hpp文件,模拟封装我们自己的线程
#pragma once
#include <cstdio>
#include <string>
#include <functional>
#include <pthread.h>typedef void*(*func_t)(void*);class ThreadDate
{
public:ThreadDate(){}ThreadDate(const std::string name, void* args):_name(name),_args(args){}
public:std::string _name;void* _args;
};class Thread
{
public:Thread(int num, func_t func, void* args):_func(func){char buf[128];sprintf(buf, "thread-%d", num);_td = new ThreadDate(buf, args);}~Thread(){delete _td;}void start(){pthread_create(&_tid, nullptr, _func, _td);}void join(){pthread_join(_tid, nullptr);}
private:std::string _name;pthread_t _tid;func_t _func;ThreadDate* _td;
};
// ThreadPool.hpp文件,线程池(核心代码)
#pragma once
#include <vector>
#include <queue>
#include <pthread.h>
#include <functional>
#include <Thread.hpp>
#include "LockGuard.hpp"const int g_thread_num = 3;
template<class T>
class ThreadPool
{
public:// 构造ThreadPool(int num = g_thread_num):_num(num){// 初始化线程对象数组for(int i = 0; i < num; i++){_threads.push_back(new Thread(i + 1, threadRoutine, this));}// 初始化锁pthread_mutex_init(&_mtx, nullptr);// 初始化条件变量pthread_cond_init(&_cond, nullptr);}// 析构~ThreadPool(){// 线程等待for(auto& it : _threads){it->join();delete it; // 注意我们线程是new出来的,记得释放}// 释放锁资源pthread_mutex_destroy(&_mtx);// 释放条件变量pthread_cond_destroy(&_cond);}// 启动线程池里线程void run(){// 让线程启动for(auto it : _threads){it->start();}}// 往任务队列放数据(相当于生产者)void push(const T& data){LockGuard lock(&_mtx);// 往任务队列塞数据_tasks.push(data);// 唤醒线程来消费数据pthread_cond_signal(&_cond);}
private:// 线程执行的线程函数(这里必须设置成static,如果不设置成static会默认传this指针)// 这个相当于cp模型中的消费者static void* threadRoutine(void* args){// 获取传参ThreadDate* td = (ThreadDate*)args;// 获取当前线程池ThreadPool* tp = (ThreadPool*)td->_args;while(true){Task t;{LockGuard lock(&tp->_mtx);// 判断任务队列是否有任务while(tp->_tasks.empty()) pthread_cond_wait(&tp->_cond, &tp->_mtx);// 执行到这里肯定有任务了t = tp->_tasks.front();tp->_tasks.pop();}// 执行任务t();}}
private:int _num; // 线程数量std::vector<Thread*> _threads; // 保存thread的数组std::queue<T> _tasks; // 任务队列pthread_mutex_t _mtx; // 保护任务队列的互斥锁pthread_cond_t _cond; // 唤醒新线程来消费数据的条件变量
};