前言
到这就是协程的最后一节了。希望能帮到大家
代码
到这里我们整合下之前二、三节的代码
#include <coroutine>
#include <functional>
#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>
#include <memory>
#include <vector>struct async_task_base
{virtual void completed() = 0;virtual void resume() = 0;
};std::mutex m;
std::vector<std::shared_ptr<async_task_base>> g_event_loop_queue;
std::vector<std::shared_ptr<async_task_base>> g_resume_queue; //多线程异步任务完成后后,待主线程恢复的线程
std::vector<std::shared_ptr<async_task_base>> g_work_queue; //执行耗时操作线程队列enum class EnumAwaiterType:uint32_t{EnumInitial = 1, //协程initialEnumSchduling = 2,// 用户co_awaitEnumFinal = 3//销毁
};template <typename ReturnType>
struct CoroutineTask;template <typename CoTask, EnumAwaiterType AwaiterType >
struct CommonAwaiter ;template <typename>
struct AsyncAwaiter;template <typename ReturnType>
struct AsyncThread
{using return_type = ReturnType;AsyncThread(std::function<return_type ()>&& func): func_(func){}std::function<return_type ()> func_;
};template <typename ReturnType>
struct async_task: public async_task_base{async_task(AsyncAwaiter<ReturnType> &awaiter):owner_(awaiter){}void completed() override{ReturnType result = owner_.func_();owner_.value_ = result;}void resume() override{owner_.h_.resume();}AsyncAwaiter<ReturnType> &owner_ ;
};template <typename ReturnType>
struct AsyncAwaiter
{using return_type = ReturnType;AsyncAwaiter(AsyncThread<ReturnType>& info){// std::cout<< " AsyncAwaiter(AsyncThread<ReturnType>& info)" << std::endl;value_ = return_type{};func_ = info.func_;}bool await_ready() const noexcept { return false; }void await_suspend(std::coroutine_handle<> h) {h_ = h;std::lock_guard<std::mutex> g(m);g_work_queue.emplace_back(std::shared_ptr<async_task_base>( new async_task<uint64_t>(*this)));}return_type await_resume() const noexcept { return value_;}std::function<return_type ()> func_;std::coroutine_handle<> h_; return_type value_ = return_type();
};template <typename CoTask, EnumAwaiterType AwaiterType>
struct coroutine_task: public async_task_base{coroutine_task(CommonAwaiter<CoTask, AwaiterType> &awaiter):owner_(awaiter){}void completed() override{}void resume() override{if(owner_.h_.done()){owner_.h_.destroy();}else{owner_.h_.resume();}}CommonAwaiter<CoTask,AwaiterType> &owner_ ;
};template <typename CoTask, EnumAwaiterType AwaiterType = EnumAwaiterType::EnumSchduling>
struct CommonAwaiter
{using return_type = typename CoTask::return_type;using promise_type = typename CoTask::promise_type;CommonAwaiter(promise_type* promise):promise_(promise){}bool await_ready() const noexcept { return false;}//也可以直接恢复 // std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) {// return h;// }void await_suspend(std::coroutine_handle<> h) {h_ = h;g_event_loop_queue.emplace_back(std::shared_ptr<async_task_base>( new coroutine_task<CoTask, AwaiterType>(*this)) );}return_type await_resume() const noexcept { return promise_->get_value();}~CommonAwaiter(){}bool resume_ready_= false;promise_type* promise_ = nullptr;std::coroutine_handle<> h_ = nullptr;
};template <typename CoTask>
struct CommonAwaiter<CoTask, EnumAwaiterType::EnumInitial>
{CommonAwaiter(){}bool await_ready() const noexcept { return true;}void await_suspend(std::coroutine_handle<>) {}void await_resume() const noexcept { }~CommonAwaiter(){}
};template <typename CoTask>
struct CommonAwaiter <CoTask, EnumAwaiterType::EnumFinal>
{CommonAwaiter(){}bool await_ready() noexcept { return false;}void await_suspend(std::coroutine_handle<> h) noexcept{h_ = h;g_event_loop_queue.emplace_back(std::shared_ptr<async_task_base>( new coroutine_task<CoTask, EnumAwaiterType::EnumFinal>(*this)));}void await_resume() noexcept{ }std::coroutine_handle<> h_ = nullptr;
};template<typename CoTask>
struct Promise
{using return_type = typename CoTask::return_type ;~Promise(){}CommonAwaiter<CoTask, EnumAwaiterType::EnumInitial> initial_suspend() {return {}; };CommonAwaiter<CoTask, EnumAwaiterType::EnumFinal> final_suspend() noexcept { return {}; }void unhandled_exception(){std::rethrow_exception(std::current_exception());}CoTask get_return_object(){ return CoTask(this);}return_type get_value() {return value_;}void return_value(return_type value){value_ = value;}template<typename T>CommonAwaiter<CoroutineTask<T>> await_transform(CoroutineTask<T> &&task){return CommonAwaiter<CoroutineTask<T>>(task.p_);}template<typename T>inline AsyncAwaiter<T> await_transform(AsyncThread<T>&& info){return AsyncAwaiter(info);}return_type value_;
};template <typename ReturnType>
struct CoroutineTask{using return_type = ReturnType;using promise_type = Promise<CoroutineTask>;CoroutineTask(const CoroutineTask &other) = delete;CoroutineTask(const CoroutineTask &&other) = delete;CoroutineTask& operator=(const CoroutineTask&) {};CoroutineTask& operator=(const CoroutineTask&&) = delete;CoroutineTask(promise_type* promise) {p_ = promise;}promise_type *p_ = nullptr;};void do_work() {while (1){std::lock_guard<std::mutex> g(m);for(auto task : g_work_queue){task->completed();g_resume_queue.push_back(task);}g_work_queue.clear();} }void run_event_loop(){std::vector<std::shared_ptr<async_task_base>> g_raw_work_queue_tmp;std::vector<std::shared_ptr<async_task_base>> g_event_loop_queue_temp;while(1){g_raw_work_queue_tmp.clear();g_event_loop_queue_temp.clear();{g_event_loop_queue_temp.swap(g_event_loop_queue);std::lock_guard<std::mutex> g(m);g_raw_work_queue_tmp.swap(g_resume_queue);}for(auto &task : g_raw_work_queue_tmp){task->resume();}for(auto task : g_event_loop_queue_temp){task->resume();}}
}// ------------------------------------------------------------------------------------------------------template <typename ReturnType>
AsyncThread<ReturnType> do_slow_work(std::function< ReturnType () > &&func){return AsyncThread<ReturnType>(std::forward< std::function< ReturnType () > >(func));
}CoroutineTask<u_int64_t> second_coroutine(){co_return 3;
}CoroutineTask<float> third_coroutine(){co_return 3.1;
}CoroutineTask<char> first_coroutine(){int a = 1;auto func =[&]() -> uint64_t{// std::cout<< "do a slow work !!!!!!!!!!!!!!!!!!!!!" << std::endl;return a;}; uint64_t result = co_await do_slow_work<uint64_t>(func);std::cout << "@@@@@@@@@ result1 is : " << result << std::endl; a = 2;result = co_await do_slow_work<uint64_t>(func);std::cout << "@@@@@@@@@ result2 is : " << result << std::endl; uint64_t num = co_await second_coroutine();std::cout << "@@@@@@@@@ second_coroutine result is : " << num << std::endl; a = 3;result = co_await do_slow_work<uint64_t>(func);std::cout << "@@@@@@@@@ result3 is : " << result << std::endl; float num2 = co_await third_coroutine();a = 4;result = co_await do_slow_work<uint64_t>(func);std::cout << "@@@@@@@@@ third_coroutine result is : " << num2 << std::endl; result = co_await do_slow_work<uint64_t>(func);std::cout << "@@@@@@@@@ result4 is : " << result << std::endl; co_return 'b';
}CoroutineTask<char> Coroutine(){int a = 1;auto func =[&]() -> uint64_t{// std::cout<< "do a slow work !!!!!!!!!!!!!!!!!!!!!" << std::endl;return a;}; uint64_t result = co_await do_slow_work<uint64_t>(func);std::cout << "@@@@@@@@@ result is : " << result << std::endl; uint64_t num = co_await second_coroutine();std::cout << "@@@@@@@@@ coroutine result is : " << num << std::endl; co_return 'b';
}void test_func(){Coroutine();first_coroutine();
}int main(){test_func();std::thread work_thread(do_work);run_event_loop();return 0;
}
分析
将对两种awaiter的co_await操作规则定义到promise中
对co_await可以使用await_transform 和重载co_await运算符,但是两种用法不能同时存在。
优先恢复其他线程完成耗时任务的协程,再进行当前线程中的协程挂起、销毁、恢复调度。
协程函数相对对于协程的优势
协程每次都会在final_suspend和initial_suspend时创建awaiter,以及对awaiter挂起,在for循环中,这样显然不科学,但异步函数只有一个awaiter,在for循环中更合适。
运行结果
待扩展
异步io
如果对使用epoll实现网络io异步函数感兴趣,可以自行实现,实现方式和实现多线程异步函数一样,这里就不实现了,注意下epoll中,不能添加普通文件系统fd。
协程超时机制
可以加上定时器对async_task_base进行超时检查,以此来支持协程超时
流程图
最后附上co_await Coroutine()的流程图