前言
这是c++协程实现第二篇,这里开始我们将开始真正意义上开始实现协程。对协程基础流程不清楚的,可以看我的第一篇。 后续可能需要一定的模板知识,可以看下我的模板的文章,那些知识就完全够用了。本篇将实现一个协程封装的异步任务队列,即一个耗时任务到其他线程完成后,继续恢复执行流。在这里你可以看到如何通过协程去回调。下面就直接开始吧。
协程实现
上一篇我们已经谈过,协程最大的一个好处就是去回调。在工程中,我们往往因为效率,而选择异步接口,或者不希望堵塞任务线程,而将某些耗时任务丢到线程池中执行,我们往往需要传入一个回调,待耗时任务完成后,调用来执行后续操作,但回调的引入,割裂了代码逻辑,大大加重了对业务的理解难度,以及修改流程的难度,在工程中我就深受其害。直接上代码。
代码
#include <coroutine>
#include <future>
#include <chrono>
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <memory>
#include <vector>struct async_task_base
{virtual void completed() = 0;virtual void resume() = 0;
};std::mutex m;
std::vector<std::shared_ptr<async_task_base>> g_resume_queue; //原来的 eventloop队列
std::vector<std::shared_ptr<async_task_base>> g_work_queue; //执行耗时操作线程队列template <typename T>
struct AsyncAwaiter;using namespace std::chrono_literals;struct suspend_always{bool await_ready() const noexcept { try{std::cout << "suspend_always::await_ready" << std::endl;}catch(const std::exception& e){std::cerr << e.what() << '\n';}return false; }void await_suspend(std::coroutine_handle<> handle) const noexcept {try{std::cout << "suspend_always::await_suspend" << std::endl;}catch(const std::exception& e){std::cerr << e.what() << '\n';}}void await_resume() const noexcept {try{std::cout << "suspend_always::await_resume" << std::endl;}catch(const std::exception& e){std::cerr << e.what() << '\n';}}};struct suspend_never{bool await_ready() const noexcept { try{std::cout << "suspend_never::await_ready" << std::endl;}catch(const std::exception& e){std::cerr << e.what() << '\n';}return true; }void await_suspend(std::coroutine_handle<> handle) const noexcept {try{std::cout << "suspend_never::await_suspend" << std::endl;}catch(const std::exception& e){std::cerr << e.what() << '\n';}}void await_resume() const noexcept {try{std::cout << "suspend_never::await_resume" << std::endl;}catch(const std::exception& e){std::cerr << e.what() << '\n';}}};struct Result {struct promise_type {promise_type(){std::cout << "promise_type" << std::endl;}~promise_type(){std::cout << "~promise_type" << std::endl;}suspend_never initial_suspend() {std::cout << "initial_suspend" << std::endl;return {};}suspend_never final_suspend() noexcept {std::cout << "final_suspend" << std::endl;return {};}Result get_return_object() {std::cout << "get_return_object" << std::endl;return {};}void return_void() {std::cout << "return_void" << std::endl;}// void return_value(int value) {// }void unhandled_exception() {}};
};template <typename ReturnType>
struct AsyncThread
{using return_type = ReturnType;AsyncThread(std::function<return_type ()>&& func): func_(func){}std::function<return_type ()> func_;
};template <typename ReturnType>
struct async_task: public async_task_base{async_task(AsyncAwaiter<ReturnType> &awaiter):owner_(awaiter){}void completed() override{std::cout << "async_task :: completed ############" << std::endl;ReturnType result = owner_.func_();owner_.value_ = result;}void resume() override{std::cout << "async_task :: resume ############" << std::endl;owner_.h_.resume();}AsyncAwaiter<ReturnType> &owner_ ;
};template <typename ReturnType>
struct AsyncAwaiter
{using return_type = ReturnType;AsyncAwaiter(AsyncThread<ReturnType>& info){// std::cout<< " AsyncAwaiter(AsyncThread<ReturnType>& info)" << std::endl;value_ = return_type{};func_ = info.func_;}// 该awaite直接挂起bool await_ready() const noexcept { return flag; }void await_suspend(std::coroutine_handle<> h) {h_ = h;std::lock_guard<std::mutex> g(m);g_work_queue.emplace_back(std::shared_ptr<async_task_base>( new async_task<uint64_t>(*this)));}return_type await_resume() const noexcept { // std::cout<< "AsyncAwaiter::await_resume" << std::endl;return value_;}bool flag = false;std::function<return_type ()> func_;std::coroutine_handle<> h_; return_type value_ = return_type();
};template<typename T>
inline AsyncAwaiter<T> operator co_await(AsyncThread<T>&& info)
{return AsyncAwaiter(info);
}template <typename ReturnType>
AsyncThread<ReturnType> do_slow_work(std::function< ReturnType () > &&func){return AsyncThread<ReturnType>(std::forward< std::function< ReturnType () > >(func));
}Result Coroutine() {int a = 1;auto func =[&]() -> uint64_t{// std::cout<< "do a slow work !!!!!!!!!!!!!!!!!!!!!" << std::endl;return a;}; uint64_t result = co_await do_slow_work<uint64_t>(func);std::cout << "@@@@@@@@@ result1 is : " << result << std::endl; a = 2;result = co_await do_slow_work<uint64_t>(func);std::cout << "@@@@@@@@@ result2 is : " << result << std::endl; a = 3;result = co_await do_slow_work<uint64_t>(func);std::cout << "@@@@@@@@@ result3 is : " << result << std::endl; co_return;
};void do_work() {while (1){// 加锁// std::cout << "void do_work() " << std::endl;// std::this_thread::sleep_for(std::chrono::seconds(1)); //!!!!!还有这个加锁要在锁钱前不然,让出cpu后,由于还没有解锁,又会被其他线程再拿到锁,这样就死锁了std::lock_guard<std::mutex> g(m);// std::cout << " g_work_queue size " << g_resume_queue.size() << std::endl;for(auto task : g_work_queue){task->completed();g_resume_queue.push_back(task);}// g_resume_queue.assign(g_work_queue.begin(), g_work_queue.end()); //!!!!!!!这里有个大坑坑查了好久,如果连续两次先进来这里,会把g_raw_work_queue中的元素给清理掉,导致后面无法恢复g_work_queue.clear();// std::cout << " g_resume_queue size " << g_resume_queue.size() << std::endl;} }void run_event_loop(){std::vector<std::shared_ptr<async_task_base>> g_raw_work_queue_tmp;while(1){g_raw_work_queue_tmp.clear();// std::this_thread::sleep_for(std::chrono::seconds(1)); {std::lock_guard<std::mutex> g(m);// for(auto &task : g_resume_queue){// task->resume();// }g_raw_work_queue_tmp.swap(g_resume_queue);}for(auto &task : g_raw_work_queue_tmp){task->resume();}}
}void test_func(){Coroutine();
}int main(){test_func();std::thread work_thread(do_work);run_event_loop();
}
代码分析
我们先从整体上分析下代码,大致分为以下几部分
AsyncThread,AsyncAwaiter,operator co_await :这三个构成了等待体的基本挂起
Result,suspend_always,suspend_never : 这三个构成一个最基本的协程,这三个沿用上一节的内容
g_resume_queue,g_work_queue,do_work,run_event_loop :构成了一个最基础的event_loop + 异步任务队列
接下来我们对这三块进行分析和讲解
do_slow_work
首先我们看下do_slow_work这个函数,这是一个异步函数,可以支持挂起操作,但注意他不是协程函数,这个区分很重要,这直接导致该函数体内是不可以再调用co_await挂起do_slow_work的
这是个简单的模板函数,该函数的返回值是模板类AsyncThread,AsyncThread的模板参数有参数列表中的函数对象func的返回类型推导出。这个函数很简单,返回了一个AsyncThread对象。这个对象既不是协程类型,也不是等待体,那为什么能挂起当前协程呢?
operator co_await
co_await 是关键字,也是运算符,当co_await的操作数不是awaiter对象时会报错,所以我们需要重载运算符,将AsyncThread转换为
awaiter对象,给co_await使用,这就解释了上面为什么能挂起协程的原因。但这时,你可能又会有疑问,那**do_slow_work为什么不直接返回AsyncAwaiter作为返回值呢?**是的这的确可以,但是我们awaiter作为底层,我们不希望让业务层知道细节,所以用了AsyncThread作为一个代理屏蔽了细节
AsyncAwaiter
接下来我们自然而然就想知道AsyncAwaiter是怎么实现的
有了上一篇的基础,我们很容易就可以看出该awaiter对象,必定会挂起,然后将协程句柄保存在awaiter对象中,并创建async_task添加到任务队列g_work_queue中。
当任务完成,调用resume后,会调用await_resume,将完成的值返回出去,模板参数化返回值,以此支持不同类型的返回值,使co_await一个异步函数使用方式尽可能和普通函数相似。
协程体
挂起异步函数不会涉及到和协程体的交互,它只需要提供一个协程作为挂起的承载就够了。这时你一定会直觉上觉得协程体Reuslt作用不应该这么少。是的这里我们留个悬念。
任务队列
任务队列,这里有两个:g_resume_queue和g_work_queue。交互流程是,awaiter添加任务到g_work_queue,工作线程do_work将耗时操作完成后,移交g_resume_queue,由主线程取出调用resume恢复执行。这里有个知识点:协程在哪调用resume,协程就在那个线程中恢复执行,在对有序性要求高的系统中,为了保证有序性,所以我们让主线程来恢复协程执行。
多线程
涉及到多线程,往往问题会很多。工作后,其实基本只在写单线程的代码,这一块经验的不足在这里就暴露出来了。
1.do_work锁还未释放,就调用了sleep,让出了cpu,run_event_loop线程加锁。造成了死锁
2.g_resume_queue.assign(g_work_queue.begin(), g_work_queue.end()) 这行代码,由于do_work线程连续两次被调度到,导致g_resume_queue中的任务被释放,从而导致挂起点永不恢复。这个和死锁混在一起,真的很难查。
3.恢复这里写有个坑,之前我习惯性的是这样resume,但这样也出现了死锁问题。当协程中只co_await一次时,不会出现死锁问题。但如果想下面连续挂起多次,就会出现死锁。原因是锁还未解开,你resume执行,代码执行到了第二do_slow_work,这里又执行到了等待体的await_suspend,这里向g_work_queue加了锁,这就是A资源还未解锁对B资源加了锁,又出现了死锁。所以应该在解锁后再恢复协程执行流。
至此不知不觉把毕业时面试背的几种死锁的情况遇到了个遍。
运行结果
然后我们编译运行下结果,完美执行
补充
到这里我们再补充下几个知识点
协程挂起,会把用到的参数拷贝存储起来,所以使用lambda捕获参数的时候要特别小心,如果按照引用捕获,一定要确定该对象是协程内的变量,而不是通过普通函数通过引用传进来的,因为引用本身就是指针,所以引用拷贝的时地址,对象本身可能由于函数执行完被析构。
上文的awaiter对象由于是在协程中定义的,所以知道协程执行完之前是不会释放的,所以resume时无需担心内存被释放的问题。
至此我们完成了去回调的目标,这是我们回想那如果我又定义了一个协程,想挂起之前的写协程行不行?
当然可以,但是目前我们编译不行。这个具体实现,我们在下一篇实现。