操作系统:线程同步
使用Linux无名信号量实现了读写者线程的互斥和同步。
实验环境
- 环境:Linux
- 语言:C
- CMake:3.17.1
- GCC:7.5.0
- IDE:Clion 2020.3.1
实验目标
- 理解进程同步的两种制约关系:互斥与同步。
- 掌握利用记录型信号量解决进程同步问题的方法。
- 加深对进程同步控制的理解。
实验内容
以“生产者-消费者模型”为基础,在Linux环境下创建一个控制进程,在该进程中创建读者写者线程各一个,模拟生产者和消费者,它们使用N个不同的缓冲区(N为一个确定的数值,本实验中取N=16)。写者线程写入数据,然后将数据放置在空缓冲区中供读者线程读取。读者线程从缓冲区中获得数据,然后释放缓冲区。当写者线程写入数据时,如果没有空缓冲区可用,那么写者线程必须等待读者线程释放出一个空缓冲区。当读者线程读取数据时,如果没有满的缓冲区,那么读者线程将被阻塞,直到缓冲区被写者线程写满。
可以作如下的实验尝试,并观察和记录进程同步效果:
- 没有信号量时实现生产者线程与消费者线程互斥制约;
- 用一个互斥信号量
mutex
,用以阻止生产者线程和消费者线程同时操作缓冲区列表; - 用一个信号量
full
,当生产者线程生产出一个物品时可以用它向消费者线程发出信号;用一个信号量empty
,消费者线程释放出一个空缓冲区时可以用它向生产者线程发出信号。 - 同时使用2、3中的信号量,实现生产者-消费者的互斥与同步制约
实验过程和结果
函数介绍
pthread.h
#include <pthread.h>/* Create a new thread, starting with execution of START-ROUTINEgetting passed ARG. Creation attributed come from ATTR. The newhandle is stored in *NEWTHREAD. */
extern int pthread_create (pthread_t *__restrict __newthread,const pthread_attr_t *__restrict __attr,void *(*__start_routine) (void *),void *__restrict __arg) __THROWNL __nonnull ((1, 3));/* Terminate calling thread.The registered cleanup handlers are called via exception handlingso we cannot mark this function with __THROW.*/
extern void pthread_exit (void *__retval) __attribute__ ((__noreturn__));/* Make calling thread wait for termination of the thread TH. Theexit status of the thread is stored in *THREAD_RETURN, if THREAD_RETURNis not NULL.This function is a cancellation point and therefore not marked with__THROW. */
extern int pthread_join (pthread_t __th, void **__thread_return);/* Obtain the identifier of the current thread. */
extern pthread_t pthread_self (void) __THROW __attribute__ ((__const__));/* Thread identifiers. The structure of the attribute type is notexposed on purpose. */
typedef unsigned long int pthread_t;
semaphore.h
在 POSIX 标准中,信号量分两种,一种是无名信号量,一种是有名信号量。无名信号量一般用于线程间同步或互斥,而有名信号量一般用于进程间同步或互斥。它们的区别和管道及命名管道的区别类似,无名信号量则直接保存在内存中,而有名信号量要求创建一个文件。
无名信号量需要semaphore.h
头文件,有名信号量则需要sys/sem.h
头文件。本文实现线程间同步和互斥,故使用更为简便的无名信号量。
#include <semaphore.h>typedef union
{char __size[__SIZEOF_SEM_T];long int __align;
} sem_t;/* Initialize semaphore object SEM to VALUE. If PSHARED then share itwith other processes. */
extern int sem_init (sem_t *__sem, int __pshared, unsigned int __value)__THROW;/* Free resources associated with semaphore object SEM. */
extern int sem_destroy (sem_t *__sem) __THROW;This function is a cancellation point and therefore not marked with__THROW. */
extern int sem_wait (sem_t *__sem);/* Post SEM. */
extern int sem_post (sem_t *__sem) __THROWNL;
CMake
# based on clion
cmake_minimum_required(VERSION 3.17)set(CMAKE_CXX_STANDARD 14)find_package(Threads REQUIRED)
add_executable(thread main.c ${SOURCE_FILES})
target_link_libraries(thread Threads::Threads)
信号量用于互斥与同步
一、读、写者互斥
创建读者写者void *Read(void *arg)
和void *Write(void *arg)
,由于只能有一个读者、一个写者,可知读者之间是互斥的、写者之间也是互斥的,故设置信号量sem_t read_sem
和sem_t write_sem
. 设置并初始化全局变量int buffer = 0;
模拟缓冲区的写入和读出。设置并初始化int read_write_flag[2] = {1, 1};
判断是否发生读写冲突。
//
// Created by Sylvan Ding on 2022/5/10.
//#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>#define N 16
#define MAX_PROCESS_NUM 100int buffer = 0;
int read_write_flag[2] = {1, 1};// sem
sem_t read_sem;
sem_t write_sem;void *Read(void *arg) {sem_wait(&read_sem);read_write_flag[0] = 0;int buf_temp;int duration = rand() % 4;pthread_t tid = pthread_self();printf("\033[32m[R] tid: %lu starts reading...\033[0m\n", tid);if (!read_write_flag[1])printf("\033[31m[R] tid: %lu conflicts with writing process!\033[0m\n", tid);// readbuf_temp = buffer;sleep(duration);buffer = 0;printf("[R] tid: %lu has read all %d buffers!\n", tid, buf_temp);if (buf_temp < N)printf("\033[33m[R] tid: %lu did not read entire 16 buffers!\033[0m\n", tid);read_write_flag[0] = 1;sem_post(&read_sem);return NULL;
}void *Write(void *arg) {sem_wait(&write_sem);read_write_flag[1] = 0;int res;int buf_temp;int size = rand() % 16 + 1;int duration = rand() % 5;pthread_t tid = pthread_self();printf("\033[32m[W] tid: %lu starts writing...\033[0m\n", tid);// writebuf_temp = buffer;res = size + buf_temp;sleep(duration);if (res > N)buffer = 16;elsebuffer = res;printf("[W] tid: %lu has written %d buffers, with %d buffers occupied!\n", tid, res > N ? N - buf_temp : size,buffer);read_write_flag[1] = 1;sem_post(&write_sem);return NULL;
}int main(int argc, char *argv[]) {srand(888);int res = 0;int randm = 0;void *(*target)(void *) =NULL;pthread_t myThreads[MAX_PROCESS_NUM];// init semif (sem_init(&read_sem, 0, 1) < 0) {printf("failed to init read_sem");exit(0);}if (sem_init(&write_sem, 0, 1) < 0) {printf("failed to init write_sem");exit(0);}for (int i = 0; i < MAX_PROCESS_NUM; ++i) {randm = rand() % 4;if (i % 2)target = Read;elsetarget = Write;res = pthread_create(&myThreads[i], NULL, target, NULL);if (res < 0) {printf("failed to create the thread!");exit(0);}sleep(randm);}void *thread_result = NULL;for (int i = 0; i < MAX_PROCESS_NUM; ++i)res = pthread_join(myThreads[i], &thread_result);// destroy semsem_destroy(&read_sem);sem_destroy(&write_sem);return 0;
}
二、读写者互斥
为实现读、写互斥,设置mutex
信号量,编写void *Read1(void *arg)
和void *Write1(void *arg)
.
sem_t mutex;void *Read1(void *arg) {sem_wait(&mutex);Read(arg);sem_post(&mutex);return NULL;
}void *Write1(void *arg) {sem_wait(&mutex);Write(arg);sem_post(&mutex);return NULL;
}void all_sem_init() {// init semint res = 0;if (sem_init(&read_sem, 0, 1) < 0) {printf("failed to init read_sem");res = -1;}if (sem_init(&write_sem, 0, 1) < 0) {printf("failed to init write_sem");res = -1;}if (sem_init(&mutex, 0, 1) < 0) {printf("failed to init mutex");res = -1;}if (res)exit(0);
}void all_sem_des() {// destroy semsem_destroy(&read_sem);sem_destroy(&write_sem);sem_destroy(&mutex);
}int main(int argc, char *argv[]) {srand(888);int res = 0;int randm = 0;void *(*target)(void *) =NULL;pthread_t myThreads[MAX_PROCESS_NUM];all_sem_init();for (int i = 0; i < MAX_PROCESS_NUM; ++i) {randm = rand() % 4;if (i % 2)
// target = Read;target = Read1;else
// target = Write;target = Write1;res = pthread_create(&myThreads[i], NULL, target, NULL);if (res < 0) {printf("failed to create the thread!");exit(0);}sleep(randm);}void *thread_result = NULL;for (int i = 0; i < MAX_PROCESS_NUM; ++i)res = pthread_join(myThreads[i], &thread_result);all_sem_des();return 0;
}
三、读写者同步
为了解决在缓冲区未满时读进程就读出缓冲区的问题,设置full
信号量。同时,在缓冲区写满时,阻塞写进程,设置empty
信号量。二者均初始化为0.
sem_t full;
sem_t empty;void all_sem_init() {// init semint res = 0;// ...if (sem_init(&full, 0, 0) < 0) {printf("failed to init full");res = -1;}if (sem_init(&empty, 0, 0) < 0) {printf("failed to init empty");res = -1;}if (res)exit(0);
}void all_sem_des() {// destroy sem// ...sem_destroy(&full);sem_destroy(&empty);
}void *Read2(void *arg) {sem_wait(&full);Read(arg);sem_post(&empty);return NULL;
}void *Write2(void *arg) {Write(arg);if (buffer >= N) {sem_post(&full);sem_wait(&empty);}return NULL;
}int main(int argc, char *argv[]) {// ...for (int i = 0; i < MAX_PROCESS_NUM; ++i) {randm = rand() % 4;if (i % 2) {
// target = Read;
// target = Read1;target = Read2;} else {
// target = Write;
// target = Write1;target = Write2;}// ...
}
四、读写者同步且互斥
结合二、三的信号量,实现读写同步且互斥。
void *Read3(void *arg) {sem_wait(&full);sem_wait(&mutex);Read(arg);sem_post(&mutex);sem_post(&empty);return NULL;
}void *Write3(void *arg) {sem_wait(&mutex);Write(arg);sem_post(&mutex);if (buffer >= N) {sem_post(&full);sem_wait(&empty);}return NULL;
}
改进
因为只有一个读者、一个写者进程,并非多读写者,所以可以不用在main()
中开多个线程,可以只创建read()
和write()
两线程,然后在线程内部使用while
循环去访问buffer
. 这个工作就留给读者自己去实现啦~😄
附录(完整实验代码)
//
// Created by Sylvan Ding on 2022/5/10.
//#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>#define N 16
#define MAX_PROCESS_NUM 100int buffer = 0;
int read_write_flag[2] = {1, 1};// sem
sem_t read_sem;
sem_t write_sem;
sem_t mutex;
sem_t full;
sem_t empty;void *Read(void *arg) {sem_wait(&read_sem);read_write_flag[0] = 0;int buf_temp;int duration = rand() % 4;pthread_t tid = pthread_self();printf("\033[32m[R] tid: %lu starts reading...\033[0m\n", tid);if (!read_write_flag[1])printf("\033[31m[R] tid: %lu conflicts with writing process!\033[0m\n", tid);// readbuf_temp = buffer;sleep(duration);buffer = 0;printf("[R] tid: %lu has read all %d buffers!\n", tid, buf_temp);if (buf_temp < N)printf("\033[33m[R] tid: %lu did not read entire 16 buffers!\033[0m\n", tid);read_write_flag[0] = 1;sem_post(&read_sem);return NULL;
}void *Write(void *arg) {sem_wait(&write_sem);read_write_flag[1] = 0;int res;int buf_temp;int size = rand() % 16 + 1;int duration = rand() % 5;pthread_t tid = pthread_self();printf("\033[32m[W] tid: %lu starts writing...\033[0m\n", tid);// writebuf_temp = buffer;res = size + buf_temp;sleep(duration);if (res > N)buffer = 16;elsebuffer = res;printf("[W] tid: %lu has written %d buffers, with %d buffers occupied!\n", tid, res > N ? N - buf_temp : size,buffer);read_write_flag[1] = 1;sem_post(&write_sem);return NULL;
}void *Read1(void *arg) {sem_wait(&mutex);Read(arg);sem_post(&mutex);return NULL;
}void *Write1(void *arg) {sem_wait(&mutex);Write(arg);sem_post(&mutex);return NULL;
}void *Read2(void *arg) {sem_wait(&full);Read(arg);sem_post(&empty);return NULL;
}void *Write2(void *arg) {Write(arg);if (buffer >= N) {sem_post(&full);sem_wait(&empty);}return NULL;
}void *Read3(void *arg) {sem_wait(&full);sem_wait(&mutex);Read(arg);sem_post(&mutex);sem_post(&empty);return NULL;
}void *Write3(void *arg) {sem_wait(&mutex);Write(arg);sem_post(&mutex);if (buffer >= N) {sem_post(&full);sem_wait(&empty);}return NULL;
}void all_sem_init() {// init semint res = 0;if (sem_init(&read_sem, 0, 1) < 0) {printf("failed to init read_sem");res = -1;}if (sem_init(&write_sem, 0, 1) < 0) {printf("failed to init write_sem");res = -1;}if (sem_init(&mutex, 0, 1) < 0) {printf("failed to init mutex");res = -1;}if (sem_init(&full, 0, 0) < 0) {printf("failed to init full");res = -1;}if (sem_init(&empty, 0, 0) < 0) {printf("failed to init empty");res = -1;}if (res)exit(0);
}void all_sem_des() {// destroy semsem_destroy(&read_sem);sem_destroy(&write_sem);sem_destroy(&mutex);sem_destroy(&full);sem_destroy(&empty);
}int main(int argc, char *argv[]) {srand(888);int res = 0;int randm = 0;void *(*target)(void *) =NULL;pthread_t myThreads[MAX_PROCESS_NUM];all_sem_init();for (int i = 0; i < MAX_PROCESS_NUM; ++i) {randm = rand() % 4;if (i % 2) {
// target = Read;
// target = Read1;
// target = Read2;target = Read3;} else {
// target = Write;
// target = Write1;
// target = Write2;target = Write3;}res = pthread_create(&myThreads[i], NULL, target, NULL);if (res < 0) {printf("failed to create the thread!");exit(0);}sleep(randm);}void *thread_result = NULL;for (int i = 0; i < MAX_PROCESS_NUM; ++i)res = pthread_join(myThreads[i], &thread_result);all_sem_des();return 0;
}
版权声明:原创文章,转载请注明出处 ©️ Sylvan Ding
参考文献
- Linux系统编程——线程同步与互斥:POSIX无名信号量