文章目录
- 取消任务和设置超时
- 取消任务
- 设置超时
- future
- 使用装饰器测量协程执行时间
- 创建和操作事件循环
取消任务和设置超时
取消任务
要取消一个协程任务,你可以调用该任务的cancel()
方法。这个操作会使得正在等待的asyncio.sleep()
抛出asyncio.CancelledError
异常。
import asyncioasync def my_coroutine():print("开始执行")try:await asyncio.sleep(5)except asyncio.CancelledError:print("任务被取消")finally:print("清理工作")async def main():task = asyncio.create_task(my_coroutine())await asyncio.sleep(2) # 等待2秒task.cancel() # 取消任务try:await taskexcept asyncio.CancelledError:print("任务已被取消")asyncio.run(main())
设置超时
要为协程设置超时,你可以使用asyncio.wait_for()
函数。如果任务在指定的时间内没有完成,它将抛出asyncio.TimeoutError
异常。
import asyncioasync def my_coroutine():print("开始执行")await asyncio.sleep(5)print("执行完成")async def main():try:await asyncio.wait_for(my_coroutine(), timeout=3)except asyncio.TimeoutError:print("任务超时")asyncio.run(main())
如果任务超时了,提示超时,但是不终止这个任务
#asyncio.shield的作用是保护一个协程免受取消操作的影响
import asyncioasync def my_coroutine():print("开始执行")await asyncio.sleep(5)print("执行完成")async def main():task = asyncio.create_task(my_coroutine())# 使用asyncio.wait_for来检测超时,但不取消任务try:await asyncio.wait_for(asyncio.shield(task), timeout=3)except asyncio.TimeoutError:print("任务超时,但仍在运行")# 等待任务真正完成await taskasyncio.run(main())
另外的方法
import asyncioasync def my_coroutine():print("开始执行")await asyncio.sleep(5)print("执行完成")async def main():# 创建任务,但不要等待它task = asyncio.ensure_future(my_coroutine())# 创建一个超时检测的协程async def timeout_detector(task):await asyncio.sleep(3)if not task.done():print("任务超时,但仍在运行")# 运行超时检测器asyncio.create_task(timeout_detector(task))# 等待任务完成await taskasyncio.run(main())
future
在Python中,Future
对象通常与并发编程相关,特别是在异步编程中。Future
代表了一个异步计算的结果,这个结果可能在将来的某个时间点可用。在Python的 asyncio
库中,Future
是一个核心概念,用于表示一个尚未完成的计算。
在 asyncio
中,通常会使用 asyncio.create_task()
函数或 asyncio.ensure_future()
函数来创建一个 Future
对象。
然而,从Python 3.7开始,asyncio.create_task()
成为了推荐的方式,因为它返回的是一个 Task
对象,这是 Future
的一个子类,提供了更多的功能。
下面是一个使用 asyncio
和 Future
(通过 create_task()
)的简单示例:
import asyncioasync def fetch_data_from_web(url):# 模拟从网络获取数据的异步操作print(f"Starting fetch of {url}")await asyncio.sleep(1) # 假设网络请求需要1秒return f"Data from {url}"async def main():# 创建一个任务(即 Future)task = asyncio.create_task(fetch_data_from_web("https://example.com"))# 在这里,我们可以做其他事情,而不需要等待fetch_data_from_web完成print("Doing other tasks...")await asyncio.sleep(0.5) # 模拟其他任务需要0.5秒# 如果需要,我们可以等待任务完成并获取结果result = await taskprint(result)# 运行事件循环
asyncio.run(main())
在这个示例中,fetch_data_from_web
是一个模拟从网络获取数据的异步函数。在 main
函数中,我们使用 asyncio.create_task()
创建一个任务(即 Future
),该任务将在后台执行 fetch_data_from_web
函数。然后,main
函数可以继续执行其他任务(在这个例子中,只是简单地打印一条消息并等待0.5秒)。当我们需要 fetch_data_from_web
的结果时,我们使用 await
关键字等待任务完成并获取结果。
注意:在Python中,通常使用 await
关键字与 Future
或 Task
对象交互,而不是直接操作它们。这是因为 Future
和 Task
是异步编程模型中的底层概念,而 await
提供了更高级、更直观的接口来与它们交互。
使用装饰器测量协程执行时间
在Python中,你可以使用装饰器来测量协程的执行时间。由于协程是异步执行的,你不能直接使用标准的time.time()
函数来测量其执行时间,因为这会阻塞事件循环。相反,你需要使用异步上下文管理器(async with
)和异步装饰器(async def
)来包装协程,并在进入和退出上下文时记录时间。
以下是一个使用装饰器测量协程执行时间的示例:
import asyncio
import time
from typing import Callable, Awaitable
from contextlib import asynccontextmanager @asynccontextmanager
async def measure_time(label: str): start = time.time() try: yield finally: end = time.time() print(f"{label} executed in {end - start:.2f} seconds") def measure_coroutine(func: Callable[..., Awaitable[None]]) -> Callable[..., Awaitable[None]]: async def wrapper(*args, **kwargs): async with measure_time(f"Coroutine {func.__name__}"): await func(*args, **kwargs) return wrapper # 示例协程
async def example_coroutine(): print("Starting example coroutine...") await asyncio.sleep(2) # 模拟耗时操作 print("Finished example coroutine...") # 应用装饰器
example_coroutine = measure_coroutine(example_coroutine) # 运行协程
asyncio.run(example_coroutine())
然而,上面的示例虽然使用了装饰器,但实际上并没有真正地将measure_time
作为装饰器直接应用于协程函数,因为装饰器本身不能是异步的。上面的代码实际上定义了一个异步上下文管理器measure_time
,并使用了一个额外的包装函数wrapper
来将measure_time
上下文管理器应用到协程上。
如果你想要一个更“装饰器”风格的解决方案,并且不介意稍微改变一下用法,可以这样做:
import asyncio
import time
from typing import Callable, Awaitable def measure_coroutine(func: Callable[..., Awaitable[None]]) -> Callable[..., Awaitable[None]]: async def wrapper(*args, **kwargs): start = time.time() await func(*args, **kwargs) end = time.time() print(f"Coroutine {func.__name__} executed in {end - start:.2f} seconds") return wrapper # 示例协程
@measure_coroutine
async def example_coroutine(): print("Starting example coroutine...") await asyncio.sleep(2) # 模拟耗时操作 print("Finished example coroutine...") # 运行协程
asyncio.run(example_coroutine())
在这个版本中,装饰器measure_coroutine
直接返回一个新的协程函数wrapper
,它测量了原始协程的执行时间并打印出来。这样可以直接在协程函数上使用装饰器,而不需要额外的包装或调用。
创建和操作事件循环
在Python中,使用asyncio
库实现协程时,可以手动创建和访问事件循环。以下是一个简单的代码示例,展示了如何手动创建和操作事件循环:
import asyncio
async def my_coroutine():print("协程开始执行")await asyncio.sleep(1)print("协程执行完毕")
def main():# 手动创建事件循环loop = asyncio.get_event_loop()# 创建协程任务coroutine = my_coroutine()task = loop.create_task(coroutine)# 启动事件循环try:loop.run_until_complete(task)finally:# 关闭事件循环loop.close()
if __name__ == "__main__":main()
在这个示例中,我们首先导入了asyncio
库,然后定义了一个协程my_coroutine
。在main
函数中,我们手动创建了一个事件循环,将协程封装为一个任务,然后启动事件循环。当任务完成时,事件循环会关闭。
需要关注的问题:
- 事件循环的创建和关闭:确保在程序开始时创建事件循环,并在结束时关闭它。在上述代码中,我们使用
asyncio.get_event_loop()
来获取当前事件循环,或者创建一个新的。使用loop.close()
来关闭事件循环。 - 事件循环的唯一性:通常情况下,一个程序中只会有一个活动的事件循环。在多线程应用程序中,每个线程都有自己的事件循环。
- 异步编程模型:使用
async
和await
关键字来定义协程,并使用asyncio
库提供的函数来操作它们。这有助于保持代码的异步性和非阻塞性。 - 任务的取消和异常处理:在异步编程中,任务可能会被取消,也可能会抛出异常。确保使用
try
和except
块来处理这些情况,以保持程序的稳定性。 - 资源泄漏:在协程中使用的资源(如文件、网络连接等)应该在协程完成后正确释放,以避免资源泄漏。
- 并发和并行:
asyncio
库提供了并发执行的机制,但它通常是单线程的。如果需要真正的并行执行,可以考虑使用concurrent.futures
模块或asyncio
的run_in_executor
方法。 - 与线程和进程的交互:在异步编程中,可能会需要与线程或进程交互。
asyncio
库提供了asyncio.run_in_executor
方法来在另一个线程或进程中运行阻塞代码。 - 性能和调试:异步编程模型可能会带来性能优势,但也可能使调试变得更加复杂。使用适当的工具和日志记录来帮助调试异步代码。
通过关注这些问题,你可以更有效地使用Python中的asyncio
库来编写和维护异步代码。
备注:
在Python 3.10及更高版本中,asyncio.get_event_loop()
已被弃用,因为它可能会导致线程安全问题。如果你的代码中出现了DeprecationWarning: There is no current event loop
警告,这意味着你在尝试获取事件循环时,当前线程中没有设置的事件循环。
为了解决这个问题,你应该使用asyncio.run()
函数来运行你的异步程序,它会自动创建和设置一个新的事件循环,并在结束时关闭它。这是Python 3.7引入的新方法,用于简化异步程序的启动和运行。