背景
很早之前有一篇中断地关于协程的文章,彼时从js的事件循环切入,对比学习python的asyncio,感觉是个很大的知识点,想写的内容很多,遂搁浅。本次,抽空补齐这块的内容。
序
协程是什么?
- 协程是个非常好的概念,它在用户态实现子例程之间的切换即
并发
执行,相比于多线程, 它是单线程并发,节省了上下文切换的开销。 因为GIL的存在所以对于从事python开发的人员来说,协程一定要掌握。 - 协程没什么了不起的,大白话来讲就是-
执行一段时间A函数然后由用户自主切换到B函数...类比线程的上下文切换
- 一个重点: 只有
异步IO+协程
才有意义,不然协程只是拥有了切换子例程的功能,但是io操作或者耗时操作还是会阻塞, 此时协程解决不了单线程并发的问题
为什么需要协程?
- 直接给出结论:
用同步编程的思想实现异步编程 - 让原来要使用异步+回调方式写的非人类代码,可以用看似同步的方式写出来...
- 为什么这么说?我们来看看没有协程是怎么处理I/O模型的
- 同步编程:应用程序等待IO结果,阻塞当前线程;
优点:符合常规思维,易于理解,逻辑简单;
缺点:成本高昂,效率太低,其他与IO无关的业务也要等待IO的响应; - 异步多线程/进程:将IO操作频繁的逻辑、或者单纯的IO操作独立到一/多个线程中,业务线程与IO线程间靠通信/全局变量来共享数据;
优点:充分利用CPU资源,防止阻塞资源
缺点:线程切换代价相对较高,异步逻辑代码复杂 - 异步消息+回调函数:设计一个消息循环处理器,接收外部消息(包括系统通知和网络报文等),收到消息时调用注册的回调函数;
优点:充分利用CPU资
缺点:代码逻辑复杂
- 同步编程:应用程序等待IO结果,阻塞当前线程;
- 可以看到在异步编程中如果没有协程, 那就需要回调机制, 如在
I/O多路复用
中, 需要在事件中注册回调函数,而这个回调函数里面就是对于事件的处理逻辑, 相当于一个完整的业务逻辑分为两半 前一半是回调之前,后一半是回调函数, 如果回调依赖的话 还会存在回调地狱的问题, 如果使用协程那么整个业务逻辑将跟同步编程一致 业务不分散且避免了回调地狱的问题;
- 为什么这么说?我们来看看没有协程是怎么处理I/O模型的
- 对一些io密集型的进程来说。协程可以优雅地以同步编程地方式实现非阻塞调度
如何实现协程?
- python实现协程有两种机制:
- yield 生成器机制-
无栈协程
- greenlet—
有栈协程
(由C语言写成的库, 本篇文章只涉及 greenlet的使用不涉及原理)。
- yield 生成器机制-
从yield到async
一、yield
1.1 生成器
- 概念
包含yield
的函数,即为一个生成器函数, 返回值是一个生成器对象(<generator object func at 0x0000000001D606D8>
),作为一个特殊的迭代器,生成器
拥有迭代器的一切特征, 比如next
调用(next(generator)
). - code
def _generator():yield 1yield 2yield 3 generator = _generator()
1.2 生成器的三个常用方法
- 1.2.1 generator.send(params)
- 向生成器内部传递一个值,并返回yield的值
- 1.2.2 generator.close()
激活前
调用,直接结束生成器的生命周期激活后
调用向子生成器内部抛出GeneratorExit
异常, 在生成器内部这个异常未被捕获将抛出来,close将忽略这个错误
, 如果该生成器内部捕获并处理了这个异常那如果此时 后面还跟有 yield的代码 将抛出RuntimeError
,这个错误将被传回调用方,def _generator():try:yield 1except GeneratorExit:print(1123)raise # 注释掉此行 将抛出 RuntimeErroryield 2g = _generator() next(g) g.close()
- 1.2.3 generator.throw(Exception, exception_info)
激活前
调用,直接抛出Exception
异常;激活后
调用向生成器内部抛出Exception
异常, 并寻找下一个yield(如果没有找到将抛出StopIteration
异常), 返回值是 yield 的的值(参考send,只是send
是往生成器内部送入一个值,而throw
是往生成器内部送入一个异常)def _generator():try:yield 1except Exption as e:passyield 2yield 3generator = _generator() print(next(generator)) print(generator.throw(Exception))>> 1 >> 2
1.3 StopIteration
- 1.3.1 - 迭代结束的标志,
当生成器(迭代器)
穷尽所有元素后的报错 - 1.3.2 - StopIteration().value:
生成器函数也可以包含return语句 return后面的值将作为 异常对象的value
属性来返回def _generator(): yield 1 return 2gen = _generator() print(next(gen)) try:print(next(gen)) except StopIteration as e:print("返回值:", e.value)>> 1 >> 返回值: 2
- 1.3.3 for对
StopIteration
的特殊处理
平常使用的for xx in xx:
遍历会自动处理StopIteration
异常作为迭代结束的标志
二、yield from
2.1 语法
- 后面直接跟
Iterable
(可迭代对象)。 - 与asyncio.coroutine同时使用,定义协程函数。在python3.5以后改成了await。当yield from后面是IO耗时操作的时候,会切换至另一个yield from。
- 返回值是自生成器的返回值, 即1.3.2的value。
2.2 三个角色
- 子生成器 - sub-generator
- 委托生成器 - delegating generator
包含yield from语句的生成器 - 调用方 - caller
2.3 作用
- 1.简化了 for xx in xx 的迭代逻辑,
- 2.调用方和子生成器之间的
透明
双向通道, yield from 承担了send
、throw
通道的作用, 无论是 send进的数据还是throw进的异常都无感
地传输给了子生成器- caller端调用send的传值将传给子生成器
未使用 yield from
使用 yield fromdef delegating_generator(coro):yield_data = coro.send(None) while True:try:x = yield yield_data yield_data = coro.send(x) except StopIteration:pass
def delegating_generator(coro):yield from coro
- caller端调用throw或者close的异常抛出将直接传递给子生成器
未使用 yield from
使用 yield fromdef delegating_generator(coro):yield_data = coro.send(None)while True:try:try:x = yield yield_dataexcept Exception as e:yield_data = coro.throw(e)else:yield_data = coro.send(x)except StopIteration:pass
可以看到其实yield from 帮我们处理了很多逻辑, 上面未使用 yield from 的情况还是未考虑def delegating_generator(coro):yield from coro
GeneratorExit
异常的情况, 即close
操作。所以省心省力直接yield from
。 - 针对异常传递, 异常是首先被子生成器处理的,只有子生成未捕获或者主动抛出, 委托生成器才有资格处理; 如果生成器没有处理抛出的异常,异常会向上冒泡,传到调用方的上下文中。以下code可以说明这种情况
import sysdef generator():try:yield 1except TypeError as e:print("异常", e, sys.exc_info())except IOError as e:print("异常", e, sys.exc_info())raisedef delegating _generator():try:yield from generator()except GeneratorExit:print("delegating _generator异常", sys.exc_info())def main():gen = delegating _generator()print("step_0:", next(gen))# print("step_1:", gen.throw(ValueError))# print("step_1:", gen.throw(GeneratorExit))print("step_1_1:", gen.close())if __name__ == '__main__':main()
- caller端调用send的传值将传给子生成器
以上 yield
和yield from
的基本机制已经说明清楚了,python通过生成器实现的协程就是通过yield暂停程序流执行, send恢复程序流执行, 在用户态实现了上下文切换, 也就是微线程的并行
;
以下开始,将做些封装逐步演进asyncio是如何工作的
三、从生成器到协程
3.1 Future对象 (类比js的Promise对象)
如果有多线程编程经验也可知道python多线程这边concurrent
库也定义了Future对象
,此处我们很容易知道Future没什么特别的,只是代表一个未来即将完成的任务
class Future(object): def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result=None): self.result = result for fn in self._callbacks: fn(self)def __iter__(self):yield selfreturn self.result
3.2 Task对象
管理协程的最小单元
class Task(object): def __init__(self, the_coroutine=None): self.t_coroutine = the_coroutine the_future = Future() the_future.set_result(None) self.step(the_future) def step(self, the_future=None): try: # send 会进入到 coroutine 执行,即 fetch,直到下次 yield # next_future 为 yield 返回的对象 next_future = self.coroutine.send(the_future.result) except StopIteration: return next_future.add_done_callback(self.step)
3.3 执行体
实际业务逻辑(包含io操作)的执行方法, 以读取文件为例,其余io比如网络socket通信同理
def coroutine():f = Future() # 仅包装一个未来执行的对象...yield from f
3.4 EventLoop事件循环
事件循环这块比较简单,就是做了IO多路复用
的系统调用,win下是 select unix下则一般使用epoll
class Loop(): @classmethoddef get_event_loop(cls):return selfdef run_until_complete(self)while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback(event_key, event_mask)
调度(伪代码)
loop = Loop.get_event_loop()tasks = Task(coroutine)
loop.run_until_complete()
有点asyncio的影子了;到此处可能会有一些疑惑,为什么协程可以在io密集型程序中实现并发操作呢?其实如果仔细看会发现所有的耗时操作是交给DMA(Direct Memory Access,直接内存访问)来执行的,而cpu耗时大多数只花费在了loop这个事件循环处, 此事件循环使用了select
或者epoll
非阻塞监听io变化
四、asyncio.coroutine (可以忽略,属于过去式了)
当使用该语法注释协程,并且运行时将出现以下提示(python >= 3.8
)
: DeprecationWarning: “@coroutine” decorator is deprecated since Python 3.8, use “async def” instead
async def f():
五、async/await
当前我们在定义协程时已经很方便了
一个demo
import asyncioasync def c(*args, **kwargs):await asyncio.sleep(1)
7. 简约总结篇
- 协程可以身处四个状态中的一个。当前状态可以使用
inspect.getgeneratorstate(…)
函数确定,该函数会返回下述字符串中的一个:- 1.GEN_CREATED:等待开始执行
- 2.GEN_RUNNING:解释器正在执行
- 3.GEN_SUSPENED:在yield表达式处暂停
- 4.GEN_CLOSED:执行结束
- 通过asyncio源码分析我们可以看到, asyncio的调度其实就是
TimerHandle(asyncio.sleep)
+I/O多路复用器
;
参考文献
python协程的四种实现方式
asyncio实现
DOI
协程好文
actor模式
asyncio
DOI
async
https://zhuanlan.zhihu.com/p/446999661
https://jishuin.proginn.com/p/763bfbd7159b
asyncio调度
小明asyncio
*进阶异步编程