在 Celery 中,使用 apply_async
方法调度的任务可以通过任务 ID 来取消。以下是如何取消 apply_async
调度的任务的详细步骤:
1. 获取任务 ID
当你使用 apply_async
方法调度任务时,它会返回一个 AsyncResult
对象,该对象包含任务的唯一 ID。
示例代码
from celery import Celeryapp = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y# 调度任务并获取任务 ID
result = add.apply_async((10, 20))
task_id = result.id
print(f"Task ID: {task_id}")
2. 使用 revoke
方法取消任务
你可以使用 revoke
方法来取消任务。revoke
方法接受任务 ID 作为参数,并可以选择是否终止正在运行的任务。
示例代码
from celery import Celeryapp = Celery('tasks', broker='redis://localhost:6379/0')# 假设 task_id 是你之前获取的任务 ID
task_id = 'your_task_id'# 取消任务
app.control.revoke(task_id, terminate=True)
参数说明
terminate=True
:如果任务正在运行,终止该任务。默认值为False
。signal='SIGTERM'
:指定终止任务时发送的信号,默认为SIGTERM
。
完整示例
以下是一个完整的示例,展示了如何调度任务并取消任务:
from celery import Celery
import timeapp = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):print(f"Starting task with args: {x}, {y}")time.sleep(10) # 模拟长时间运行的任务return x + y# 调度任务并获取任务 ID
result = add.apply_async((10, 20))
task_id = result.id
print(f"Task ID: {task_id}")# 假设你想在任务开始后不久取消它
time.sleep(2) # 等待一段时间,确保任务已经开始
app.control.revoke(task_id, terminate=True)
print(f"Task with ID {task_id} has been revoked")
注意事项
- 任务状态:取消任务时,任务的状态会变为
REVOKED
。你可以通过AsyncResult
对象检查任务的状态。 - 终止任务:如果任务已经在执行,设置
terminate=True
可以立即终止任务。否则,任务将继续执行直到完成。 - 信号:默认情况下,
revoke
方法发送SIGTERM
信号来终止任务。你可以根据需要选择其他信号,例如SIGKILL
。
检查任务状态
你可以使用 AsyncResult
对象来检查任务的状态:
result = app.AsyncResult(task_id)
if result.state == 'REVOKED':print(f"Task with ID {task_id} has been revoked")
else:print(f"Task with ID {task_id} is in state: {result.state}")
总结
通过上述步骤,你可以轻松地取消使用 apply_async
方法调度的任务。希望这些方法能帮助你成功取消任务!如果有更多问题或需要进一步的帮助,请随时告诉我。