创作灵感:
格局要打开,进程也要打开,看看进程外的世界
代码描述:
1 此代码基于一个:
使用互斥锁及条件变量的线程间生产消费模型
该模型使用粗粒度锁 将整个生产或消费过程锁住
直到生产满 或 消费空 才使用条件变量通知对方
在唤醒后执行检查 只有仓库全满 或 仓库全空 才就继续执行生产消费逻辑
否则依然是通知对方并等待
该模型同时还提供了一个监控线程 用于观察仓库情况
2 变形记
将此线程间模型 分为两个进程(两个main函数)
使用posix风格的共享内存存放共享资源
使用pthread的互斥锁和条件变量对共享资源进行保护
为每个进程配置一个监视线程
并未实现2号生产者和消费者的逻辑
使用注意事项:
1 编译后在两个独立终端运行
2 不能使用大型IDE集成运行
3 unix-like系统 或 支持gnu_c的系统
4 对生产者按ctrl+c,生产消费都会清理资源并结束,输出all done
#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <signal.h>
#include <sys/mman.h>// 速度基数,0.1秒
int tenth_s = 100000;
// 生产者1的速度
int producer2_speed = 5;
// 生产者2的速度
int producer1_speed = 5;
// 监视刷新速度
int monitor_speed = 5;
// 定义线程执行函数指针类型
typedef void *(*func)(void *);
// 互斥锁属性类型 只被初始化1次
pthread_mutexattr_t mutexattr;
// 条件变量属性类型 只被初始化1次
pthread_condattr_t condattr;
// 能装3个线程的数组
pthread_t tids[3] = {0};
// 用于共享内存中的数据,放到一个结构体里
struct shm_data
{pthread_cond_t cv_prod;pthread_cond_t cv_cons;pthread_mutex_t mutexlock;int warehouse[10];pid_t pid;
};
// 初始化共享内存后,返回的指针,转换成struct shm_data *类型
// 复制一份到全局变量,供整个进程使用
struct shm_data *global_addr;
// 这是生产者2的执行线程
void *producer2(void *p)
{// 现在不需要他,让他睡觉printf("producer2\n");sleep(999);pthread_exit(NULL);
}
// 这是收到信号后生产者1的清理函数
void clean_up(void *p)
{// 释放锁pthread_mutex_unlock(&global_addr->mutexlock);
}
// 这是生产者1的执行线程
void *producer1(void *p)
{printf("producer1\n");// 无限循环 套在最外面while (1){// 压入清理函数 防止意外终止pthread_cleanup_push(clean_up, NULL);// 上锁,注意是共享内存中的锁pthread_mutex_lock(&global_addr->mutexlock);// 用于记录仓库中产品的数量int count_p = 0;for (size_t i = 0; i < 10; i++){// 如果在某个位置上没货if (global_addr->warehouse[i] == 0){// 生产消耗时间usleep(producer1_speed * tenth_s);// 生产完毕global_addr->warehouse[i] += 1;}// 在某个位置有货记录+1if (global_addr->warehouse[i] == 1){count_p += 1;}}// 如果10次循环 每个位置都有货,表示仓库已满if (count_p == 10){// do...while用于检查唤醒后仓库的状态// 如果没有任何产品则继续生产,如果存在产品未消耗则通知消费者并等待do{// 告诉消费者可以消费了pthread_cond_signal(&global_addr->cv_cons);printf("producer %d wait\n", gettid());// 放锁,睡眠pthread_cond_wait(&global_addr->cv_prod, &global_addr->mutexlock);// 被唤醒后检查仓库状态count_p = 0;for (size_t i = 0; i < 10; i++){if (global_addr->warehouse[i] == 0){count_p += 1;}}} while (count_p != 10);// 真正醒来printf("producer %d wake up\n", gettid());}// 放锁,新一轮生产pthread_mutex_unlock(&global_addr->mutexlock);pthread_cleanup_pop(1);}pthread_exit(NULL);
}
// 这是一个人类可读的监视进程
void *show_warehouse(void *p)
{while (1){for (size_t i = 0; i < 10; i++){printf("[%d] ", global_addr->warehouse[i]);}printf("\n");// 刷新速度usleep(monitor_speed * tenth_s);}
}
// ctrl+c 信号处理函数
void sig_handler(int signum)
{// 打印信号描述printf("%s\n", strsignal(signum));
}
// 此函数用于初始化共享内存
void shm_init()
{// 打开共享内存文件描述符int fd = shm_open("/shm1", O_CREAT | O_RDWR, 0700);if (fd == -1){perror("shm_open");}// 扩容if (ftruncate(fd, 1024) == -1){perror("ftruncate");}// 附加到进程,并转换成struct shm_data *类型struct shm_data *addr = (struct shm_data *)mmap(NULL, 1024, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);if (addr == MAP_FAILED){perror("mmap");}else{// 赋值全局变量,全局可用global_addr = addr;}
}
void ips_init()
{// 初始化mutexattrpthread_mutexattr_init(&mutexattr);// 设共享pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);// 初始化mutexpthread_mutex_init(&global_addr->mutexlock, &mutexattr);// 初始化condattrpthread_condattr_init(&condattr);// 设共享pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);// 初始化condpthread_cond_init(&global_addr->cv_cons, &condattr);pthread_cond_init(&global_addr->cv_prod, &condattr);// 初始化仓库memset(global_addr->warehouse, 0, sizeof(global_addr->warehouse));// 创建线程两个生产者,一个监视func fun_c[3] = {producer1, producer2, show_warehouse};int i = 0;for (i = 0; i < 3; i++){pthread_create(&tids[i], NULL, fun_c[i], NULL);}
}
int main()
{// 注册信号处理函数 sig_handlersignal(SIGINT, sig_handler);// 初始化共享内存,赋值给一个struct shm_data 类型的全局指针变量shm_init();// 一些把线程共享改成进程共享的准备工作ips_init();// 暂停主线程pause();// 关闭消费者进程kill(global_addr->pid, SIGINT);// 关闭生产线程int i;for (i = 0; i < 3; i++){pthread_cancel(tids[i]);pthread_join(tids[i], NULL);}// 销毁条件变量属性pthread_condattr_destroy(&condattr);// 销毁互斥锁属性pthread_mutexattr_destroy(&mutexattr);// 销毁互斥锁pthread_mutex_destroy(&global_addr->mutexlock);// 销毁条件变量pthread_cond_destroy(&global_addr->cv_prod);pthread_cond_destroy(&global_addr->cv_cons);// 与共享内存分离munmap(global_addr, 1024);// 删除共享内存shm_unlink("/shm1");// 全剧终printf("all done\n");return 0;
}
#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <signal.h>
#include <sys/mman.h>
// 调速
int tenth_s = 100000;
int consumer_speed = 5;
int producer_speed = 5;
int monitor_speed = 5;pthread_t tids[3] = {0};
typedef void *(*func)(void *);
struct shm_data
{pthread_cond_t cv_prod;pthread_cond_t cv_cons;pthread_mutex_t mutexlock;int warehouse[10];pid_t pid;
};
struct shm_data *global_addr;
// 清理函数
void clean_up(void *p)
{pthread_mutex_unlock(&global_addr->mutexlock);
}
// consumer1线程执行函数
void *consumer1(void *p)
{printf("consumer1\n");while (1){pthread_cleanup_push(clean_up, NULL);pthread_mutex_lock(&global_addr->mutexlock);int count_c = 0;for (size_t i = 0; i < 10; i++){if (global_addr->warehouse[i] == 1){usleep(consumer_speed * tenth_s);global_addr->warehouse[i] -= 1;}if (global_addr->warehouse[i] == 0){count_c += 1;}}if (count_c == 10){do{pthread_cond_signal(&global_addr->cv_prod);printf("consumer %d wait\n", gettid());pthread_cond_wait(&global_addr->cv_cons, &global_addr->mutexlock);// 被唤醒后检查仓库状态count_c = 0;for (size_t i = 0; i < 10; i++){if (global_addr->warehouse[i] == 1){count_c += 1;}}// 如果仓库中所有位置都有商品,则通过检查,继续消费} while (count_c != 10);printf("consumer %d wake up\n", gettid());}pthread_mutex_unlock(&global_addr->mutexlock);pthread_cleanup_pop(1);}pthread_exit(NULL);
}
// consumer1线程执行函数
void *consumer2(void *p)
{printf("consumer2\n");sleep(999);pthread_exit(NULL);
}
// 这是一个人类可读的监视进程
void *show_warehouse(void *p)
{while (1){for (size_t i = 0; i < 10; i++){printf("[%d] ", global_addr->warehouse[i]);}printf("\n");usleep(monitor_speed * tenth_s);}pthread_exit(NULL);
}
// 共享内存初始化函数
void shm_init()
{int fd = shm_open("/shm1", O_CREAT | O_RDWR, 0700);if (fd == -1){perror("shm_open");}struct shm_data *addr = (struct shm_data *)mmap(NULL, 1024, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);if (addr == MAP_FAILED){perror("mmap");}else{// 注意这里赋值了共享资源pid,用于在生产进程按ctrl+c时 同时关闭清理消费进程addr->pid = getpid();global_addr = addr;}
}
void pthread_create_init()
{func fun_c[3] = {consumer1, consumer2, show_warehouse};int i = 0;for (i = 0; i < 3; i++){pthread_create(&tids[i], NULL, fun_c[i], NULL);}
}
// 信号处理函数
void sig_handler(int signum)
{printf("%s\n", strsignal(signum));
}
int main()
{// 注册信号处理函数signal(SIGINT, sig_handler);// 初始化共享内存shm_init();// 创建进程pthread_create_init();// 主线程暂停pause();// 收到信号后运行下面代码int i;for (i = 0; i < 3; i++){pthread_cancel(tids[i]);pthread_join(tids[i], NULL);}// 与共享内存分离munmap(global_addr, 1024);printf("all done\n");return 0;
}