一、问题
假如我在利用协程并发执行任务的时候,会出现有些任务特别耗时,从而导致程序运行卡住,我们想跳过这些执行特别耗时的任务,只返回不超时的任务结果该怎么解决?
二、实现过程
2.1 情景
假如我有四个任务需要并发执行:
import asyncioasync def process_task1(task):try:# 模拟一个耗时的协程任务await asyncio.sleep(8)print(f"Task {task} completed")except:# 任务超时,跳过当前任务print(f"Task {task} timed out, skipping...")finally:return 1async def process_task2(task):try:# 模拟一个耗时的协程任务await asyncio.sleep(6)print(f"Task {task} completed")except:# 任务超时,跳过当前任务print(f"Task {task} timed out, skipping...")finally:return 2async def process_task3(task):try:# 模拟一个耗时的协程任务await asyncio.sleep(2)print(f"Task {task} completed")except:# 任务超时,跳过当前任务print(f"Task {task} timed out, skipping...")finally:return 3async def process_task4(task):try:# 模拟一个耗时的协程任务await asyncio.sleep(1)print(f"Task {task} completed")except:# 任务超时,跳过当前任务print(f"Task {task} timed out, skipping...")finally:return 4
第一个任务执行需要8秒,第二个任务执行需要6秒,第三个任务执行需要2秒,第四个任务执行需要1秒。
2.2 处理过程
假如我们用asyncio.gather并发执行四个任务,会等四个任务全部执行完成才会返回结果,这将非常耗时,如果有一个任务执行时间是几千秒,程序会卡在那里不动。
async def main():# 创建任务列表tasks = [process_task1(1), process_task2(2), process_task3(3), process_task4(4)]tasks = [asyncio.ensure_future(task) for task in tasks]res = await asyncio.gather(*tasks)print(res)# 运行主程序
asyncio.run(main())
这时候,我们可以使用asyncio.wait() 来处理并发任务。asyncio.wait() 是 asyncio 库中的一个函数,用于等待一组协程任务完成。它返回两个集合:已完成的任务集合和未完成的任务集合。并发执行4个任务,通过设置超时时间,对于超过4秒的任务就不会完成,未超过4秒的任务会完成。
async def main():# 创建任务列表tasks = [process_task1(1), process_task2(2), process_task3(3), process_task4(4)]tasks = [asyncio.ensure_future(task) for task in tasks]# 设置超时时间为4秒timeout = 4# 并发执行任务,并设置超时时间done, pending = await asyncio.wait(tasks, timeout=timeout)completed = []# 处理已完成的任务、未超时的任务for task in done:print("Completed task:", task)result = task.result()print(result)completed.append(result)print(completed)# 处理未完成的任务、超时的任务,直接取消任务for task in pending:print("Pending task:", task)task.cancel() # 会抛出异常,执行这个任务的except代码,打印出Task 2 timed out, skipping...Task 1 timed out, skipping...# 或者处理未完成的任务:不考虑时间,继续等待他们全部完成not_completed = await asyncio.gather(*pending)print(not_completed)# 整合结果res = completed + not_completedprint(res)
# 运行主程序
asyncio.run(main())
对于已完成的任务集合,使用 for 循环遍历每个任务,并对其进行相应的处理。例如,你可以获取任务的结果、处理返回的数据或执行其他操作。
对于未完成的任务集合,可以选择等待它们完成,或者取消这些任务。如果你希望等待这些任务完成,可以继续使用 await asyncio.wait() 或其他等待任务完成的方法如 asyncio.gather() 函数。如果你希望取消这些任务,可以使用 task.cancel() 方法取消任务的执行。请注意,取消任务只能在任务未开始执行或者支持取消的情况下生效。
三、结果
运行结果:
作者简介:
读研期间发表6篇SCI数据挖掘相关论文,现在某研究院从事数据算法相关科研工作,结合自身科研实践经历不定期分享关于Python、机器学习、深度学习、人工智能系列基础知识与应用案例。致力于只做原创,以最简单的方式理解和学习,关注我一起交流成长。需要数据集和源码的小伙伴可以关注底部公众号添加作者微信。