C++11 标准中 头文件中包含了以下几个类和函数:
- Providers 类:std::promise, std::package_task
- Futures 类:std::future, shared_future.
- Providers 函数:std::async()
- 其他类型:std::future_error, std::future_errc, std::future_status, std::launch.
C++11中promise和future机制是用于并发编程的一种解决方案,用于在不同线程完成数据传递(异步操作)
std::promise
std::promise 对象可以保存某一类型 T 的值,该值可被 future 对象读取(可能在另外一个线程中),因此 std::promise 提供了一种线程同步的手段。在 std::promise 对象构造时可以和一个共享状态(通常是std::future)相关联,并可以在相关联的共享状态(std::future)上保存一个类型为 T 的值。
可以通过 get_future 来获取与该 std::promise 对象相关联的 std::future 对象,调用该函数之后,两个对象共享相同的共享状态。
- std::promise 对象是异步 Provider,它可以在某一时刻设置共享状态的值。
- std::future 对象可以异步返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标志变为 ready,然后才能获取共享状态的值。
函数 | 作用 |
---|---|
operator= | 从另一个 std::promise 移动到当前对象。 |
swap() | 交换移动两个 std::promise。 |
get_future() | 获取与其管理的std::future |
set_value() | 设置共享状态值,此后promise共享状态标识变为ready |
set_value_at_thread_exit() | 设置共享状态的值,但是不将共享状态的标志设置为 ready,当线程退出时该 promise 对象会自动设置为 ready |
set_exception() | 设置异常,此后promise的共享状态标识变为ready |
set_exception_at_thread_exit() | 设置异常,但是到该线程结束时才会发出通知。 |
std::promise::get_future
该函数返回一个与 promise 共享状态相关联的 future 。返回的 future 对象可以访问由 promise 对象设置在共享状态上的值或者某个异常对象。只能从 promise 共享状态获取一个 future 对象。在调用该函数之后,promise 对象通常会在某个时间点准备好(设置一个值或者一个异常对象),如果不设置值或者异常,promise 对象在析构时会自动地设置一个 future_error 异常(broken_promise)来设置其自身的准备状态。
#include <iostream>
#include <functional>
#include <thread>
#include <future> // std::promise, std::futurevoid print_int(std::future<int>& fut) {int x = fut.get(); // 获取共享状态的值.std::cout << "value: " << x << '\n'; // 打印 value: 10.
}int main ()
{std::promise<int> prom; // 生成一个 std::promise<int> 对象.std::future<int> fut = prom.get_future(); // 和 future 关联.std::thread t(print_int, std::ref(fut)); // 将 future 交给另外一个线程t.prom.set_value(10); // 设置共享状态的值, 此处和线程t保持同步.t.join();return 0;
}
std::promise 构造函数
std::promise 的 operator= 没有拷贝语义,即 std::promise 普通的赋值操作被禁用,operator= 只有 move 语义,所以 std::promise 对象是禁止拷贝的。
#include <iostream> // std::cout
#include <thread> // std::thread
#include <future> // std::promise, std::futurestd::promise<int> prom;void print_global_promise () {std::future<int> fut = prom.get_future();int x = fut.get();std::cout << "value: " << x << '\n';
}int main ()
{std::thread th1(print_global_promise);prom.set_value(10);th1.join();prom = std::promise<int>(); // prom 被move赋值为一个新的 promise 对象.std::thread th2 (print_global_promise);prom.set_value (20);th2.join();return 0;
}
std::promise::set_exception
为 std::promise 设置异常,此后 std::promise 的共享状态变标志变为 ready,例子如下,线程1 从终端接收一个整数,线程 2 将该整数打印出来,如果线程 1 接收一个非整数,则为 std::promise 设置一个异常 ,线程 2 在 std::future::get 是抛出该异常。
#include <iostream>
#include <functional>
#include <thread>
#include <future>
#include <exception> // std::exception, std::current_exceptionvoid get_int(std::promise<int>& prom) {int x;std::cout << "Please, enter an integer value: ";std::cin.exceptions (std::ios::failbit); // throw on failbittry {std::cin >> x; // sets failbit if input is not intprom.set_value(x);} catch (std::exception&) {prom.set_exception(std::current_exception());}
}void print_int(std::future<int>& fut) {try {int x = fut.get();std::cout << "value: " << x << '\n';} catch (std::exception& e) {std::cout << "[exception caught: " << e.what() << "]\n";}
}int main ()
{std::promise<int> prom;std::future<int> fut = prom.get_future();std::thread th1(get_int, std::ref(prom));std::thread th2(print_int, std::ref(fut));th1.join();th2.join();return 0;
}
std::promise::set_value_at_thread_exit
设置共享状态的值,但是不将共享状态的标志设置为 ready,当线程退出时该 promise 对象会自动设置为 ready。如果某个 std::future 对象与该 promise 对象的共享状态相关联,并且该 future 正在调用 get,则调用 get 的线程会被阻塞,当线程退出时,调用 future::get 的线程解除阻塞,同时 get 返回 set_value_at_thread_exit 所设置的值。注意,该函数已经设置了 promise 共享状态的值,如果在线程结束之前有其他设置或者修改共享状态的值的操作,则会抛出 future_error( promise_already_satisfied )。
std::future
- std::future仅在创建它的std::promise(或者std::async、std::packaged_task)有效时才有用,所以可以在使用前用valid()判断
- std::future可供异步操作创建者用各种方式查询、等待、提取需要共享的值,也可以阻塞当前线程等待到异步线程提供值。
- std::future一个实例只能与一个异步线程相关联,多个线程则需要使用std::shared_future。
函数 | 作用 |
---|---|
operator= | 移动 future 对象,移动! |
share() | 返回一个可在多个线程中共享的 std::shared_future 对象。 |
get() | 获取值(类型由模板类型决定) |
valid() | 检查 future 是否处于被使用状态,也就是它被首次在首次调用 get() 或 share() 前。建议使用前加上valid()判断 |
wait() | 阻塞等待调用它的线程到共享值成功返回。 |
wait_for() | 在规定时间内 阻塞等待调用它的线程到共享值成功返回。 |
wait_until() | 在指定时间节点内 阻塞等待调用它的线程到共享值成功返回。 |
Promise和Future模型
std::future负责访问, std::future是一个模板类,它提供了可供访问异步执行结果的一种方式。
std::promise负责存储, std::promise也是一个模板类,它提供了存储异步执行结果的值和异常的一种方式。
std::future负责访问,std::promise负责存储,同时promise是future的管理者
流程:
- 线程1初始化一个promise和future对象,将promise对象传递给线程2,相当于线程2对线程1的一个承诺
- future相当于一个承诺,用于获取未来线程2的值
- 线程2接受一个promise,需要将处理结果通过promise返回给线程1
- 线程1想要获取数据,此时线程2还未返回promise就阻塞等待处,直到线程2的数据可达
生产-消费模型
mutex + condition_variable
为了不浪费系统资源,我们使用条件变量来使 process_thread 即消费线程进入休眠等待,直到数据准备好了然后通知它来处理。这里为了在线程间进行数据传递,我们需要一个全局变量 data,然后利用锁和条件变量这些机制来保证共享数据的安全。
#include <thread>
#include <mutex>
#include <iostream>
#include <chrono>
#include <memory>
#include <condition_variable>struct _data
{bool ready;int32_t value;
};_data data = { false, 0 };
std::mutex data_mutex;
std::condition_variable data_con;int main()
{//生产数据的线程std::thread prepare_data_thread([](){std::this_thread::sleep_for(std::chrono::seconds(2)); //模拟生产过程, std::unique_lock<std::mutex> ulock(data_mutex);data.ready = true;data.value = 1;data_con.notify_one();});//消费数据的线程std::thread process_data_thread([](){std::unique_lock<std::mutex> ulock(data_mutex);cv.wait(ulock, [](){ return data.ready; });std::cout << data.value << std::endl;});prepare_data_thread.join();process_data_thread.join();system("pause");return 0;
}
promise+future
首先创建一个 _data 类型data_promise ,而在 data_promise 里已经封装好了一个 _data类型的future,调用 promise 的 get_future() 方法得到与之对应的 future。把 data_promise 传递给了 prepare_data_thread,便可以在 prepare_data_thread 里面来产出值了,在休眠2S之后,调用了 set_value() 方法来产出值。将和 data_promise 相关联的 data_future 传递给了 process_data_thread,所以便可以在 process_data_thread 里调用 data_future 的 get() 方法获取 data_promise 的产出值。这里需要注意的一点是,future 的 get() 方法是阻塞的,所以在与其成对的 promise 还未产出值,也就是未调用 set_value() 方法之前,调用 get() 的线程将会一直阻塞在 get()处直到其他任何人调用了 set_value() 方法。
#include <thread>
#include <iostream>
#include <future>
#include <chrono>struct _data
{int32_t value;
};_data data = { 0 };int main()
{std::promise<_data> data_promise; //创建一个承诺std::future<_data> data_future = data_promise.get_future(); //得到这个承诺封装好的期望std::thread prepare_data_thread([](std::promise<_data> &data_promise){std::this_thread::sleep_for(std::chrono::seconds(2)); //模拟生产过程data_promise.set_value({ 1 }); //通过set_value()反馈结果}, std::ref(data_promise));std::thread process_data_thread([](std::future<_data> &data_future){std::cout << data_future.get().value << std::endl; //通过get()获取结果}, std::ref(data_future));prepare_data_thread.join();process_data_thread.join();system("pause");return 0;
}
packaged_task
packaged_task 是对一个任务的抽象,可以给其传递一个函数来完成其构造。相较于 promise,它应该算是更高层次的一个抽象,同样地,可以将任务投递给任何线程去完成,然后通过 packaged_task.get_future() 方法获取的 future 来获取任务完成后的产出值。
packaged_task 也是一个类模板,模板参数为函数签名,也就是传递函数的类型,如上例中为 _data(),返回值为 _data 类型,函数参数为 void,其中返回值类型将决定 future 的类型也就是产出值类型。
创建一个任务,并投递给了 do_task_thread 去完成这个任务,然后将对应的 data_future投递 给了process_data_thread ,就可以在 process_data_thread 里获取任务产出值了。同样地,获取值之前必须等待任务的完成。
#include <thread>
#include <iostream>
#include <future>struct _data
{int32_t value;
};_data data = { 0 };int main()
{std::packaged_task<_data()> prepare_data_task([]()->_data{std::this_thread::sleep_for(std::chrono::seconds(2)); //模拟数据生产过程return{ 1 };});auto data_future = prepare_data_task.get_future(); //获取futurestd::thread do_task_thread([](std::packaged_task<_data()> &task){task(); //调用packaged_task的调用符来运行任务}, std::ref(prepare_data_task));std::thread process_data_thread([](std::future<_data> &data_future){std::cout << data_future.get().value << std::endl;}, std::ref(data_future));do_task_thread.join();process_data_thread.join();system("pause");return 0;
}
async
async 返回一个与函数返回值相对应类型的 future,通过它我们可以在其他任何地方获取异步结果。
由于给 async 提供了 std::launch::async 策略,所以生产过程将被异步执行,具体执行的时间取决于各种因素,最终输出的时间为 2000ms+,可见生产过程和主线程是并发执行的。
除了 std::launch::async,还有一个 std::launch::deferred 策略,它会延迟线程地创造,也就是说只有当调用 future.get() 时子线程才会被创建以执行任务。
#include <thread>
#include <iostream>
#include <chrono>
#include <future>struct _data
{int32_t value;
};_data data = { 0 };int main()
{auto start = std::chrono::steady_clock::now();std::future<_data> data_future = std::async(std::launch::async, []()->_data{std::this_thread::sleep_for(std::chrono::seconds(1)); //模拟生产过程return { 1 };});std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << data_future.get().value << std::endl; //使用产出值auto end = std::chrono::steady_clock::now();std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << std::endl;system("pause");return 0;
}