文章目录
- 九、多线程
- 8. POSIX信号量
- 根据信号量+环形队列的生产者消费者模型代码
- 结果演示
- 未完待续
九、多线程
8. POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
创建多线程的信号量:
销毁多线程之间的信号量:
对信号量做P操作(申请资源):
对信号量做V操作(释放资源):
根据信号量+环形队列的生产者消费者模型代码
Makefile:
cp_ring:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f cp_ring
Thread.hpp:
#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>namespace ThreadModule
{template<typename T>using func_t = std::function<void(T&, const std::string& name)>;template<typename T>class Thread{public:void Excute(){_func(_data, _threadname);}public:Thread(func_t<T> func, T& data, const std::string &name="none-name"): _func(func), _data(data), _threadname(name), _stop(true){}static void *threadroutine(void *args){Thread<T> *self = static_cast<Thread<T> *>(args);self->Excute();return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(!n){_stop = false;return true;}else{return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;T& _data;func_t<T> _func;bool _stop;};
}#endif
RingQueue.hpp:
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>// 环形队列类模板
template<class T>
class RingQueue
{
private:// 申请资源void P(sem_t& sem){sem_wait(&sem);}// 释放资源void V(sem_t& sem){sem_post(&sem);}// 加锁void Lock(pthread_mutex_t& mutex){pthread_mutex_lock(&mutex);}// 解锁void Unlock(pthread_mutex_t& mutex){pthread_mutex_unlock(&mutex);}
public:RingQueue(int cap):_cap(cap),_ring_queue(cap),_prodeucer_step(0),_consumer_step(0){sem_init(&_room_sem, 0, _cap);sem_init(&_data_sem, 0, 0);pthread_mutex_init(&_prodeucter_mutex, nullptr);pthread_mutex_init(&_consumer_mutex, nullptr);}// 生产者的入队列函数void Enqueue(const T& in){// 申请空间资源P(_room_sem);// 加锁Lock(_prodeucter_mutex);// 入队列_ring_queue[_prodeucer_step++] = in;// 环形,绕一圈_prodeucer_step %= _cap;// 解锁Unlock(_prodeucter_mutex);// 释放数据资源V(_data_sem);}// 消费者的出队列函数void Pop(T* out){// 申请数据资源P(_data_sem);// 加锁Lock(_consumer_mutex);// 出队列*out = _ring_queue[_consumer_step++];_consumer_step %= _cap;// 解锁Unlock(_consumer_mutex);// 释放空间资源V(_room_sem);}~RingQueue(){sem_destroy(&_room_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(&_prodeucter_mutex);pthread_mutex_destroy(&_consumer_mutex);}
private:// 数组模拟环形队列std::vector<T> _ring_queue;// 容量int _cap;// 生产者和消费者的位置指针int _prodeucer_step;int _consumer_step;// 信号量sem_t _room_sem;sem_t _data_sem;// 互斥锁pthread_mutex_t _prodeucter_mutex;pthread_mutex_t _consumer_mutex;
};
Task.hpp:
#pragma once#include <iostream>
#include <functional>using Task = std::function<void()>;void Download()
{std::cout << "Downloading..." << std::endl;
}
Main.cc:
#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>using namespace ThreadModule;
// 创建类型别名
using ringqueue_t = RingQueue<Task>;// 消费者线程
void Consumer(ringqueue_t& rq, const std::string& name)
{while (true){// 获取任务Task t;rq.Pop(&t);std::cout << "Consumer " << name << " : ";// 执行任务t();}
}// 生产者线程
void Productor(ringqueue_t& rq, const std::string& name)
{while (true){// 发布任务rq.Enqueue(Download);std::cout << "Productor " << name << " : " << "Download task" << std::endl;sleep(1);}
}// 启动线程
void InitComm(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq, func_t<ringqueue_t> func)
{for (int i = 0; i < num; i++){// 创建一批线程std::string name = "thread-" + std::to_string(i + 1);threads->emplace_back(func, rq, name);}
}// 创建消费者线程
void InitConsumer(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
{InitComm(threads, num, rq, Consumer);
}// 创建生产者线程
void InitProductor(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
{InitComm(threads, num, rq, Productor);
}// 等待所有线程结束
void WaitAllThread(std::vector<Thread<ringqueue_t>>& threads)
{for (auto& thread : threads){thread.Join();}
}// 启动所有线程
void StartAll(std::vector<Thread<ringqueue_t>>& threads)
{for (auto& thread : threads){thread.Start();}
}int main()
{// 创建阻塞队列,容量为5ringqueue_t* rq = new ringqueue_t(10);// 创建线程std::vector<Thread<ringqueue_t>> threads;// 创建 1个消费者线程InitConsumer(&threads, 1, *rq);// 创建 1个生产者线程InitProductor(&threads, 1, *rq);// 启动所有线程StartAll(threads);// 等待所有线程结束WaitAllThread(threads);return 0;
}
结果演示
这里演示的是单生产者单消费者的模型,可以在主函数改成多生产者多消费者的模型。