多个进程同时访问某些资源时,必须考虑同步问题,以确保任一时刻只有一个进程可以拥有对资源的独占式访问。通常,程序对关键资源的访问代码只是很短的一段,我们称这段代码为关键代码段或者临界区,对进程同步,也就是确保任意时刻只能有一个进程进入关键代码段。
一、信号量
1.1、信号量原语
Dijkstra提出的信号量(Semaphore)概念是并发编程领域迈出的重要一步。对信号的操作只能是P和V,这两个字母来自荷兰语单词,一个是传递,一个是释放。
- P(SV),如果SV的值大于0,就将它减1;如果SV的值为0,则挂起进程的执行。
- V(SV),如果有其他进程因为等待SV而挂起,则唤醒之;如果没有,则将SV加1。
虽然信号量的取值可以是任意自然数,但是最常用的还是二进制信号量。使用二进制信号量同步两个进程,以确保关键代码段的独占式访问的一个典型例子如下:
当关键代码段可用时,二进制信号量SV的值为1,进程A和B都有机会进入关键代码段。如果此时进程A执行了P(SV)操作将SV减1,则进程B若再执行P(SV)操作就会被挂起。直到进程A离开关键代码段,并执行V(SV)操作将SV加1,关键代码段才重新变得可用。如果此时进程B因为等待SV而处于挂起状态,则它将被唤醒,并进入关键代码段。同样,这时进程A如果再执行P(SV)操作,则也只能被操作系统挂起以等待进程B退出关键代码段。
**注意:**使用一个普通变量是不能模拟信号量的,没有一个原子操作能一下子执行两步:检测变量为true还是false,如果是再将他设置为true或者false。
在作者写的手写操作系统的信号量实现部分,作者是使用关中断以确保原子操作的。但是关中断的代价很高…
1.2、semget系统调用
semget系统调用创建一个新的信号量集,或者获取一个已经存在的信号量集:
#include <sys/sem.h>int semget(key_t key, int num_sems, int sem_flags);
key
:用于标识信号量集的键值(key)。不同的信号量集需要不同的键值,相同的键值会返回同一个信号量集的标识符。num_sems
:信号量集中包含的信号量数量。sem_flags
:标志位,用于指定操作的行为。它低端的9个比特是该信号量的权限。还可以和IPC_CREAT
结合来创建一个新的信号量集,或者和IPC_EXCL
结合使用来确保只有当前进程创建了信号量集。
semget
成功时返回一个正整数值,它是信号量集的标识符;semget失败时返回-1,并设置errno。
如果semget用于创建信号量集,则与之关联的内核数据结构体semid_ds
将被创建并初始化。semid_ds
结构体的定义如下:
#include <sys/sem.h>/*该结构体用于描述IPC对象(信号量、共享内存和消息队列)的权限*/
struct ipc_perm {key_t key;/*键值*/uid_t uid;/*所有者的有效用户ID*/gid_t gid;/*所有者的有效组ID*/uid_t cuid;/*创建者的有效用户ID*/gid_t cgid;/*创建者的有效组ID*/mode_t mode;/*访问权限*//*省略其他填充字段*/
}struct semid_ds {struct ipc_perm sem_perm;/*信号量的操作权限*/unsigned long int sem_nsems;/*该信号量集中的信号量数目*/time_t sem_otime;/*最后一次调用semop的时间*/time_t sem_ctime;/*最后一次调用semctl的时间*//*省略其他填充字段*/
};
1.3、semop系统调用
semop系统调用改变信号量的值,即执行P、V操作。先看一下与信号量关联的内核变量
unsigned short semval; /*信号量的值*/
unsigned short semzcnt;/*等待信号量值变为0的进程数量*/
unsigned short semncnt;/*等待信号量值增加的进程数量*/
pid_t sempid; /*最后一次执行semop操作的进程ID*/
semop系统调用如下
#include <sys/sem.h>int semop(int sem_id, struct sembuf* sops, size_t nsops);
-
sem_id
:要操作的信号量集的标识符。 -
nsops
:要执行的操作数量,即sops
数组中结构体的数量。 -
sops
:一个指向结构体数组的指针,每个结构体描述了对信号量的一个操作。结构体类型为struct sembuf
,定义如下 -
struct sembuf {unsigned short sem_num; // 信号量的索引short sem_op; // 操作类型:P 操作为 -1,V 操作为 1short sem_flg; // 标志位,通常设置为 0};
-
sem_flg的取值为IPC_NOWAIT和SEM_UNDO。IPC_NOWAIT的含义是,无论信号量操作是否成功,semop调用都将立即返回,这类似于非阻塞I/O操作。SEM_UNDO的含义是,当进程退出时取消正在进行的semop操作。
1.4、semctl系统调用
semctl系统调用允许调用者对信号量进行直接控制。其定义如下:
#include <sys/sem.h>int semctl(int sem_id, int sem_num, int cmd, ...);
参数说明:
-
sem_id
:要操作的信号量集的标识符。 -
sem_num
:被操作的信号量在信号量集中的编号,通常为 0。 -
cmd
:指定要执行的操作类型,可以是以下几种之一:GETVAL
:获取信号量的当前值。SETVAL
:设置信号量的值。GETPID
:获取上一个执行操作的进程的 PID。GETNCNT
:获取正在等待信号量值增加的进程数量。GETZCNT
:获取正在等待信号量值减少到 0 的进程数量。IPC_RMID
:删除信号量集。
-
...
:根据不同的命令类型,可以接受额外的参数。但是有一个推荐格式 -
union semun {int val;/*用于SETVAL命令*/struct semid_ds*buf;/*用于IPC_STAT和IPC_SET命令*/unsigned short*array;/*用于GETALL和SETALL命令*/struct seminfo*__buf;/*用于IPC_INFO命令*/};struct seminfo {int semmap;/*Linux内核没有使用*/int semmni;/*系统最多可以拥有的信号量集数目*/int semmns;/*系统最多可以拥有的信号量数目*/int semmnu;/*Linux内核没有使用*/int semmsl;/*一个信号量集最多允许包含的信号量数目*/int semopm;/*semop一次最多能执行的sem_op操作数目*/int semume;/*Linux内核没有使用*/int semusz;/*sem_undo结构体的大小*/int semvmx;/*最大允许的信号量值*//*最多允许的UNDO次数(带SEM_UNDO标志的semop操作的次数)*/int semaem;}
semctl()
函数成功时返回一个非负整数,具体取决于所执行的命令。失败时返回 -1,并设置相应的错误码。
这些操作中,GETNCNT
、GETPID
、GETVAL
、GETZCNT
和SETVAL
操作的是单个信号量,它是由标识符sem_id
指定的信号量集中的第sem_num
个信号量;而其他操作针对的是整个信号量集,此时semctl
的参数sem_num
被忽略。
1.5、特殊键值IPC_PRIVATE
semget的调用者可以给其key参数传递一个特殊的键值IPC_PRIVATE(其值为0),这样无论该信号量是否已经存在,semget都将创建一个新的信号量。使用该键值创建的信号量并非像它的名字声称的那样是进程私有的。其他进程,尤其是子进程,也有方法来访问这个信号量。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/sem.h>
#include <sys/wait.h>union semun {int val;struct semid_ds *buf;unsigned short int *array;struct seminfo *__buf;
};/* P 操作和 V 操作函数 */
void pv(int sem_id, int op) {struct sembuf sem_b;sem_b.sem_num = 0;sem_b.sem_op = op;sem_b.sem_flg = SEM_UNDO;semop(sem_id, &sem_b, 1);
}int main(int argc, char *argv[]) {int sem_id = semget(IPC_PRIVATE, 1, 0666); // 创建一个包含一个信号量的信号量集if (sem_id == -1) {perror("semget");return 1;}union semun sem_un;sem_un.val = 1; // 设置信号量的初始值为 1semctl(sem_id, 0, SETVAL, sem_un); // 初始化信号量的值pid_t id = fork(); // 创建子进程if (id < 0) {perror("fork");return 1;} else if (id == 0) { // 子进程printf("Child process tries to acquire the binary semaphore.\n");pv(sem_id, -1); // 执行 P 操作printf("Child process acquired the semaphore and will release it after 5 seconds.\n");sleep(5);pv(sem_id, 1); // 执行 V 操作exit(0);} else { // 父进程printf("Parent process tries to acquire the binary semaphore.\n");pv(sem_id, -1); // 执行 P 操作printf("Parent process acquired the semaphore and will release it after 5 seconds.\n");sleep(5);pv(sem_id, 1); // 执行 V 操作}waitpid(id, NULL, 0); // 等待子进程结束semctl(sem_id, 0, IPC_RMID, sem_un); // 删除信号量集return 0;
}
二、互斥锁
互斥锁是线程同步最常用的一种方式,通过互斥锁可以锁定一个代码块, 被锁定的这个代码块, 所有的线程只能顺序执行(不能并行处理),这样多线程访问共享资源数据混乱的问题就可以被解决了,需要付出的代价就是执行效率的降低,因为默认临界区多个线程是可以并行处理的,现在只能串行处理。
互斥锁可以看做是信号量的进一步封装。
在Linux中互斥锁的类型为pthread_mutex_t
,创建一个这种类型的变量就得到了一把互斥锁:
pthread_mutex_t mutex;
在创建的锁对象中保存了当前这把锁的状态信息:锁定还是打开,如果是锁定状态还记录了给这把锁加锁的线程信息(线程ID)。一个互斥锁变量只能被一个线程锁定,被锁定之后其他线程再对互斥锁变量加锁就会被阻塞,直到这把互斥锁被解锁,被阻塞的线程才能被解除阻塞。一般情况下,每一个共享资源对应一个把互斥锁,锁的个数和线程的个数无关。
2.1、初始化锁
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
mutex
是一个指向互斥锁对象的指针,用于指定要初始化的互斥锁。attr
是一个指向互斥锁属性对象的指针,用于指定互斥锁的属性。如果将attr
参数设置为NULL
,则使用默认的互斥锁属性。
2.2、加锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
这个函数被调用, 首先会判断参数 mutex 互斥锁中的状态是不是锁定状态:
- 没有被锁定, 是打开的, 这个线程可以加锁成功, 这个这个锁中会记录是哪个线程加锁成功了
- 如果被锁定了, 其他线程加锁就失败了, 这些线程都会阻塞在这把锁上
- 当这把锁被解开之后, 这些阻塞在锁上的线程就解除阻塞了,并且这些线程是通过竞争的方式对这把锁加锁,没抢到锁的线程继续阻塞
2.3、尝试加锁
int pthread_mutex_trylock(pthread_mutex_t *mutex);
调用这个函数对互斥锁变量加锁还是有两种情况:
- 如果这把锁没有被锁定是打开的,线程加锁成功
- 如果锁变量被锁住了,调用这个函数加锁的线程,不会被阻塞,加锁失败直接返回错误号,相当于加锁的非阻塞版本
2.4、解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
不是所有的线程都可以对互斥锁解锁,哪个线程加的锁, 哪个线程才能解锁成功。
2.5、释放锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
释放锁和解锁不一样,解锁的话锁还在,只是解开了,释放锁是将锁占的系统资源释放,锁就不存在了。
2.6、互斥锁使用
两个进程共享一个计数器,使用互斥锁来保护对计数器的访问
#include <stdio.h>
#include <pthread.h>// 创建互斥锁全局变量
pthread_mutex_t mutex_counter;
// 创建计数器
int counter = 0;// 线程函数,对计数器进行增加操作
void *thread_function(void *arg) {for (int i=0; i<10000; i++) {// 获取互斥锁pthread_mutex_lock(&mutex_counter);// 对共享资源进行操作counter++;// 释放互斥锁pthread_mutex_unlock(&mutex_counter);}return NULL;
}int main() {// 线程pthread_t threads[2];// 初始化线程锁pthread_mutex_init(&mutex_counter, NULL);// 创建线程for (int i=0;i<2;i++) {pthread_create(&threads[i], NULL, thread_function, NULL);}// 等待线程结束for (int i=0;i<2;i++) {pthread_join(threads[i],NULL);}// 释放锁pthread_mutex_destroy(&mutex_counter);// 输出计数值printf("counter = %d",counter);return 0;
}
三、死锁
当多个线程访问共享资源, 需要加锁, 如果锁使用不当, 就会造成死锁这种现象。如果线程死锁造成的后果是:所有的线程都被阻塞,并且线程的阻塞是无法解开的(因为可以解锁的线程也被阻塞了)。
3.1、加锁后不解锁
// 线程函数,对计数器进行增加操作
void *thread_function(void *arg) {for (int i=0; i<10000; i++) {// 获取互斥锁pthread_mutex_lock(&mutex_counter);// 对共享资源进行操作counter++;}return NULL;
}
那么只能有一个线程拥有锁,且这个线程退出后,也没有线程能拿到锁
3.2、重复加锁
// 线程函数,对计数器进行增加操作
void *thread_function(void *arg) {for (int i=0; i<10000; i++) {// 获取互斥锁pthread_mutex_lock(&mutex_counter);pthread_mutex_lock(&mutex_counter);// 对共享资源进行操作counter++;// 释放互斥锁pthread_mutex_unlock(&mutex_counter);}return NULL;
}
第二次加锁的时候,发现已经加锁了,当前线程被阻塞,等待解锁
问题是加锁的是当前线程,等待解锁的也是当前线程,死锁!
3.3、多个锁导致相互阻塞
- 有两个共享资源:X, Y,X对应锁A, Y对应锁B
- 线程A访问资源X, 加锁A
- 线程B访问资源Y, 加锁B
- 线程A要访问资源Y, 线程B要访问资源X,因为资源X和Y已经被对应的锁锁住了,因此这个两个线程被阻塞
- 线程A被锁B阻塞了, 无法打开A锁
- 线程B被锁A阻塞了, 无法打开B锁
四、读写锁
读写锁是互斥锁的升级版, 在做读操作的时候可以提高程序的执行效率,如果所有的线程都是做读操作, 那么读是并行的
,但是使用互斥锁,读操作也是串行的。
读写锁是一把锁,锁的类型为pthread_rwlock_t
,有了类型之后就可以创建一把互斥锁了:
pthread_rwlock_t rwlock;
读写锁的特点:
- 使用读写锁的读锁锁定了临界区,线程对临界区的访问是并行的,
读锁是共享的。
- 使用读写锁的写锁锁定了临界区,线程对临界区的访问是串行的,
写锁是独占的。
- 使用读写锁分别对两个临界区加了读锁和写锁,两个线程要同时访问者两个临界区,访问写锁临界区的线程继续运行,访问读锁临界区的线程阻塞,因为
写锁比读锁的优先级高。
如果说程序中所有的线程都对共享资源做写操作,使用读写锁没有优势,和互斥锁是一样的,如果说程序中所有的线程都对共享资源有写也有读操作,并且对共享资源读的操作越多,读写锁更有优势。
4.1、初始化锁
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
- rwlock: 读写锁的地址,传出参数
- attr: 读写锁属性,一般使用默认属性,指定为NULL
4.2、加读锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作,调用这个函数依然可以加锁成功,因为读锁是共享的;如果读写锁已经锁定了写操作,调用这个函数的线程会被阻塞。
4.3、尝试加读锁
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作,调用这个函数依然可以加锁成功,因为读锁是共享的;如果读写锁已经锁定了写操作,调用这个函数加锁失败,对应的线程不会被阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。
4.4、加写锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作或者锁定了写操作,调用这个函数的线程会被阻塞。
4.5、尝试加写锁
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作或者锁定了写操作,调用这个函数加锁失败,但是线程不会阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。
4.6、解锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
不管锁定了读还是写,都可以用这个解锁
4.7、释放锁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
释放锁占的资源
4.8、读写锁使用
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>#define NUM_READERS 3
#define NUM_WRITERS 2
#define NUM_READ_ITERATIONS 5
#define NUM_WRITE_ITERATIONS 3// 共享资源
int shared_resource = 0;// 读写锁
pthread_rwlock_t rwlock;// 读操作函数
void *reader(void *arg) {for (int i = 0; i < NUM_READ_ITERATIONS; i++) {// 加读锁pthread_rwlock_rdlock(&rwlock);// 读取共享资源printf("Reader %ld read: %d\n", (long)arg, shared_resource);// 释放读锁pthread_rwlock_unlock(&rwlock);// 模拟读操作耗时usleep(100000);}return NULL;
}// 写操作函数
void *writer(void *arg) {for (int i = 0; i < NUM_WRITE_ITERATIONS; i++) {// 加写锁pthread_rwlock_wrlock(&rwlock);// 写入共享资源shared_resource++;printf("Writer %ld wrote: %d\n", (long)arg, shared_resource);// 释放写锁pthread_rwlock_unlock(&rwlock);// 模拟写操作耗时usleep(200000);}return NULL;
}int main() {// 初始化读写锁if (pthread_rwlock_init(&rwlock, NULL) != 0) {perror("Error: Unable to initialize read-write lock");exit(EXIT_FAILURE);}pthread_t readers[NUM_READERS];pthread_t writers[NUM_WRITERS];// 创建读线程for (long i = 0; i < NUM_READERS; i++) {if (pthread_create(&readers[i], NULL, reader, (void *)i) != 0) {perror("Error: Unable to create reader thread");exit(EXIT_FAILURE);}}// 创建写线程for (long i = 0; i < NUM_WRITERS; i++) {if (pthread_create(&writers[i], NULL, writer, (void *)i) != 0) {perror("Error: Unable to create writer thread");exit(EXIT_FAILURE);}}// 等待所有读线程结束for (int i = 0; i < NUM_READERS; i++) {pthread_join(readers[i], NULL);}// 等待所有写线程结束for (int i = 0; i < NUM_WRITERS; i++) {pthread_join(writers[i], NULL);}// 销毁读写锁pthread_rwlock_destroy(&rwlock);return 0;
}
五、条件变量
严格意义上来说,条件变量的主要作用不是处理线程同步, 而是进行线程的阻塞。
如果在多线程程序中只使用条件变量无法实现线程的同步, 必须要配合互斥锁来使用。虽然条件变量和互斥锁都能阻塞线程,但是二者的效果是不一样的,二者的区别如下:
- 假设有A-Z 26个线程,这26个线程共同访问同一把互斥锁,如果线程A加锁成功,那么其余B-Z线程访问互斥锁都阻塞,所有的线程只能顺序访问临界区
- 条件变量只有在满足指定条件下才会阻塞线程,如果条件不满足,多个线程可以同时进入临界区,同时读写临界资源,这种情况下还是会出现共享资源中数据的混乱。
一般情况下条件变量用于处理生产者和消费者模型,并且和互斥锁配合使用。条件变量类型对应的类型为pthread_cond_t
,这样就可以定义一个条件变量类型的变量了:
pthread_cond_t cond;
被条件变量阻塞的线程的线程信息会被记录到这个变量中,以便在解除阻塞的时候使用。
5.1、条件变量初始化
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
- cond: 条件变量的地址
- attr: 条件变量属性, 一般使用默认属性, 指定为NULL
5.2、条件变量释放
int pthread_cond_destroy(pthread_cond_t *cond);
销毁条件变量以释放资源
5.2、线程阻塞函数
5.2.1、一直阻塞
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
通过函数原型可以看出,该函数在阻塞线程的时候,需要一个互斥锁参数,这个互斥锁主要功能是进行线程同步,让线程顺序进入临界区,避免出现数共享资源的数据混乱。该函数会对这个互斥锁做以下几件事情:
- 在阻塞线程时候,如果线程已经对互斥锁
mutex
上锁,那么会将这把锁打开,这样做是为了避免死锁 - 当线程解除阻塞的时候,函数内部会帮助这个线程再次将这个
mutex
互斥锁锁上,继续向下访问临界区
5.2.2、阻塞一段时间
// 表示的时间是从1971.1.1到某个时间点的时间, 总长度使用秒/纳秒表示
struct timespec {time_t tv_sec; /* Seconds */long tv_nsec; /* Nanoseconds [0 .. 999999999] */
};
// 将线程阻塞一定的时间长度, 时间到达之后, 线程就解除阻塞了
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
这个函数的前两个参数和pthread_cond_wait
函数是一样的,第三个参数表示线程阻塞的时长。获得时间的方式麻烦一点
time_t mytim = time(NULL); // 1970.1.1 0:0:0 到当前的总秒数
struct timespec tmsp;
tmsp.tv_nsec = 0;
tmsp.tv_sec = time(NULL) + 100; // 线程阻塞100s
5.3、线程唤醒函数
// 唤醒阻塞在条件变量上的线程, 至少有一个被解除阻塞
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒阻塞在条件变量上的线程, 被阻塞的线程全部解除阻塞
int pthread_cond_broadcast(pthread_cond_t *cond);
调用上面两个函数中的任意一个,都可以换线被pthread_cond_wait
或者pthread_cond_timedwait
阻塞的线程,区别就在于pthread_cond_signal
是唤醒至少一个被阻塞的线程(总个数不定),pthread_cond_broadcast
是唤醒所有被阻塞的线程。
5.4、生产者消费者示例
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>#define BUFFER_SIZE 5 // 缓冲区大小int buffer[BUFFER_SIZE];
int count = 0; // 缓冲区中当前元素个数
int in = 0; // 生产者放入数据的位置
int out = 0; // 消费者取出数据的位置pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;// 生产者函数
void *producer(void *arg) {int item;while (1) {item = rand() % 100; // 生成一个随机数作为生产的数据pthread_mutex_lock(&mutex);while (count == BUFFER_SIZE) {printf("Buffer is full, producer is waiting...\n");pthread_cond_wait(¬_full, &mutex);}buffer[in] = item;printf("Produced: %d\n", item);in = (in + 1) % BUFFER_SIZE;count++;pthread_mutex_unlock(&mutex);pthread_cond_signal(¬_empty); // 通知消费者可以取数据了}return NULL;
}// 消费者函数
void *consumer(void *arg) {int item;while (1) {pthread_mutex_lock(&mutex);while (count == 0) {printf("Buffer is empty, consumer is waiting...\n");pthread_cond_wait(¬_empty, &mutex);}item = buffer[out];printf("Consumed: %d\n", item);out = (out + 1) % BUFFER_SIZE;count--;pthread_mutex_unlock(&mutex);pthread_cond_signal(¬_full); // 通知生产者可以继续生产了}return NULL;
}int main() {pthread_t producer_thread, consumer_thread;// 创建生产者和消费者线程pthread_create(&producer_thread, NULL, producer, NULL);pthread_create(&consumer_thread, NULL, consumer, NULL);// 等待线程结束pthread_join(producer_thread, NULL);pthread_join(consumer_thread, NULL);return 0;
}