参考自仓库https://github.com/SparksFly8/Learning_Python/tree/master/coroutine
协程(coroutine)在多任务协作中体现的效率又极为的突出。Python中执行多任务还可以通过多进程
或一个进程中的多线程
来执行,但两者之中均存在一些缺点,因此引出了协程。
首先需要了解同步和异步的概念:
- 同步:发出一个“调用”时,在没有得到结果之前,该“调用”就不返回,“调用者”需要一直等待该“调用”结束才能进行下一步工作。
- 异步:“调用”在发出之后,就直接返回了,也就没有返回结果。“被调用者”完成任务后,通过状态来通知“调用者”继续回来处理该“调用”。
普通同步代码实现多个IO任务的示例:
# 普通同步代码实现多个IO任务
import time
def taskIO_1():print('开始运行IO任务1...')time.sleep(2) # 假设该任务耗时2sprint('IO任务1已完成,耗时2s')
def taskIO_2():print('开始运行IO任务2...')time.sleep(3) # 假设该任务耗时3sprint('IO任务2已完成,耗时3s')start = time.time()
taskIO_1()
taskIO_2()
print('所有IO任务总耗时%.5f秒' % float(time.time()-start))
开始运行IO任务1...
IO任务1已完成,耗时2s
开始运行IO任务2...
IO任务2已完成,耗时3s
所有IO任务总耗时5.00112秒
在计算机中CPU的运算速率要远远大于IO速率,而当CPU运算完毕后,如果再要闲置很长时间去等待IO任务完成才能进行下一个任务的计算,这样的任务执行效率很低。
所以需要有一种异步的方式来处理类似任务
一、使用yield from和@asyncio.coroutine实现协程
yield 和 yield from
yield
在生成器中有中断的功能,可以传出值,也可以从函数外部接收值,而yield from
的实现就是简化了yield
操作,如下:
def generator_1(titles):yield titles
def generator_2(titles):yield from titlestitles = ['Python','Java','C++']
for title in generator_1(titles):print('生成器1:',title)
for title in generator_2(titles):print('生成器2:',title)
执行结果如下:
生成器1: ['Python', 'Java', 'C++']
生成器2: Python
生成器2: Java
生成器2: C++
可以看出,yield titles
返回titles
完整列表,而yield from titles
实际等价于:
for title in titles: # 等价于yield from titlesyield title
yield from
还有一个主要的功能是省去了很多异常的处理,不再需要我们手动编写,其内部已经实现大部分异常处理。
下面是用生成器来实现一个整数加和的程序,通过send()
函数向生成器中传入要加和的数字,然后最后以返回None
结束,total
保存最后加和的总数。
如果使用生成器g1
完成这个任务,在最后传入None
后的程序退出会报StopIteration
异常并返回total
值,如:
加 2
加 3
加 None
StopIteration: 5
但是如果用g2
完成这个任务,即使传入None
也不报异常,total = tield from generator_1()
返回给total
的值是generator_1()
最终的return total
。这说明yield from
封装了处理常见异常的代码。
- 概念:
- 子生成器:yield from后的generator_1()生成器函数是子生成器
- 委托生成器:generator_2()是程序中的委托生成器,它负责委托子生成器完成具体任务。
- 调用方:main()是程序中的调用方,负责调用委托生成器。
yield from
在其中的关键作用是:建立调用方和子生成器的通道
- 在上述代码中main()每一次在调用send(value)时,value不是传递给了委托生成器generator_2(),而是借助yield from传递给了子生成器generator_1()中的yield
- 同理,子生成器中的数据也是通过yield直接发送到调用方main()中。
yield from
结合@asyncio.coroutine
实现协程(已移除)
在Python3.5及之后的版本中,asyncio.coroutine装饰器被弃用和移除了,取而代之的是async def来定义协程,这里仅做了解
# 使用同步方式编写异步功能
import time
import asyncio
@asyncio.coroutine # 标志协程的装饰器
def taskIO_1():print('开始运行IO任务1...')yield from asyncio.sleep(2) # 假设该任务耗时2sprint('IO任务1已完成,耗时2s')return taskIO_1.__name__
@asyncio.coroutine # 标志协程的装饰器
def taskIO_2():print('开始运行IO任务2...')yield from asyncio.sleep(3) # 假设该任务耗时3sprint('IO任务2已完成,耗时3s')return taskIO_2.__name__
@asyncio.coroutine # 标志协程的装饰器
def main(): # 调用方tasks = [taskIO_1(), taskIO_2()] # 把所有任务添加到task中done,pending = yield from asyncio.wait(tasks) # 子生成器for r in done: # done和pending都是一个任务,所以返回结果需要逐个调用result()print('协程无序返回值:'+r.result())if __name__ == '__main__':start = time.time()loop = asyncio.get_event_loop() # 创建一个事件循环对象looptry:loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束finally:loop.close() # 结束事件循环print('所有IO任务总耗时%.5f秒' % float(time.time()-start))
开始运行IO任务1...
开始运行IO任务2...
IO任务1已完成,耗时2s
IO任务2已完成,耗时3s
协程无序返回值:taskIO_2
协程无序返回值:taskIO_1
所有IO任务总耗时3.00209秒
二、使用async
和await
实现协程
在Python3.5开始引入新的语法async
和await
,以简化并更好地标识异步IO
import time
import asyncioasync def taskIO_1():print('开始运行IO任务1...')await asyncio.sleep(2) # 假设该任务耗时2sprint('IO任务1已完成,耗时2s')return taskIO_1.__name__async def taskIO_2():print('开始运行IO任务2...')await asyncio.sleep(3) # 假设该任务耗时3sprint('IO任务2已完成,耗时3s')return taskIO_2.__name__async def main(): # 调用方# 将协程对象转换为 Task 对象tasks = [asyncio.create_task(taskIO_1()), asyncio.create_task(taskIO_2())]# 使用 asyncio.gather 代替 asyncio.wait,以简化代码results = await asyncio.gather(*tasks) # 等待所有任务完成并获取返回值for result in results:print('协程返回值:' + result)if __name__ == '__main__':start = time.time()# 对于 Python 3.7 及以上版本,推荐使用 asyncio.run 来运行主函数asyncio.run(main())# 由于 asyncio.run 已经处理了事件循环的创建和关闭,以下代码可以删除# loop = asyncio.get_event_loop()# try:# loop.run_until_complete(main())# finally:# loop.close()print('所有IO任务总耗时%.5f秒' % (time.time() - start))
开始运行IO任务1...
开始运行IO任务2...
IO任务1已完成,耗时2s
IO任务2已完成,耗时3s
协程返回值:taskIO_1
协程返回值:taskIO_2
所有IO任务总耗时3.01749秒
协程是函数级别的程序,多进程和多线程是内核级别的程序。
三、使用asyncio的不同方法实现协程
在多个协程中的线性控制流很容易通过内置的关键词await
来管理。使用asyncio
模块中的方法可以实现更多复杂的结构,它可以并发地完成多个协程。
asyncio.wait()
你可以将一个操作分成多个部分并分开执行,而wait(tasks)
可以被用于中断任务集合(tasks)中的某个被事件循环轮询到的任务,直到该协程的其他后台操作完成才被唤醒。
done, pending = await asyncio.wait(aws)
此处并发运行传入的aws
(awaitable objects),同时通过await
返回一个包含(done, pending)的元组,done表示已完成的任务列表,pending表示未完成的任务列表。
注:
- 只有当给
wait()
传入timeout
参数时才有可能产生pending
列表。 - 通过
wait()
返回的结果集是按照事件循环中的任务完成顺序排列的,所以其往往和原始任务顺序不同。
async def main(): # 调用方tasks = [taskIO_1(), taskIO_2()] # 把所有任务添加到task中done,pending = await asyncio.wait(tasks) # 子生成器for r in done: # done和pending都是一个任务,所以返回结果需要逐个调用result()print('协程无序返回值:'+r.result())
asyncio.gather()
如果你只关心协程并发运行后的结果集合,可以使用gather(),它不仅通过await返回仅一个结果集,而且这个结果集的结果顺序是传入任务的原始顺序。
gather()
通过await
直接返回一个结果集列表,最后返回的结果集的顺序是按照初始传入的任务顺序排的。
async def main(): # 调用方resualts = await asyncio.gather(taskIO_1(), taskIO_2()) # 子生成器print(resualts)
asyncio.as_completed()
as_completed(tasks)
是一个生成器,它管理着一个协程列表(此处是传入的tasks)的运行。当任务集合中的某个任务率先执行完毕时,会率先通过await
关键字返回该任务结果。可见其返回结果的顺序和wait()
一样,均是按照完成任务顺序排列的。
使用as_completed(tasks)
和wait(tasks)
相同之处是返回结果的顺序是协程的完成顺序,这与gather()
恰好相反。而不同之处是as_completed(tasks)
可以实时返回当前完成的结果,而wait(tasks)
需要等待所有协程结束后返回的done
去获得结果。
async def main(): # 调用方tasks = [taskIO_1(), taskIO_2()] # 把所有任务添加到task中for completed_task in asyncio.as_completed(tasks):resualt = await completed_task # 子生成器print('协程无序返回值:'+resualt)