一.Celery介绍
1.Celery架构
Celery架构基于可插拔组件(pluggable components)和根据选择的消息传输(代理)(message transport(broker))协议实现的消息交换机制。
2.Celery模块
(1)任务模块 Task
包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
(2)消息中间件 Broker
Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
(3)任务执行单元 Worker
Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。
(4)任务结果存储 Backend
Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ、Redis 和 MongoDB 等。
二.Celery例子 [5]
1.代码结构
Celery例子参考文献[5],项目结构如下所示:
celery_demo # 项目根目录├── celery_app # 存放 celery 相关文件│ ├── __init__.py│ ├── celeryconfig.py # 配置文件│ ├── task1.py # 任务文件 1│ └── task2.py # 任务文件 2└── client.py # 应用程
(1)__init__.py
# -*- coding: utf-8 -*-
from celery import Celeryapp = Celery('demo') # 创建 Celery 实例
app.config_from_object('celery_app.celeryconfig') # 通过 Celery 实例加载配置模块
(2)celeryconfig.py
# -*- coding: utf-8 -*-
from datetime import timedelta
from celery.schedules import crontab# Broker and Backend
BROKER_URL = 'redis://127.0.0.1:7379' # 使用redis默认数据库0
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:7379/0' # 明确指定redis数据库0# Timezone
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,不指定默认为 'UTC'
# CELERY_TIMEZONE='UTC'# import
CELERY_IMPORTS = ('celery_app.task1','celery_app.task2'
)# schedules
CELERYBEAT_SCHEDULE = {'add-every-30-seconds': {'task': 'celery_app.task1.add','schedule': timedelta(seconds=30), # 每 30 秒执行一次'args': (5, 8) # 任务函数参数},'multiply-at-some-time': {'task': 'celery_app.task2.multiply','schedule': crontab(hour='9', minute='50'), # 每天早上 9 点 50 分执行一次'args': (3, 7) # 任务函数参数}
}
(3)task1.py
import time
from celery_app import app@app.task
def add(x, y):time.sleep(2)return x + y
(4)task2.py
import time
from celery_app import app@app.task
def multiply(x, y):time.sleep(2)return x * y
(5)client.py
"""
Celery入门教程:https://blog.csdn.net/youzhouliu/article/details/124239709
"""
# -*- coding: utf-8 -*-
from celery_app import task1
from celery_app import task2result1 = task1.add.apply_async(args=[2, 8]) # 可用task1.add.delay(2, 8)
print(result1.get())
# task2.multiply.apply_async(args=[3, 7]) # 可用task2.multiply.delay(3, 7)
# print(result2.get())print('hello world')
2.执行异步任务报错
在执行异步任务时报错:ValueError: not enough values to unpack (expected 3, got 0)
。
网上查了下这个错误,有3种解决方法。
(1)方式1
celery -A your_app_name worker --pool=solo -l info
(2)方式2
设置环境变量set FORKED_BY_MULTIPROCESSING = 1
(3)方式3
pip install eventlet
celery -A your_app_name worker --pool=eventlet
3.并发引擎
在 Celery 中,--pool
参数用于指定执行任务的并发引擎。Celery 支持多种并发引擎,包括 multiprocessing(默认)、eventlet、gevent 和 solo。这些并发引擎的主要区别在于它们处理并发任务的方式:
(1)multiprocessing
这是 Celery 默认并发引擎。它使用 Python 的 multiprocessing 模块创建子进程来并发执行任务。每个任务在自己的子进程中运行,子进程之间的内存是隔离的。这种方式可以充分利用多核 CPU,但进程间通信的开销较大。
(2)eventlet
这是一种基于协程的并发引擎。它使用 eventlet 库创建轻量级的线程(即协程)来并发执行任务。所有的协程在同一个进程中运行,协程之间可以共享内存,但只能在 I/O 操作时进行切换。这种方式适合 I/O 密集型任务,但不能充分利用多核 CPU。
(3)gevent
这也是一种基于协程的并发引擎。它使用 gevent 库创建协程来并发执行任务。gevent 的工作方式和 eventlet 类似,但它使用的是 libev 事件循环,而 eventlet 使用的是 libevent 事件循环。
(4)solo
这是一种特殊的并发引擎,它在主进程中直接执行任务,不创建任何子进程或线程。这种模式的优点是简单,没有进程间通信的开销,但缺点是无法利用多核 CPU,因为所有任务都在一个进程中顺序执行。
在选择并发引擎时,需根据任务特性和系统环境来决定。如果任务是 CPU 密集型的,那么 multiprocessing 可能是最好的选择。如果任务是 I/O 密集型的,那么 eventlet 或 gevent 可能更适合。
4.异步任务和定时任务
(1)异步任务命令
celery -A celery_app worker --loglevel=info
(2)定时任务命令
celery -A celery_app beat
参考文献
[1] Celery - Distributed Task Queue:https://docs.celeryq.dev/en/stable/
[2] Celery GitHub:https://github.com/celery/celery
[3] Celery API Reference:https://docs.celeryq.dev/en/stable/reference/index.html
[4] ValueError: not enough values to unpack (expected 3, got 0):https://github.com/celery/celery/issues/4178
[5] Celery入门教程:https://blog.csdn.net/youzhouliu/article/details/124239709