有栈协程和无栈协程
有栈协程和无栈协程
实现一个协程的关键点在于如何保存、恢复和切换上下文,而这也是有栈协程和无栈协程的主要区别。有栈协程通过直接切换堆栈来实现,其构造了一个内存中的栈,而无栈协程使用状态机+按需分配的方式,为所用到的变量开设临时内存,但是实现逻辑更加复杂。
两者各有利弊,只能说无栈的内存利用率更高,而不能说无栈就一定比有栈好。
避免内存分配
异步操作,需要有一个地方用来存储一些对当前操作进度的跟踪,此状态需要一直保持,直到操作完成后才能释放。这一点很好理解,孩子出门读大学,总得知道家在哪儿,还要回家的。
例子-一个简易的线程同步的例子
关于如何使用协程实现一个简单得到线程同步,以下是一个原文中的例子,异步手动重置事件。
此事件的基本要求是,它需要被多个并发执行的协程等待,等待时需要挂起等待的协程,直到某个线程调用.set()
方法,此时任何等待的协程都被恢复。如果某个线程已经调用了.set()
,那么协程应该继续而不挂起。
消费者生产者模型
T value;
async_manual_reset_event event;// A single call to produce a value
void producer()
{value = some_long_running_computation();// Publish the value by setting the event.event.set();
}// Supports multiple concurrent consumers
task<> consumer()
{// Wait until the event is signalled by call to event.set()// in the producer() function.co_await event;// Now it's safe to consume 'value'// This is guaranteed to 'happen after' assignment to 'value'std::cout << value << std::endl;
}
上述的例子是典型的生产者-消费者模型的异步实现,消费者需要保证在等待生产者完成生产,也就是完成value = some_long_running_computation();
这一步操作前,不能消费value,需要确保一个顺序问题。
正如上文所述,无栈协程需要状态机来记录当前状态,比如这个event就有两种状态,set和not set。当处于set状态时,没有正在等待的协程,co_await操作可以继续进行而非挂起。当处于not set的状态时,将会有一批正在等待的协程,等着变成set状态,这一批协程的数量可能为0。
该状态可以用一个简单的std::atomic<void*>
来代表。
我们可以通过在coroutine frame中通过存储awaiter对象来避免在堆上进行额外存储。下面这个例子将说明如何实现:
简单的接口
class async_manual_reset_event
{
public:async_manual_reset_event(bool initiallySet = false) noexcept;// No copying/movingasync_manual_reset_event(const async_manual_reset_event&) = delete;async_manual_reset_event(async_manual_reset_event&&) = delete;async_manual_reset_event& operator=(const async_manual_reset_event&) = delete;async_manual_reset_event& operator=(async_manual_reset_event&&) = delete;bool is_set() const noexcept;struct awaiter;awaiter operator co_await() const noexcept;void set() noexcept;void reset() noexcept;private:friend struct awaiter;// - 'this' => set state// - otherwise => not set, head of linked list of awaiter*.mutable std::atomic<void*> m_state;};
显然,这个接口足够直接且简单,但是这里还需要注意的是,此处尚未定义awaiter,接下来就来定义awaiter结构体。
定义awaiter
- 需要通过初始化来确定哪一个async_manual_reset_event对象会被等待。
- 需要以链表的形式来存储awaiter的值。
- 需要存储正在等待的协程的coroutine handle,因此这个事件可以当他切换到‘set’状态时可以恢复协程。
- 需要可以应用Awaiter的接口,因此其需要三个特别的方法:
await_ready
,await_suspend
,await_resume
。
一旦我们将这些全都放到一起,基本的接口如下所示:
struct async_manual_reset_event::awaiter
{awaiter(const async_manual_reset_event& event) noexcept: m_event(event){}bool await_ready() const noexcept;bool await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) noexcept;void await_resume() noexcept {}private:const async_manual_reset_event& m_event;std::experimental::coroutine_handle<> m_awaitingCoroutine;awaiter* m_next;
};
由于协程是否暂停取决于event的状态是否为set,所以还需要进行判断:
bool async_manual_reset_event::awaiter::await_ready() const noexcept
{return m_event.is_set();
}
await_suspend
await_suspend才是awaitable类型中最重要的部分,这一节主要讲述其用法和所需要注意的部分。
- 在
m_awaitingCoroutine
中暂存正在等待的协程的协程句柄,以供恢复操作。
bool async_manual_reset_event::awaiter::await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{// Special m_state value that indicates the event is in the 'set' state.const void* const setState = &m_event;// Remember the handle of the awaiting coroutine.m_awaitingCoroutine = awaitingCoroutine;// Try to atomically push this awaiter onto the front of the list.void* oldValue = m_event.m_state.load(std::memory_order_acquire);do{// Resume immediately if already in 'set' state.if (oldValue == setState) return false; // Update linked list to point at current head.m_next = static_cast<awaiter*>(oldValue);// Finally, try to swap the old list head, inserting this awaiter// as the new list head.} while (!m_event.m_state.compare_exchange_weak(oldValue,this,std::memory_order_release,std::memory_order_acquire));// Successfully enqueued. Remain suspended.return true;
}
注意,我们需要在加载旧状态的时候去获取内存顺序,
event类的剩余部分
既然我们已经定义完了awaiter类型,让我们重新看一看async_manual_reset_event
的应用。
- 构造。需要进行初始化,‘not set’(nullptr)或者‘set’(this)
async_manual_reset_event::async_manual_reset_event(bool initiallySet) noexcept
: m_state(initiallySet ? this : nullptr)
{}
is_set()
,判断是否为‘set’
bool async_manual_reset_event::is_set() const noexcept
{return m_state.load(std::memory_order_acquire) == this;
}
reset()
, 如果是‘set’,则切换到‘not set’状态,反之则置之不理
void async_manual_reset_event::reset() noexcept
{void* oldValue = this;m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire);
}
set()
希望通过交换当前状态和特殊的“set”值this来转换到“set”状态,然后检查旧值是什么。如果有任何等待的协程,那么我们希望在返回之前依次恢复每个协程。
void async_manual_reset_event::set() noexcept
{// Needs to be 'release' so that subsequent 'co_await' has// visibility of our prior writes.// Needs to be 'acquire' so that we have visibility of prior// writes by awaiting coroutines.void* oldValue = m_state.exchange(this, std::memory_order_acq_rel);if (oldValue != this){// Wasn't already in 'set' state.// Treat old value as head of a linked-list of waiters// which we have now acquired and need to resume.auto* waiters = static_cast<awaiter*>(oldValue);while (waiters != nullptr){// Read m_next before resuming the coroutine as resuming// the coroutine will likely destroy the awaiter object.auto* next = waiters->m_next;waiters->m_awaitingCoroutine.resume();waiters = next;}}
}
- 最后一步,应用
co_await()
操作。只需要构造一个awaiter
对象。
async_manual_reset_event::awaiter
async_manual_reset_event::operator co_await() const noexcept
{return awaiter{ *this };
}
综上,我们现在获得了一个可等待的异步手动重置事件并且是lock-free,memory-allocation-free的应用。
这文章有点长,感觉理解还有所欠缺,回头会继续精进的,感谢阅读~