大家好!我是爱摸鱼的小鸿,关注我,收看每期的编程干货。
一个简单的库,也许能够开启我们的智慧之门,
一个普通的方法,也许能在危急时刻挽救我们于水深火热,
一个新颖的思维方式,也许能激发我们无尽的创造力,
一个独特的技巧,也许能成为我们的隐形盾牌……
神奇的 Python 库之旅,第 9 章
目录
- 一、什么是 Celery?
- 二、为什么选择 Celery?
- 三、Celery 编程示例
- 四、总结
- 五、作者Info
一、什么是 Celery?
Celery 是一个强大的工具,它能够帮助我们管理和调度复杂的任务。无论是处理异步任务、计划任务,还是分布式任务,Celery 都能轻松胜任。在这篇文章中,我们将深入探讨 Celery 的魅力,通过多个代码示例,带你全面了解这个神器。
Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,并提供了维护这些任务的工具。它的使用场景非常广泛,比如电子邮件发送、视频处理、数据分析、Web 爬虫等。
Github 项目地址:
https://github.com/celery/celery
…
二、为什么选择 Celery?
选择 Celery 的理由有很多,这里列出几个主要的优势:
- 异步任务处理:能让你在不阻塞主进程的情况下处理任务;
- 定时任务:支持类似 cron 的定时任务调度;
- 分布式执行:支持将任务分发到多个机器上执行,提升系统性能;
- 高可用性:可以与消息队列(如 RabbitMQ, Redis)结合,实现高可用性和可靠性。
在开始之前,我们需要安装 Celery 和一个消息代理(这里我们选择 Redis):
pip install celery redis
…
三、Celery 编程示例
快速开始:一个简单的任务
让我们从一个简单的例子开始,创建一个任务来演示 Celery 的基本用法。
# tasks.py
from celery import Celery# 创建 Celery 实例
app = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y
在这个例子中,我们定义了一个名为 add 的任务,它接收两个参数并返回它们的和。接下来,我们可以启动一个 Celery worker 来处理这个任务:
celery -A tasks worker --loglevel=info
启动 Celery worker 后,我们可以在 Python 交互式环境中调用这个任务:
>>> from tasks import add
>>> result = add.delay(4, 6)
>>> result.get(timeout=10)
10
深入挖掘:任务链和组
Celery 的强大之处在于它能够处理复杂的工作流,比如任务链和任务组。
任务链
任务链允许我们将多个任务串联起来,前一个任务的输出作为下一个任务的输入:
from celery import chain# 定义任务
@app.task
def multiply(x, y):return x * y@app.task
def add_and_multiply(x, y, z):return chain(add.s(x, y), multiply.s(z))()# 调用任务链
result = add_and_multiply(2, 3, 4)
print(result.get()) # 输出 20,因为 (2 + 3) * 4 = 20
任务组
任务组允许我们并行执行一组任务,并在所有任务完成后获得结果:
from celery import group# 定义任务组
@app.task
def sum_list(numbers):return sum(numbers)@app.task
def process_groups():return group(sum_list.s([1, 2, 3]), sum_list.s([4, 5, 6]), sum_list.s([7, 8, 9]))().get()# 调用任务组
result = process_groups()
print(result) # 输出 [6, 15, 24]
…
定时任务
Celery 还支持定时任务,这类似于 Unix 系统的 cron 作业:
from celery import Celery
from celery.schedules import crontabapp = Celery('periodic_tasks', broker='redis://localhost:6379/0')@app.task
def scheduled_task():print("This task runs every 10 seconds")app.conf.beat_schedule = {'run-every-10-seconds': {'task': 'periodic_tasks.scheduled_task','schedule': 10.0,},
}# 启动 Celery beat 进程
celery -A periodic_tasks beat --loglevel=info
…
错误处理与重试机制
在实际应用中,任务失败是不可避免的。Celery 提供了优雅的错误处理和重试机制:
@app.task(bind=True, max_retries=3)
def unreliable_task(self):try:# 可能失败的操作risky_operation()except Exception as exc:# 捕获异常并重试raise self.retry(exc=exc, countdown=60)
在这个例子中,unreliable_task 如果失败,会在 60 秒后重试,总共重试 3 次。
使用 Celery 在 Web 应用
Celery 常用于 Web 应用中来处理后台任务。下面是一个简单的 Flask 应用,展示了如何集成 Celery:
from flask import Flask, request, jsonify
from celery import Celeryapp = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'def make_celery(app):celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])celery.conf.update(app.config)class ContextTask(celery.Task):def __call__(self, *args, **kwargs):with app.app_context():return self.run(*args, **kwargs)celery.Task = ContextTaskreturn celerycelery = make_celery(app)@celery.task
def long_task():import timetime.sleep(10)return 'Task completed!'@app.route('/start-task', methods=['POST'])
def start_task():task = long_task.apply_async()return jsonify({}), 202, {'Location': url_for('get_status', task_id=task.id)}@app.route('/status/<task_id>')
def get_status(task_id):task = long_task.AsyncResult(task_id)response = {'state': task.state,'result': task.result,}return jsonify(response)if __name__ == '__main__':app.run(debug=True)
在这个示例中,我们定义了一个 Flask 应用,并将 Celery 集成到其中。long_task 是一个模拟长时间运行的任务,客户端可以通过 start-task 路由启动任务,并通过 status/<task_id> 路由查询任务状态。
更多功能、详细用法可参考官方文档:
https://docs.celeryq.dev/en/stable
…
四、总结
Celery 是一个强大的工具,它为我们处理异步任务、计划任务和分布式任务提供了极大的便利。在这篇文章中,我们通过多个代码示例,展示了 Celery 的基本用法、任务链和组、定时任务、错误处理和重试机制,以及如何在 Web 应用中集成 Celery。
如果你在日常工作中需要处理复杂的任务调度和分布式任务,那么 Celery 绝对是一个值得深入学习和使用的工具。希望这篇文章能够帮助你更好地理解和使用 Celery,让你的工作更加高效、顺畅。
试一试吧,Celery 会让你的任务调度变得简单又高效!
…
五、作者Info
Author:小鸿的摸鱼日常
Goal:让编程更有趣! 专注于 Web 开发、爬虫,游戏开发,数据分析、自然语言处理,AI 等,期待你的关注,让我们一起成长、一起Coding!
版权说明:本文禁止抄袭、转载,侵权必究!