并发
资源管理
资源
- 程序中符合先获取后释放(显式或隐式)规律的东西,比如内存、锁、套接字、线程句柄和文件句柄等。
- RAII: (Resource Acquisition Is Initialization),也称为“资源获取就是初始化”,是C++语言的一种管理资源、避免泄漏的惯用法。
unique_ptr & shared_ptr
- unique_ptr 是一个独立对象或者是数组的句柄,unique_ptr 通过移动操作使得简单高效
- shared_ptr 很多方面和unique_ptr 相似, 唯一区别是shared_ptr 通过拷贝操作而非移动操作。多个shared_ptr 共享该对象的所有权,只有当最后一个 shared_ptr 被销毁时,对象才被销毁。
- shared_ptr 提供的垃圾回收机制需要慎重使用。 shared_ptr 使得对象的生命周期变得不那么容易掌控了,除非在一定程度上一定要使用共享所有权,否则别轻易使用shared_ptr
- shared_ptr 没有指定哪个拥有者有权读写对象
- 当我们使用函数返回集合的时候,不必使用指针,因为如果定义了移动语义, 在返回的时候会默认使用移动操作
并发
任务 和thread
- 线程(thread)是任务在程序中的系统级表示。
- thread 在 中定义
thread 的使用方法
#include <iostream>
#include <thread>
#include <vector>
#include "config.h"// thead 的使用
// 函数的参数这里必须是const, 否则会报thread:120:44: error: static assertion failed: std::thread arguments must be invocable after conversion to rvalues
// 在多线程中使用io 操作,需要考虑线程安全,打印出来的字符服务保证输出内容的顺序
void f(const std::vector<double>& v) {std::cout<< "function f" << std::endl;
}
// 通过res 指针返回thread 的执行结果,不美观的方法
// 不推荐使用此方法,因为无法掌控res 什么时候写入了,一般用消息队列,或者使用condition 、promise等方式来实现返回结果
void f2(const std::vector<double>& v, double* res) {}struct F {std::vector<double> & v;F(std::vector<double>& vv):v{vv} {}/* 如果不使用调用运算符重载, 会编译报错: hread:120:44: error: static assertion failed: std::thread arguments must be invocable after conversion to rvalues*/void operator()() { // 调用运算符重载std::cout << "struct F operator" << std::endl;}};class Foo
{
public:void bar(){for (int i = 0; i < 5; ++i){std::cout << "正在执行线程3\n";++n;std::this_thread::sleep_for(std::chrono::milliseconds(10));}}int n = 0;
};int main(int argc, char **argv) {std::vector<double> some_vec {1,2,3,4,5,6};std::vector<double> vec2{10,11,12,13};std::thread t1{f, some_vec}; // 在t1 的线程里面执行函数f, 参数是some_vecstd::thread t2{F{vec2}}; // 在t2 的县城里面执行函数 F 的调用运算符, 参数是vec2double res;std::thread t3(f2, some_vec, &res);Foo f;std::thread t4(&Foo::bar, &f); // t5 在对象 f 上运行 foo::bar()t1.join();t2.join();return 0;
}
- thread 更多用法清参照cppreference
thread 共享数据
- mutex // 访问数据排他处理
- condition_variable 允许一个thread 等待另外一个thread
- 代码示例
class Message {
private:int msgId;
public:Message(int id) : msgId(id){}int getId() { return msgId;}
};std::queue<Message> mMsgQueue;
std::condition_variable mCond;
std::mutex mMutex;void consumer() {while(true) {//std::cout << "wait mutex " << std::endl;std::unique_lock<std::mutex> lck{mMutex};//std::cout << "wait condition " << std::endl;//while(mCond.wait(lck))/*do nothing*/;mCond.wait(lck);//std::cout << "after wait condition " << std::endl;auto m = mMsgQueue.front();mMsgQueue.pop();std::cout << "get msg Id: " << m.getId() << std::endl;// lck.unlock();}
}void producer() {int index = 10;while(index > 0) {Message msg(index);std::unique_lock<std::mutex> lck{mMutex};mMsgQueue.push(msg);std::cout << "push msg Id: " << msg.getId() << std::endl;lck.unlock(); //(1)mCond.notify_all();//std::cout << "after notify all: " << msg.getId() << std::endl;//std::this_thread::sleep_for(std::chrono::milliseconds(20)); //(2)index--;}
}int main(int argc, char **argv) {std::thread t2(consumer);std::thread t1(producer);t1.join();t2.join();
}
-
对于生产者-消费者模式,理想状态下,生产者生产一个,消费者就消费一个,但是实际并非如此
- (1) 如果不主动调用lck.unlock(), lck 需要等到while(){} 进入下一个循环才会释放,所以无论是否加(), cosumer 线程得不到执行的契机(可能和平台有关)
- (2) 对于加了(1) 而没有(2) 的场合while 很快执行到下一个循环,又立即执行到lck, 也会导致在producer 执行过程中,consumer 得不到执行
执行效果大致如下:
-
而如果加了(1) 和(2)后:
-
对于consume 如上图的处理方式,获取一次锁只pop 一次数据,会因为producer notify 和consumer 的wait 不是时间上不是匹配出现而导致数据没有被全部读取出来,可以修改为如下方式:
-
在获取到一次锁后,把消息队列中的消息全部处理掉
void consumer() {while(true) {//std::cout << "wait mutex " << std::endl;std::unique_lock<std::mutex> lck{mMutex};//std::cout << "wait condition " << std::endl;//while(mCond.wait(lck))/*do nothing*/;mCond.wait(lck);//std::cout << "after wait condition " << std::endl;while(!mMsgQueue.empty()) {auto m = mMsgQueue.front();mMsgQueue.pop();std::cout << "get msg Id: " << m.getId() << std::endl;}// lck.unlock();}
}
执行效果大致如下, 这样只是消费者不会立马消费掉生产者生产的数据,但是即使没有(1)(2)也不会导致消息得不到处理:
任务通信
- c++ 中主要提供了以下三种,都定义在 头文件中。
- future and promise 用来从一个独立线程上创建出的任务返回结果
- packaged_task 是帮助启动义务以及链接返回结果的机制
- async 以非常类似调用函数的方式启动一个任务
future and promis
- 允许两个任务间传输直,而无须显示使用锁–“系统”高效地实现了这种传输。
- 当一个任务需要向另一个任务传输这个值时,把它放入promise。
- c++ 实现会出现在对应的future 中。
- 使用future 的get()函数,如果值还未准备好,线程会阻塞直至值准备好。如果get( )无法计算出来,get()会抛一个异常(系统或者从get() 得到数据的任务)
- promise 的作用主要为future的get() 提供放置 操作(set_value()) 和set_exception().
#include <iostream>
#include <thread>
#include <future>
#include <functional>
void work (std::promise<int>& px) {try {px.set_value(11);} catch (...) {px.set_exception(std::current_exception());}
}void result (std::future<int>& res) {try {int result = res.get();std::cout<< " recieve the result: " << result << std::endl;} catch(...) {//std::cout<<"exception"<< std::current_exception() << std::endl;}
}
int main(int argc, char **argv) {std::promise<int> pro;std::future<int> fut = pro.get_future(); //binding the future and the promisestd::thread t1(work, std::ref(pro));std::thread t2(result, std::ref(fut));t1.join();t2.join();return 0;
}
-
可以看到使用步骤如下:
- 声明promise
- 定义future,并且绑定promise 和future, 二者的模板类型必须一致(很好理解,provise 设定的就是传给future 的)
- 使用promise 的set_value()或者set_exception() 传递值
- 使用future 的get() 函数接收值(注意get()是阻塞的)
-
这里使用了std::ref std::ref的使用场景:
- 算法传递可调用对象,而这些对象可能需要引用传递。
- 在多线程之间共享数据,但有希望通过引用传递数据而不是复制。
- std::ref可以使函数接受任何类型的引用而不仅仅是特定类型
packaged_task
- packaged_task 简化了任务连接future & promise.
- (TODO)