📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
- 📢前言
- 🏳️🌈一、线程同步概念
- 🏳️🌈二、为什么需要线程同步?
- 2.1 防止数据竞争(Data Race)
- 2.2 保证操作的原子性
- 2.3 协调线程间的执行顺序
- 2.4 避免资源争用(如文件、网络连接)
- 🏳️🌈线程同步的常见手段
- 🏳️🌈三、什么是生产消费者模型
- 3.1 三种关系
- 3.2 两个角色
- 3.3 一个场景
- 🏳️🌈四、以阻塞队列模拟多生产消费者模型
- 4.1 成员名
- 4.2 构造函数和析构函数
- 4.3 模拟的生产者和消费者
- 4.4 模拟生产、消费者过程
- 🏳️🌈五、整体代码
- 5.1 BlockQueue.hpp
- 5.2 Main.cc
- 5.3 Makefile
- 👥总结
📢前言
紧接上一回的 从互斥原理到C++ RAII封装实践
笔者这回介绍一下线程中几乎与互斥一样重要的同步原理,
还有一点,笔者之后的封装都会使用之前博客中封装好的容器,需要的可以去仓库或者前面的博客中自取。
所需所用的都放在了这个仓库中
🏳️🌈一、线程同步概念
条件变量
:当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
线程同步
指在多线程编程中,通过特定机制协调多个线程的执行顺序,确保它们对共享资源(如内存、文件、硬件等)的访问安全有序。核心目标是防止并发访问导致的数据混乱、逻辑错误或资源冲突。
🏳️🌈二、为什么需要线程同步?
2.1 防止数据竞争(Data Race)
问题:多个线程同时读写共享数据时,执行顺序不确定,可能导致数据不一致。
int balance = 100; // 共享变量// 线程A执行:存入200
balance += 200; // 线程B执行:取出150
balance -= 150;
- 未同步时:若线程A和B同时读取balance=100,最终结果可能是100+200-150=150(正确应为150)或100-150+200=150,但若操作交叉执行(如A读后B写),可能得到错误值(如-50)。
2.2 保证操作的原子性
问题:单个操作(如i++)在底层可能对应多条机器指令,线程切换会导致操作未完成就被中断
; x86的i++实际步骤:
mov eax, [i] ; 读取i到寄存器
inc eax ; 寄存器加1
mov [i], eax ; 写回内存
- 若线程A执行到inc eax后被切换,线程B修改了i,线程A恢复后会将旧值写回,导致结果错误。
2.3 协调线程间的执行顺序
场景:某些任务需要线程按特定顺序执行。
生产者-消费者模型:消费者线程需等待生产者生成数据后再读取。
任务依赖:线程B必须在线程A完成初始化后才能执行。
2.4 避免资源争用(如文件、网络连接)
问题:多个线程同时写入同一文件或占用同一网络端口,会导致数据错乱或程序崩溃。
🏳️🌈线程同步的常见手段
同步问题的严重后果
数据不一致
:程序输出错误,如银行账户余额异常。
程序崩溃
:多线程同时释放内存导致双重释放(Double Free)。
死锁(Deadlock)
:线程互相等待对方释放锁,导致永久阻塞。
说白了,线程同步就是一种为了统一管理生产消费者模型的一种机制
🏳️🌈三、什么是生产消费者模型
3.1 三种关系
3.2 两个角色
生产者
,模拟是同数据的那方
消费者
,取走数据的那方
3.3 一个场景
所有生产消费所用的数据都是在中间的 “超市”
中进行
🏳️🌈四、以阻塞队列模拟多生产消费者模型
下图是以阻塞队列模拟多生产消费者模型的基本过程
也就是有两类线程(生产者和消费者),从同一个场景(blockqueue)中放入和拿出数据的过程
4.1 成员名
我们利用现有的库函数对环境进行一下封装,再利用一个队列模拟临界资源
private:std::queue<T> _q; // 保存数据的容器,临界资源int _cap; // bq最大容量pthread_mutex_t _mutex; // 互斥pthread_cond_t _productor_cond; // 生产者条件变量pthread_cond_t _consumer_cond; // 消费者条件变量int _cwait_num; // 当前等待的消费者数量int _pwait_num; // 当前等待的生产者数量};
4.2 构造函数和析构函数
BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0){pthread_mutex_init(&_mutex, nullptr); // 创建互斥锁pthread_cond_init(&_productor_cond, nullptr); // 生产者条件变量pthread_cond_init(&_consumer_cond, nullptr); // 消费者条件变量}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_productor_cond);pthread_cond_destroy(&_consumer_cond);}
4.3 模拟的生产者和消费者
void Equeue(const T &in) // 生产者{pthread_mutex_lock(&_mutex);// 你想放数据,就能放数据吗??生产数据是有条件的!// 结论1: 在临界区中等待是必然的(目前)while (IsFull()) // 5. 对条件进行判断,为了防止伪唤醒,我们通常使用while进行判断!{std::cout << "生产者进入等待..." << std::endl;// 2. 等是,释放_mutex_pwait_num++;pthread_cond_wait(&_productor_cond, &_mutex); // wait的时候,必定是持有锁的!!是有问题的!_pwait_num--;// 3. 返回,线程被唤醒&&重新申请并持有锁(它会在临界区内醒来!)std::cout << "生产者被唤醒..." << std::endl;}// 4. if(IsFull())不满足 || 线程被唤醒_q.push(in); // 生产// 肯定有数据!if(_cwait_num){std::cout << "叫醒消费者" << std::endl;pthread_cond_signal(&_consumer_cond);}pthread_mutex_unlock(&_mutex);}void Pop(T *out) // 消费者{pthread_mutex_lock(&_mutex);while(IsEmpty()){std::cout << "消费者进入等待..." << std::endl;_cwait_num++;pthread_cond_wait(&_consumer_cond, &_mutex); // 伪唤醒_cwait_num--;std::cout << "消费者被唤醒..." << std::endl;}// 4. if(IsEmpty())不满足 || 线程被唤醒*out = _q.front();_q.pop();// 肯定有空间if(_pwait_num){std::cout << "叫醒生产者" << std::endl;pthread_cond_signal(&_productor_cond);}pthread_mutex_unlock(&_mutex);}
4.4 模拟生产、消费者过程
我们假设生产速度小于消费速度,相当于我们没生产一个对象后需要花费一定的时间,但是消费者一直就绪,就要等生产者生产出来
void *Consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data;// 1. 从bq拿到数据bq->Pop(&data);// 2.做处理printf("Consumer, 消费了一个数据: %d\n", data);}
}void *Productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 10;while (true){sleep(2);// 1. 从外部获取数据// data = 10; // 有数据???// 2. 生产到bq中bq->Equeue(data);printf("producter 生产了一个数据: %d\n", data);data++;}
}
🏳️🌈五、整体代码
5.1 BlockQueue.hpp
#pragma once#include <iostream>
#include <queue>
#include <pthread.h>namespace BlockQueueModule
{static const int gcap = 10;template <typename T>class BlockQueue{private:bool IsFull() { return _q.size() == _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0){pthread_mutex_init(&_mutex, nullptr); // 创建互斥锁pthread_cond_init(&_productor_cond, nullptr); // 生产者条件变量pthread_cond_init(&_consumer_cond, nullptr); // 消费者条件变量}void Equeue(const T &in) // 生产者{pthread_mutex_lock(&_mutex);// 你想放数据,就能放数据吗??生产数据是有条件的!// 结论1: 在临界区中等待是必然的(目前)while (IsFull()) // 5. 对条件进行判断,为了防止伪唤醒,我们通常使用while进行判断!{std::cout << "生产者进入等待..." << std::endl;// 2. 等是,释放_mutex_pwait_num++;pthread_cond_wait(&_productor_cond, &_mutex); // wait的时候,必定是持有锁的!!是有问题的!_pwait_num--;// 3. 返回,线程被唤醒&&重新申请并持有锁(它会在临界区内醒来!)std::cout << "生产者被唤醒..." << std::endl;}// 4. if(IsFull())不满足 || 线程被唤醒_q.push(in); // 生产// 肯定有数据!if(_cwait_num){std::cout << "叫醒消费者" << std::endl;pthread_cond_signal(&_consumer_cond);}pthread_mutex_unlock(&_mutex);}void Pop(T *out) // 消费者{pthread_mutex_lock(&_mutex);while(IsEmpty()){std::cout << "消费者进入等待..." << std::endl;_cwait_num++;pthread_cond_wait(&_consumer_cond, &_mutex); // 伪唤醒_cwait_num--;std::cout << "消费者被唤醒..." << std::endl;}// 4. if(IsEmpty())不满足 || 线程被唤醒*out = _q.front();_q.pop();// 肯定有空间if(_pwait_num){std::cout << "叫醒生产者" << std::endl;pthread_cond_signal(&_productor_cond);}pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_productor_cond);pthread_cond_destroy(&_consumer_cond);}private:std::queue<T> _q; // 保存数据的容器,临界资源int _cap; // bq最大容量pthread_mutex_t _mutex; // 互斥pthread_cond_t _productor_cond; // 生产者条件变量pthread_cond_t _consumer_cond; // 消费者条件变量int _cwait_num; // 当前等待的消费者数量int _pwait_num; // 当前等待的生产者数量};
}
5.2 Main.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>using namespace BlockQueueModule;void *Consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data;// 1. 从bq拿到数据bq->Pop(&data);// 2.做处理printf("Consumer, 消费了一个数据: %d\n", data);}
}void *Productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 10;while (true){sleep(2);// 1. 从外部获取数据// data = 10; // 有数据???// 2. 生产到bq中bq->Equeue(data);printf("producter 生产了一个数据: %d\n", data);data++;}
}int main()
{// 交易场所,不仅仅可以用来进行传递数据// 传递任务!!!v1: 对象 v2BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源// 单生产,单消费pthread_t c1, p1, c2, p2, p3;pthread_create(&c1, nullptr, Consumer, bq);pthread_create(&c2, nullptr, Consumer, bq);pthread_create(&p1, nullptr, Productor, bq);pthread_create(&p2, nullptr, Productor, bq);pthread_create(&p3, nullptr, Productor, bq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);delete bq;return 0;
}
5.3 Makefile
bin=bq
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)$(bin):$(obj)$(cc) -o $@ $^ -lpthread
%.o:%.cc$(cc) -c $< -std=c++17.PHONY:clean
clean:rm -f $(bin) $(obj).PHONY:test
test:echo $(src)echo $(obj)
👥总结
本篇博文对 同步原理剖析及模拟多消费者模型 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~