有时我们不仅要共享数据,也要让独立线程上的行为同步。例如,某线程只有先等待另一线程的任务完成,才可以执行自己的任务。
C++提供了处理工具:条件变量和future
并且进行了扩充:线程闩(latch),线程卡(barrier)
1 等待事件或等待其他条件
如果线程甲需要等待线程乙完成任务,可以采取几种不同的方式:
方式一:在共享数据内部维护一标志(受互斥保护),线程乙完成任务后,就设置标志成立。
该方式存在双重浪费:
1 线程甲须不断检查标志,耗费资源。
2 互斥一旦被锁住,其他线程无法再加锁(包括想要设置标志成立时的线程乙)。
方式二:让线程甲调用std::this_thread::sleep_for(),在各次查验之间短期休眠。
#include <mutex>
#include <thread>
bool flag;
std::mutex m;
void wait_for_flag() {std::unique_lock<std::mutex> lk(m);while (!flag) {lk.unlock();std::this_thread::sleep_for(std::chrono::milliseconds(100));lk.lock();}
}
让线程释放锁并休眠,让别的线程获取锁,这种写死的时间很难确定合适的值,而且合适的值也可能会随着系统运行而动态变化。
方式三:使用C++标准库的工具等待事件发生,优先使用这种方式。
比如条件变量。
1.1 使用条件变量来等待条件成立
两种条件变量的实现:
std::condition_variable和std::condition_variable_any。在头文件<condition_variable>内声明。
std::condition_variable仅限于和std::mutex一起使用,有更好的性能。
std::condition_variable_any可以和足以充当互斥的任一类型配合使用,更加通用,但是可能产生额外开销。
1.1.1 std::condition_variable
#include <condition_variable>
#include <mutex>
#include <queue>
std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;void data_preparation_thread() {while (more_data_to_prepare()) {data_chunk const data = prepare_data();{std::lock_guard<std::mutex> lk(mut);data_queue.push(data);}data_cond.notify_one();}
}void data_processing_thread() {while (true) {std::unique_lock<std::mutex> lk(mut);data_cond.wait(lk, []{return !data_queue.empty();});data_chunk data=data_queue.front();data_queue.pop();lk.unlock();process(data);if (is_lask_chunk(data)) {break;}}
}
线程data_preparation_thread,在数据准备完成后,使用条件变量的notify_one(),通知一个正在等待的条件变量。
线程data_processing_thread,正在wait的条件变量收到通知后,继续往下进行,处理完成后对lk解锁(来让其他线程能够获取锁),这种需要加解锁灵活性的场景,让我们在这里选择了unique_lock而不是lock_guard。
1.1.2 伪唤醒
伪唤醒:如果线程data_processing_thread重新获得互斥,并且查验条件,但是这个行为不是直接响应线程data_preparation_thread的通知,就是伪唤醒。
这种伪唤醒出现的数量和频率都不确定。因此,若判定函数有副作用,则不建议选取它来查验条件。如果真的要这么做,有可能产生多次副作用。例如:每次被调用时提升线程优先级,多次伪唤醒可以使线程优先级非常高。
1.1.3 std::condition_variable::wait()
本质上是忙等的优化。
1.2 使用条件变量构建线程安全的队列
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>template<typename T>
class threadsafe_queue {
private:mutable std::mutex mut;std::queue<T> data_queue;std::condition_variable data_cond;public:threadsafe_queue(){}threadsafe_queue(threadsafe_queue const& other) {std::lock_guard<std::mutex> lk(other.mut);data_queue=other.data_queue;}void push(T new_value) {std::lock_guard<std::mutex> lk(mut);data_queue.push(new_value);data_cond.notify_one();}void wait_and_pop(T& value) {std::unique_lock<std::mutex> lk(mut);data_cond.wait(lk, [this]{return !data_queue.empty();});value = data_queue.front();data_queue.pop();}std::shared_ptr<T> wait_and_pop() {std::unique_lock<std::mutex> lk(mut);data_cond.wait(lk, [this]{return !data_queue.empty();});std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));data_queue.pop();return res;}bool try_pop(T& value) {std::lock_guard<std::mutex> lk(mut);if (data_queue.empty()) {return false;}value = data_queue.front();data_queue.pop();return true;}std::shared_ptr<T> try_pop() {std::lock_guard<std::mutex> lk(mut);if (data_queue.empty()) {return std::shared_ptr<T>();}std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));data_queue.pop();return res;}bool empty() const {std::lock_guard<std::mutex> lk(mut);return data_queue.empty();}
};
2 使用future等待一次事件发生
C++标准库使用future来模拟一次性事件:若线程等待某个特定的一次性事件发生,则会以一个恰当的方式获取一个future,他代表目标时间;接着,该线程就能一边执行其他任务,一边在future上等待;同时,他以短暂的间隔反复查验目标事件是否已经发生。
这个线程也可以转换运行模式:先不等目标事件发生,直接暂缓当前任务,切换到别的任务,必要时再回头等待future准备就绪。
future可能与数据关联,也可能未关联,一旦目标事件发生,其future即进入就绪状态,无法重置。
C++标准库有两种future:独占future(std::future<>)和共享future(std::shared_future<>)他们的设计参照了unique_ptr和shared_ptr。同一个事件仅仅允许关联唯一一个std::future实例,但是可以关联多个shared_future实例。
future能用于线程间通信,但是本身不提供同步访问,若多个线程需要同时访问一个future对象,需要使用互斥或其他同步方式。
2.1 从后台任务返回值std::async
只要我们并不急需线程运算的值,就可以使用std::async()按异步方式启动任务。我们从std::async()函数处获得std::future对象(而非std::thread对象),运行的函数一旦完成,其返回值就由该对象最后持有。若要用到这个值,只需再future对象上调用get(),当前线程就会阻塞,以便future准备妥当并返回该值。
#include <future>
#include <iostream>
int find_the_answer_to_ltuae();
void do_other_stuff();int main() {std::future<int> the_answer = std::async(find_the_answer_to_ltuae);do_other_stuff();std::cout << "the answer is " << the_answer.get() << std::endl;
}
std::async的第一个参数是函数指针,第二个是用在调用函数之上的参数,其余类推。
如果std::async的参数是右值,则通过移动原始参数构建副本。
#include <string>
#include <future>struct X {void foo(int, std::string const &);std::string bar(std::string const &);
};X x;调用了p->foo(42, "hello");,其中p的值是&x
auto f1 = std::async(&X::foo, &x, 42, "hello");
调用了tempx.bar("goodbye");,其中tempx是x的副本
auto f2 = std::async(&X::bar, x, "goodbye");struct Y {double operator() (double);
};Y y;
调用tmpy(3.141)。其中由Y()生成的一个匿名变量传递给std::async(),进而发生移动构造。
在std::async()内部产生对象tmpy,在tmpy上执行Y::operator()(3.141)
auto f3 = std::async(Y(), 3.141);
调用y(2.718);
auto f4 = std::async(std::ref(y), 2.718);X baz(X&);
// 调用baz(x)
// std::async(baz, std::ref(x));class move_only {
public:move_only();move_only(move_only&&);move_only(move_only const&) = delete;move_only& operator=(move_only&&);move_only& operator=(move_only const&) = delete;void operator() ();
};
调用tmp(),其中tmp等价于std::move(move_only());
它的产生过程与std::async(Y(), 3.141);类似
auto f5 = std::async(move_only());
运行新线程
auto f6 = std::async(std::launch::async, Y(), 1.2);
在wait或get内部运行任务函数
auto f7 = std::async(std::launch::deferred, baz, std::ref(x));
交由实现自行选择运行方式
auto f8 = std::async(std::launch::deferred | std::launch::async, baz, std::ref(x));
交由实现自行选择运行方式
auto f9 = std::async( baz, std::ref(x));签名f7的任务函数调用被延后,到这里运行
f7.wait();
2.2 关联future实例和任务std::packaged_task<>
std::packaged_task<>连结了future对象与函数。std::package_task<>对象在执行任务时,会调用关联的函数(或可调用对象),把返回值保存为future的内部数据,并令future准备就绪。
类模板std::package_task<>具有成员函数get_future(),它返回std::future<>实例,该future的特化类型取决于函数签名所指定的返回值。
std::package_task<>还具备函数调用操作符,他的参数取决于函数签名的参数列表。
#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>std::mutex m;std::deque<std::packaged_task<void()>> tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread() {while (!gui_shutdown_message_received()) {get_and_process_gui_message();std::packaged_task<void()> task;{std::lock_guard<std::mutex> lk(m);if (tasks.empty()) {continue;}task = std::move(tasks.front());task.pop_front();}task();}
}std::thread gui_bg_thread(gui_thread);template<typename Func>
std::future<void> post_task_for_gui_thread(Func f) {std::packaged_task<void()> task(f);std::future<void> res = task.get_future();std::lock_guard<std::mutex> lk(m);tasks.push_back(std::move(task));return res;
}
2.3 创建std::promise
有些任务无法以简单的函数调用表达,还有一些任务的执行结果可能来自多个部分的代码。这种时候就需要用到std::promise显示异步求值。
在处理大量网络连接时,通常使用少量线程处理,来避免大量线程带来的上下文切换开销等。
配对的std::promise和std::future可实现下面的工作机制:等待数据的线程在future上阻塞,而提供数据的线程利用相配的promise设定关联的值,使future准备就绪。
若需从给定的std::promise实例获取关联的std::future对象,调用前者的get_future()即可。promise的值通过成员函数set_value()设置,只要设置好,future就准备就绪。
#include <future>
void process_connections(connection_set& connections) {while (!done(connections)) {for (connection_iterator connection=connections.begin(), end=connections.end(); connection!= end; ++connection) {if (connection->has_incoming_data()) {data_packet data = connection->incoming();std::promise<payload_type>& p = connection->get_promise(data.id);p.set_value(data.payload);}if (connection->has_outgoing_data()) {outgoing_packet data = connection->top_of_outgoing_queue();connection->send(data.payload);data.promise.set_value(true);}}}
}
2.4 将异常保存到future中
经由std::async()调用抛出的异常被保存到future中,等到get()调用,存储在内的异常会被抛出。
任务包装在packaged_task也是如此。std::promise也有这个功能,使用set_expection()。
#include <future>
extern std::promise<double> some_promise;
try {some_promise.set_value(value());
} catch (...) {some_promise.set_exception(std::current_exception());
}
2.5 多个线程同时等待std::shared_future
若在多个线程上访问同一个std::future,不采取措施会发生抢占。std::shared_future则可以解决这个问题,他可以复制出副本,但是它们全指向同一异步任务的状态数据。
std::shared_future的实例依据std::future的实例构造而得,前者的异步状态由后者决定。由于std::future独占异步状态,因此想要创建shared_future,需要使用std::move向其默认构造函数传递归属权。
#include <assert.h>
#include <future>
std::promise<int> p;
std::future<int> f(p.get_future());
// assert(f.valid());
std::shared_future<int> sf(std::move(f));隐式的归属权转换
std::shared_future<int> sf(p.get_future());
转移给sf后,对象f不再有效。
3 限时等待
有两种超时机制:
1 迟延超时,线程根据指定的时长而继续等待。
2 绝对超时,在某特定时间点来临之前,线程一直等待。
3.1 时钟类
使用std::chrono::system_clock::now()来获取系统当前时刻。
若时钟类每秒计数25次,那么表示为std::ratio<1, 25>
若时钟类每2.5秒计数1次,那么表示为std::ratio<5, 2>
std::chrono::steady_clock:恒温时钟类
std::chrono::system_clock:系统时钟类
std::chrono::high_resolution_clock:高精度时钟类
3.2 时长类
std::chrono::duration<>,是类模板,具有两个模板参数,前者指明采用何种类型表示计时单元的数量,后者是一个分数,设定该时长类的每一个计时单元代表多少秒。
例如:采用short值计数的分钟时长类:
std::chrono::duration<short, std::ratio<60, 1>>(1分钟60秒)
采用double值计数的毫秒时长类:
std::chrono::duration<short, std::ratio<1, 1000>>(1毫秒是1/1000秒)
std::chrono::milliseconds ms(22222);
std::chrono::seconds s = std::duration_cast<std::chrono::seconds>(ms);
3.3 时间点类
由类模板std::chrono::time_point<>的实例表示,第一个参数指明所参考的时钟,第二个参数指明计时单元。
例如:以系统时钟为参考,计时单元为分钟
std::chrono::time_point<std::chrono::system_clock, std::chrono::minutes>
可以将时间点加减时长,从而得出新的时间点。
比如:
std::chrono::system_clock::now() + std::chrono::minutes(423)
如果两个时间点共享一个时钟,我们也可以用它相减来得到时长。
3.4 接受超时时限的函数
用来设定休眠时间,延迟线程执行,设置超时时长等。