【Linux】线程的互斥和同步

【Linux】线程的互斥和同步

线程间的互斥

  • 临界资源:多线程执行共享的资源就叫做临界资源
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源进行保护
  • 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。

1. 互斥量mutex

  • 大部分情况下,线程使用的数据都是局部变量,变量的地址空间在线程的栈空间内,这种情况下,变量属于单个线程,且其他线程无法访问该变量
  • 但有时候,很多变量需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程间的交互
  • 多个线程并发操作共享变量,会带来一些问题

实验:操作共享变量会有问题的售票系统代码

#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include<iostream>
#include<pthread.h>
#include <string>
#include <unistd.h>
#include<functional>
namespace ThreadModule
{template<typename T>using func_t = std::function<void(T)>;template<typename T>class Thread{public:Thread(func_t<T> func,T const data,const std::string& threadname = "none_name")//--常量引用和非常量引用的概念:_func(func),_data(data),_threadname(threadname),_stop(true){}void Excute(){_func(_data);}static void* handler(void* args){Thread<T>* self = static_cast<Thread<T> *>(args);// self->Excute();self->_func(self->_data);return nullptr;}bool Start(){int ret = pthread_create(&_tid,nullptr,handler,this);if(ret == 0){_stop = false;return true;}else {return false;}}void Join(){if(!_stop){pthread_join(_tid,nullptr);}}std::string name(){return _threadname;}~Thread(){}private:std::string _threadname;pthread_t _tid;T _data;func_t<T> _func;bool _stop;};
}
#endif
using namespace ThreadModule;
int g_tickets = 10000; // 共享资源,没有保护的, 临界资源
class ThreadData
{public:ThreadData(const std::string& threadname,int& ticked):_threadname(threadname),_tickeds(ticked),_total(0){}~ThreadData(){}public:std::string _threadname;int& _tickeds;int _total;
};
void route(ThreadData* ptr)
{while(true){if(ptr->_tickeds > 0){//模拟一次抢票逻辑usleep(1000);printf("%s is runing ,get ticked: %d\n",ptr->_threadname.c_str(),ptr->_tickeds);ptr->_tickeds--;ptr->_total++;}else{break;}}return ;
}
const int num = 4;
int main()
{//创建一批线程std::vector<Thread<ThreadData*>> threads;std::vector<ThreadData*> datas;for(int i = 0;i < num;i++){std::string name = "thread-" + std::to_string(i+1);ThreadData* _ptr = new ThreadData(name,g_tickets);threads.emplace_back(route,_ptr,name);datas.emplace_back(_ptr);}//启动一批线程for(auto& thread:threads){thread.Start();}//等待一批线程for(auto& thread:threads){thread.Join();std::cout<<"wait thread name: "<<thread.name()<<std::endl;}for(auto data: datas){std::cout<<"name: "<<data->_threadname<<"total is: "<<data->_total<<std::endl; delete data; }return 0;
}

实验现象:

为什么会抢到票为-1,或-2的票呢?

  • if 语句判断条件为真以后,代码可以并发的切换到其他线程
  • usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
  • _tickeds-- 操作本身就不是一个原子操作

在这里插入图片描述

取出ticket--部分的汇编代码
objdump -d a.out > test.objdump
152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax # 600b34 <_tickeds>
153 400651: 83 e8 01 sub $0x1,%eax
154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) # 600b34 <_tickeds>

– 操作并不是原子操作,而是对应三条汇编指令:

  • load :将共享变量ticket从内存加载到寄存器中
  • update : 更新寄存器里面的值,执行-1操作
  • store :将新值,从寄存器写回共享变量ticket的内存地址

要解决以上问题,需要做到三点:

  • 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
  • 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临
    界区。
  • 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。

2. 互斥量的接口

初始化互斥量
初始化互斥量有两种方法:

  • 方法1,静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
  • 方法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);参数:mutex:要初始化的互斥量attr:NULL

销毁互斥量
销毁互斥量需要注意:

  • 使用PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
  • 不要销毁一个已经加锁的互斥量
  • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);

互斥量加锁和解锁

int ptread_mutex_lock(pthread_mutex* mutex);
int ptread_mutex_unlock(pthread_mutex* mutex);
返回值:成功返回0,失败返回错误号

调用pthread_lock可能会出现以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
  • 发起函数时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但有竞争到互斥量,那么pthread_lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁

改进上面的售票系统:

int g_tickets = 10000; // 共享资源,没有保护的, 临界资源
void route(ThreadData* ptr)
{while(true){pthread_mutex_lock(&getmutex);//加锁:本质是将并行执行--》串行执行,加锁的粒度越细越好if(ptr->_tickeds > 0){usleep(1000);printf("%s is runing ,get ticked: %d\n",ptr->_threadname.c_str(),ptr->_tickeds);ptr->_tickeds--;pthread_mutex_unlock(&getmutex);//解锁ptr->_total++;}else{pthread_mutex_unlock(&getmutex);//解锁break;}}return ;
}

image-20241123142442036

更优雅的代码:

#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include<pthread.h>
class LockGuard
{public:LockGuard(pthread_mutex_t* mutex):_mutex(mutex){pthread_mutex_lock(_mutex);}~LockGuard(){pthread_mutex_unlock(_mutex);}private:pthread_mutex_t* _mutex;
};
#endif
#include"Lock_Guard.hpp"
#include"Thread.hpp"
#include<iostream>
#include <iostream>
#include <vector>
#include<mutex>using namespace ThreadModule;
//静态锁
//pthread_mutex_t getmutex = PTHREAD_MUTEX_INITIALIZER;//静态锁
class ThreadData
{public:ThreadData(const std::string& threadname,int& ticked,std::mutex& mutex):_threadname(threadname),_tickeds(ticked),_total(0),_mutex(mutex){}~ThreadData(){}public:std::string _threadname;int& _tickeds;int _total;// pthread_mutex_t& _mutex;std::mutex& _mutex;
};
int g_tickets = 10000; // 共享资源,没有保护的, 临界资源
void route(ThreadData* ptr)
{while(true){//加锁:本质是将并行执行--》串行执行,加锁的粒度越细越好//线程竞争锁是自由竞争的,竞争锁的能力太强,就会导致其他线程抢不到锁,---造成其他线程的饥饿问题!!!//pthread_mutex_lock(&ptr->_mutex);     动态锁//pthread_mutex_lock(&getmutex);        静态锁//LockGuard mutex(&ptr->_mutex);        自己封装的RAII锁// std::lock_guard<std::mutex> lock(ptr->_mutex);  //C++11RAII锁ptr->_mutex.lock();                     //C++11锁//模拟一次抢票逻辑              if(ptr->_tickeds > 0)         {usleep(1000);printf("%s is runing ,get ticked: %d\n",ptr->_threadname.c_str(),ptr->_tickeds);ptr->_tickeds--;//pthread_mutex_unlock(&ptr->_mutex);//解锁//pthread_mutex_unlock(&getmutex);ptr->_mutex.unlock();ptr->_total++;}else{//pthread_mutex_unlock(&ptr->_mutex);//解锁//pthread_mutex_unlock(&getmutex);ptr->_mutex.unlock();break;}}return ;
}
const int num = 4;
int main()
{//动态锁// pthread_mutex_t mutex;// pthread_mutex_init(&mutex,nullptr);//C++11锁std::mutex mutex;//创建一批线程std::vector<Thread<ThreadData*>> threads;std::vector<ThreadData*> datas;for(int i = 0;i < num;i++){std::string name = "thread-" + std::to_string(i+1);ThreadData* _ptr = new ThreadData(name,g_tickets,mutex);threads.emplace_back(route,_ptr,name);datas.emplace_back(_ptr);}//启动一批线程for(auto& thread:threads){thread.Start();}//等待一批线程for(auto& thread:threads){thread.Join();std::cout<<"wait thread name: "<<thread.name()<<std::endl;}for(auto data: datas){std::cout<<"name: "<<data->_threadname<<"total is: "<<data->_total<<std::endl; delete data; //pthread_mutex_destroy(&data->_mutex);data->_mutex.~mutex();}return 0;
}

2. 互斥的底层实现

  • 经过上面的例子,大家已经意识到单纯的i++ 或者++i 都不是原子的,有可能会有数据一致性问题
  • 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器内存单
    数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一
    个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪代码改一下

加锁and解锁:

image-20241123164826838

image-20241123170200857

线程间的同步

条件变量

  • 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
  • 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情
    况就需要用到条件变量。

同步概念与竞态条件

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问
    题,叫做同步
  • 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

1. 条件变量函数接口

初始化

  • 静态初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  • 动态初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
attr);
参数:cond:要初始化的条件变量attr:NULL
  • 销毁
int pthread_con_destroy(pthread_con_t *cond);
  • 等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:cond:要在这个条件变量上等待mutex:互斥量,后面详细解释
  • 唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

实验:线程同步

#include<iostream>
#include<vector>
#include<string>
#include<unistd.h>
#include<pthread.h>
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;//互斥量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//条件变量
void* Mastercore(void* args)
{sleep(3);std::cout<<"mastcore 开始唤醒..."<<std::endl;std::string name = static_cast<char*>(args);//唤醒...while(true){pthread_cond_signal(&cond);//唤醒等待队列中第一个线程sleep(1);}//pthread_cond_broadcast(&cond);//唤醒等待队列中所有线程return nullptr;
}
void* Slavercore(void* args)
{std::string name = static_cast<char*>(args);while(true){pthread_mutex_lock(&gmutex);//加锁pthread_cond_wait(&cond,&gmutex);// 等待条件变量,std::cout<<name<<"被唤醒..."<<std::endl;//TODOpthread_mutex_unlock(&gmutex);//解锁}
}
void StartMaster(std::vector<pthread_t>* tidptr)
{pthread_t tid;int n = pthread_create(&tid,nullptr,Mastercore,(void*)"master thread");if(n == 0){std::cout<<"master thread create success ..."<<std::endl;}tidptr->emplace_back(tid);
}void StartSlaver(std::vector<pthread_t>* tidptr,int threadnum)
{for(int i = 0;i< threadnum;i++){pthread_t tid;char* name = new char[64];snprintf(name,64,"thread-%d",i+1);int n = pthread_create(&tid,nullptr,Slavercore,name);if(n == 0){std::cout<<name<<" create success ..."<<std::endl;}tidptr->emplace_back(tid);}
}
void WaitThread(const std::vector<pthread_t>& tids)
{for(auto tid:tids){pthread_join(tid,nullptr);}
}
int main()
{std::vector<pthread_t> tids;StartMaster(&tids);StartSlaver(&tids,5);WaitThread(tids);return 0;
}

结果:

image-20241123205256945

生产者与消费者模型

  • 解耦
  • 支持并发
  • 支持忙闲不均:是指在一个系统中,不同组件或线程之间工作负载分配不均匀的现象。

image-20241124142954354

实验:生产者与消费者模型基础版

阻塞队列:

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
template <typename T>
class BlockQueue
{
public:bool is_full(){return _BlockQueue.size() == _cap;}bool is_empty(){return _BlockQueue.empty();}public:BlockQueue(int cap): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_productor_cond, nullptr);pthread_cond_init(&_consumer_cond, nullptr);}void enqueue(T &in) // 生产者使用接口{pthread_mutex_lock(&_mutex);if (is_full()){pthread_cond_wait(&_productor_cond, &_mutex); // 如果满了,生产者入等待队列,解锁--唤醒--出等待队列,锁定}_BlockQueue.push(in);// 通知消费者来买// std::cout << "通知消费者来买" << std::endl;pthread_cond_signal(&_consumer_cond);// std::cout << "通知完成" << std::endl;pthread_mutex_unlock(&_mutex);}void pop(T *out){pthread_mutex_lock(&_mutex);if (is_empty()){// std::cout << "消费者入等待队列" << std::endl;pthread_cond_wait(&_consumer_cond, &_mutex); // 如果空了,消费者入等待队列,解锁,---被唤醒--出等待队列,锁定}*out = _BlockQueue.front();_BlockQueue.pop();// 通知生产者来卖pthread_cond_signal(&_productor_cond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_productor_cond);pthread_cond_destroy(&_consumer_cond);}private:std::queue<T> _BlockQueue;int _cap; // 阻塞队列上限pthread_mutex_t _mutex;pthread_cond_t _productor_cond; // 生产者等待队列pthread_cond_t _consumer_cond;  // 消费者等待队列
};
#endif

主函数:

#include <iostream>
#include "BlockQueue.hpp"
#include "Thread.hpp"
using namespace ThreadModule;void producer(BlockQueue<int> &bq)
{int cnt = 3;while (true){bq.enqueue(cnt);std::cout << "producer is sell :" << cnt << std::endl;cnt++;}
}
void consumer(BlockQueue<int> &bq)
{while (true){sleep(3);int data;bq.pop(&data); // 为什么传地址,通过地址修改cntstd::cout << "consumer is buy :" << data << std::endl;}
}
void Start_Com(std::vector<Thread<BlockQueue<int>>> *threads_ptr, BlockQueue<int> &bq, int num, func_t<BlockQueue<int>> func)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1);threads_ptr->emplace_back(func, bq, name);threads_ptr->back().Start();}
}
void StartProducer(std::vector<Thread<BlockQueue<int>>> *threads_ptr, BlockQueue<int> &bq, int num)
{Start_Com(threads_ptr, bq, num, producer);
}
void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads_ptr, BlockQueue<int> &bq, int num)
{Start_Com(threads_ptr, bq, num, consumer);
}
void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{for (auto thread : threads){thread.Join();}
}
int main()
{BlockQueue<int> *ptr = new BlockQueue<int>(5);std::vector<Thread<BlockQueue<int>>> threads;StartProducer(&threads, *ptr, 1);StartConsumer(&threads, *ptr, 1);WaitAllThread(threads);return 0;
}

image-20241124143415817

升级版:传递任务1. 0

#pragma once
#include <iostream>
class Task
{
public:Task(){}Task(int a, int b): _a(a), _b(b){}void Excute(){_result = _a+_b;}std::string ResultToString(){return std::to_string(_a)+" + "+std::to_string(_b)+" = "+std::to_string(_result);}std::string DebugToString(){return std::to_string(_a)+" + "+std::to_string(_b)+" = ?";}
private: int _a;int _b;int _result;
};
#include <iostream>
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
using namespace ThreadModule;void producer(BlockQueue<Task> &bq)
{srand((unsigned)time(NULL));int cnt = 3;while (true){sleep(2);int first = rand() % 100;usleep(1234);int second = rand() % 100;Task tk(first, second);bq.enqueue(tk);std::cout << tk.DebugToString() << std::endl;}
}
void consumer(BlockQueue<Task> &bq)
{while (true){Task td;bq.pop(&td); // 为什么传地址,通过地址修改cnttd.Excute();std::cout << td.ResultToString() << std::endl;}
}
void Start_Com(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, BlockQueue<Task> &bq, int num, func_t<BlockQueue<Task>> func)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1);threads_ptr->emplace_back(func, bq, name);threads_ptr->back().Start();}
}
void StartProducer(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, BlockQueue<Task> &bq, int num)
{Start_Com(threads_ptr, bq, num, producer);
}
void StartConsumer(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, BlockQueue<Task> &bq, int num)
{Start_Com(threads_ptr, bq, num, consumer);
}
void WaitAllThread(std::vector<Thread<BlockQueue<Task>>> &threads)
{for (auto thread : threads){thread.Join();}
}
int main()
{BlockQueue<Task> *ptr = new BlockQueue<Task>(5);std::vector<Thread<BlockQueue<Task>>> threads;StartProducer(&threads, *ptr, 1);StartConsumer(&threads, *ptr, 3);WaitAllThread(threads);return 0;
}

image-20241124154042539

传递任务2.0

#include <iostream>
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
using namespace ThreadModule;
using Task = std::function<void()>;
using bock_queue_t = BlockQueue<Task>;
void printdata()
{std::cout << "hell word" << std::endl;
}
void producer(bock_queue_t &bq)
{while (true){sleep(1);Task t = printdata;bq.enqueue(t);}
}
void consumer(bock_queue_t &bq)
{while (true){Task tk;bq.pop(&tk); // 为什么传地址,通过地址修改cnttk();}
}
void Start_Com(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, bock_queue_t &bq, int num, func_t<bock_queue_t> func)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1);threads_ptr->emplace_back(func, bq, name);threads_ptr->back().Start();//创建线程}
}
void StartProducer(std::vector<Thread<bock_queue_t>> *threads_ptr, bock_queue_t &bq, int num)
{Start_Com(threads_ptr, bq, num, producer);
}
void StartConsumer(std::vector<Thread<bock_queue_t>> *threads_ptr, bock_queue_t &bq, int num)
{Start_Com(threads_ptr, bq, num, consumer);
}
void WaitAllThread(std::vector<Thread<bock_queue_t>> &threads)
{for (auto thread : threads){thread.Join();}
}
int main()
{bock_queue_t *ptr = new bock_queue_t(5);std::vector<Thread<bock_queue_t>> threads;StartProducer(&threads, *ptr, 1);StartConsumer(&threads, *ptr, 3);WaitAllThread(threads);return 0;
}

image-20241124154529305

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

  • 初始化信号量
#include<semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value)
参数:pshared: 0表示线程间共享,非零表示进程间共享value:表示信号量初始值
  • 销毁信号量
int sem_destroy(sem_t* sem);
  • 等待信号量
功能:等待信号量,会将信号量值减一
int sem_wait(sem_t * sem);//P()
  • 发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了,将信号量的值叫加一
int sem_post(sem_t * sem);//V()

上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序
(POSIX信号量):

image-20241124200631234

基于环形队列的生产消费模型

  • 环形队列采用数组模拟,用模运算来模拟环状特性

image-20241124192711664

  • 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来
    判断满或者空。另外也可以预留一个空的位置,作为满的状态

image-20241124192731836

  • 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
#ifndef __RING_QUEUE_HPP__
#define __RING_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
#include <semaphore.h>
#include <vector>
template <typename T>
class RingQueue
{
public:void P(sem_t &sem){sem_wait(&sem);}void V(sem_t &sem){sem_post(&sem);}void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public:RingQueue(int cap): _ring_queue(cap), _cap(cap), _productor_index(0), _consumer_index(0){sem_init(&_room_sem, 0, _cap);//信号量初始化,空间资源初始为_ring_queue的容量sem_init(&_data_sem, 0, 0);//数据资源初始为 0pthread_mutex_init(&_productor_mutex, nullptr);//互斥量初始化pthread_mutex_init(&_consumer_mutex, nullptr);}void emquue(T &in){// 生产行为//P操作用来减少信号量的值(通常是减1)。//如果_room_sem信号量的值大于0,执行P操作后,信号量的值减1,进程继续执行。//如果信号量的值为0,执行P操作后,进程会被阻塞,直到信号量的值变为大于0,这时进程才会被唤醒并继续执行。P(_room_sem);Lock(_productor_mutex);//加锁,维护生产者与生产者的竞争_ring_queue[_productor_index++] = in;//生产数据_productor_index %= _ring_queue.size();Unlock(_productor_mutex);V(_data_sem);//当_data_sem信号量的值增加后,如果有进程因为执行P(_data_sem)操作而被阻塞在该信号量上,//那么系统会选择一个或多个进程解除其阻塞状态,允许它们继续执行}void pop(T *out){// 消费行为P(_data_sem);//_dataLock(_consumer_mutex);*out = _ring_queue[_consumer_index++];_consumer_index %= _ring_queue.size();Unlock(_consumer_mutex);V(_room_sem);}~RingQueue(){sem_destroy(&_room_sem);//销毁信号量sem_destroy(&_data_sem);pthread_mutex_destroy(&_productor_mutex);//销毁互斥量pthread_mutex_destroy(&_consumer_mutex);}private:// 环形队列std::vector<T> _ring_queue;int _cap;// 生产者与消费者下标int _productor_index;int _consumer_index;// 信号量sem_t _room_sem;//空间信号量sem_t _data_sem;//数据信号量// 互斥量pthread_mutex_t _productor_mutex;//生产者互斥量pthread_mutex_t _consumer_mutex;//消费者互斥量
};
#endif
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
#include <functional>
namespace ThreadModule
{template <typename T>using func_t = std::function<void(T &,std::string)>;template <typename T>class Thread{public:Thread(func_t<T> func, T& data, const std::string threadname = "none_name") // 为什么--常量引用和非常量引用的概念: _func(func), _data(data), _threadname(threadname), _stop(true){}void Excute(){_func(_data,_threadname);}static void *handler(void *args){Thread<T> *self = static_cast<Thread<T> *>(args);self->Excute();return nullptr;}bool Start(){int ret = pthread_create(&_tid, nullptr, handler, this);if (ret == 0){_stop = false;return true;}else{return false;}}void Join(){if (!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}~Thread() {}private:std::string _threadname;pthread_t _tid;T& _data;func_t<T> _func;bool _stop;};
}
#endif
#include <iostream>
#include "RingQueue.hpp"
#include "Thread.hpp"
using namespace ThreadModule;
using Task = std::function<void()>;
using ring_queue_t = RingQueue<Task>;
void printdata()
{std::cout << "hell word" << std::endl;
}
void producer(ring_queue_t &bq, std::string name)
{int cnt = 10;while (true){sleep(2);Task t = printdata;bq.emquue(t); // 传递任务// std::cout<< name << " in: " << cnt << std::endl;// cnt++;}
}
void consumer(ring_queue_t &bq, std::string name)
{while (true){int cnt;Task tk;bq.pop(&tk);tk(); // 执行执行任务std::cout << name << " is run : task " << std::endl;}
}
void Init_Com(std::vector<Thread<ring_queue_t>> *threads_ptr, ring_queue_t &rq, int num, func_t<ring_queue_t> func, const std::string &who)
{for (int i = 0; i < num; i++){std::string _name = "thread- " + std::to_string(i + 1) + "  " + who;threads_ptr->emplace_back(func, rq, _name);// threads_ptr->back().Start();}
}
void InitProducer(std::vector<Thread<ring_queue_t>> *threads_ptr, ring_queue_t &rq, int num)
{Init_Com(threads_ptr, rq, num, producer, "producer");
}
void InitConsumer(std::vector<Thread<ring_queue_t>> *threads_ptr, ring_queue_t &rq, int num)
{Init_Com(threads_ptr, rq, num, consumer, "consumer");
}
void StartAllThread(std::vector<Thread<ring_queue_t>> &threads)
{for (auto &thread : threads){thread.Start();}
}
void WaitAllThread(std::vector<Thread<ring_queue_t>> &threads)
{for (auto &thread : threads){thread.Join();}
}
int main()
{ring_queue_t *ptr = new ring_queue_t(10);std::vector<Thread<ring_queue_t>> threads; // 所有副线程共享ring_queueInitProducer(&threads, *ptr, 1);           // 生产者初始化InitConsumer(&threads, *ptr, 3);           // 消费者初始化StartAllThread(threads);                   // 启动所有副线程WaitAllThread(threads);                    // 等待所有副线程return 0;
}

结果

[!NOTE]

代码一定要多敲,才能明白里面的细节,加油👍👍👍

【Linux】互斥和同步—完结。下一章【Linux】线程池

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/62308.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

抓包之查看http basic auth认证方式

写在前面 在这篇文章中我们看了http basic auth的认证方式&#xff0c;本文通过wireshark抓包的方式来验证http协议的交互过程。 1&#xff1a;正文 首先wireshark抓取本机回环地址&#xff08;具体看你服务情况&#xff0c;决定哪个网卡&#xff0c;我本地是运行在127的&am…

网络安全(1)_对称加密和非对称加密

1 网络安全概述 1.1 计算机网络面临的安全威协 &#xff08;1&#xff09;截获&#xff1a;攻击者从网络上窃听他人的通信内容&#xff0c;通常把这类攻击称为“截获”。在被动攻击中&#xff0c;攻击者只是观察和分析某一个协议数据单元&#xff08;PDU&#xff09;而不干扰信…

vmware中所有虚拟机都ping不通时解决方案

文章目录 1、报错内容&#xff1a;2、 报错原因&#xff1a;3、解决方案&#xff1a; 1、报错内容&#xff1a; 2、 报错原因&#xff1a; DNS解析错误。 一个虚拟机ping不通可能是网关配置问题&#xff0c;但我的网关配置也都没问题&#xff0c;而且我的所有虚拟机都ping不通…

智慧防汛平台在城市生命线安全建设中的应用

随着城市化进程的加快&#xff0c;城市基础设施的复杂性和互联性不断增强&#xff0c;城市生命线的安全管理面临前所未有的挑战。智慧防汛平台作为城市生命线安全建设的重要组成部分&#xff0c;通过现代信息技术提升城市防汛应急管理的智能化水平&#xff0c;保障城市安全。 …

自己整理的的java面试题

IO 按照流的流向分类&#xff1a;输入流和输出流 按照操作单元分类&#xff1a;可以分为字节流和字符流 按照流的角色划分&#xff1a;节点流和处理流 所有输入流的基类&#xff1a;InputStream/Reader 字节流/字符流 所有输出流的基类&#xff1a;OutputStream/Reader 字…

Android数据存储——文件存储、SharedPreferences、SQLite、Litepal

数据存储全方案——详解持久化技术 Android系统中主要提供了3中方式用于简单地实现数据持久化功能&#xff0c;即文件存储、SharedPreference存储以及数据库存储。除了这三种方式外&#xff0c;还可以将数据保存在手机的SD卡中&#xff0c;不给使用文件、SharedPreference或者…

DroneCAN 最新开发进展,Andrew在Ardupilot开发者大会2024的演讲

本文是Andrew演讲的中文翻译&#xff0c;你可以直接观看视频了解演讲的全部内容&#xff0c;此演讲视频的中文版本已经发布在Ardupilot社区的Blog板块&#xff0c;你可以在 Arudpilot官网&#xff08;https://ardupilot.org) 获取该视频&#xff1a; 你也可以直接通过Bilibili链…

(vue)启动项目报错The project seems to require pnpm but it‘s not installed

(vue)启动项目报错The project seems to require pnpm but it’s not installed 原因 该错误信息表明你的项目需要使用 pnpm 作为包管理工具&#xff0c;但系统中尚未安装 pnpm。 解决方法 【1】删除pnpm.lock 【2】npm install -g pnpm 之后再重新启动 yarn报错&#xff0…

【R安装】VSCODE安装及R语言环境配置

目录 VSCODE下载及安装VSCODE上配置R语言环境参考 Visual Studio Code&#xff08;简称“VSCode” &#xff09;是Microsoft在2015年4月30日Build开发者大会上正式宣布一个运行于 Mac OS X、Windows和 Linux 之上的&#xff0c;针对于编写现代Web和云应用的跨平台源代码编辑器&…

微信小游戏/抖音小游戏SDK接入踩坑记录

文章目录 前言问题记录1、用是否存在 wx 这个 API 来判断是微小平台还是抖小平台不生效2、微小支付的参数如何获取?3、iOS 平台不支持虚拟支付怎么办?微小 iOS 端支付时序图:抖小 iOS 端支付:4、展示广告时多次回调 onClose5、在使用单例时 this 引起的 bug6、使用 fetch 或…

Python 3 教程第24篇(输入和输出)

Python3 输入和输出 在前面几个章节中&#xff0c;我们其实已经接触了 Python 的输入输出的功能。本章节我们将具体介绍 Python 的输入输出。 输出格式美化 Python两种输出值的方式: 表达式语句和 print() 函数。 第三种方式是使用文件对象的 write() 方法&#xff0c;标准输…

ChatGPT的应用场景:开启无限可能的大门

ChatGPT的应用场景:开启无限可能的大门 随着人工智能技术的快速发展,自然语言处理领域迎来了前所未有的突破。其中,ChatGPT作为一款基于Transformer架构的语言模型,凭借其强大的语言理解和生成能力,在多个行业和场景中展现出了广泛的应用潜力。以下是ChatGPT八个最具代表…

尚硅谷学习笔记——Java设计模式(一)设计模式七大原则

一、介绍 在软件工程中&#xff0c;设计模式&#xff08;design pattern&#xff09;是对软件设计中普遍存在&#xff08;反复出现&#xff09;的各种问题&#xff0c;提出的解决方案。我们希望我们的软件能够实现复用性、高稳定性、扩展性、维护性、代码重用性&#xff0c;所以…

【iOS】知乎日报总结

文章目录 前言首页网络请求轮播图上滑加载图片请求 文章详情页WKWebView的使用点赞、收藏持久化——FMDB的使用 其他问题沙盒问题单元格点击其他 总结 前言 在系统学习了OC语言和UI控件后&#xff0c;知乎日报是第一个比较大的项目&#xff0c;耗时一个多月时间&#xff0c;里面…

网安瞭望台第6期 :XMLRPC npm 库被恶意篡改、API与SDK的区别

国内外要闻 XMLRPC npm 库被恶意篡改&#xff0c;窃取数据并部署加密货币挖矿程序 网络安全研究人员发现了一起在 npm 包注册表上活跃了一年多的软件供应链攻击。名为 0xengine/xmlrpc 的 npm 包最初是一个无害的库&#xff0c;基于 JavaScript&#xff0c;用于 Node.js 的 XML…

双向链表

目录 链表的分类 概念 双向链表的实现 ① 结构 ② 初始化 ③ 打印 ④ 插入数据 ⑤ 删除数据 ⑥ 查找数据 ⑦ 在pos位置之前插入数据 ⑧ 删除pos位置的数据 ⑨ 销毁链表 总结 链表的分类 虽然有这么多的链表的结构&#xff0c;但是我们实际中最常⽤还是两种结构&…

怎么样才算得上熟悉高并发编程?

提到并发编程很多人就会头疼了&#xff1b;首先就是一些基础概念&#xff1a;并发&#xff0c;并行&#xff0c;同步&#xff0c;异步&#xff0c;临界区&#xff0c;阻塞&#xff0c;非阻塞还有各种锁全都砸你脸上&#xff0c;随之而来的就是要保证程序运行时关键数据在多线程…

PyCharm中Python项目打包并运行到服务器的简明指南

目录 一、准备工作 二、创建并设置Python项目 创建新项目 配置项目依赖 安装PyInstaller 三、打包项目 打包为可执行文件 另一种打包方式&#xff08;使用setup.py&#xff09; 四、配置服务器环境 五、上传可执行文件到服务器 六、在服务器上运行项目 配置SSH解释…

clickhouse 分片键的重要性

文章目录 背景反思为啥出现问题为啥默认的语义是local 背景 问题背景 详细内容可以看这个 反思为啥出现问题 为啥会出现链接里出现的问题&#xff0c;对于goal join 和 join 语义不一样的问题&#xff0c;那是因为分片键设计不合理的情况 如果表a和表b 都是user_id 作分片键…

S4 UPA of AA :新资产会计概览

通用并行会计&#xff08;Universal Parallel Accounting&#xff09;可以支持每个独立的分类账与其他模块集成&#xff0c;UPA主要是为了支持平行评估、多货币类型、财务合并、多准则财务报告的复杂业务需求 在ML层面UPA允许根据不同的分类账规则对物料进行评估&#xff0c;并…