Celery介绍
github地址:GitHub - celery/celery: Distributed Task Queue (development branch)
文档地址:Celery - Distributed Task Queue — Celery 5.3.6 documentation
1.1 Celery是什么
celery时一个灵活且可靠的处理大量消息的分布式系统,可以在多个节点之间处理某个任务
celery时一个专注于实时处理的任务队列,支持任务调度
celery是开源的,有很多的使用者
celery完全基于python语言编写的
celery本质上是一个【分布式的异步任务调度框架】,类似于Apache的airflow
celery只是用来调度任务的,但是它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来,因此要使用celery的话,还需要搭配一些具有存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等。官方推荐是消息队列RabbitMQ,我们使用Redis
同步调用函数 --》add--》执行5s钟--》数据返回了
异步调用函数--》add--》执行5s钟--》执行完的数据,找个地方存起来
调用方--》去存的地方看一下--》任务有没有执行完
1.2 应用场景
1)异步任务
视频转码、邮件发送、消息推送等一些耗时操作
2)定时任务
定时推送消息、定时爬取一些数据、定时统计一些数据
3)延时任务
提交任务后,等一段时间再执行任务
1.3 celery架构
celery架构,它采用典型的生产者-消费者模式,主要由以下部分组成
生产者生产---消费者进行消费
producer:它负责把任务提交到broker钟
celery Beat:会读取文件、周期性的向broker中提交任务
broker:消息中间件,放任务的地点,celery本身不提供,借助redis等消息队列
worker:工人、消费者,负责从消息中间件中取出任务--》执行
backend:worker执行完,会有结果,结果存储再backend,celery不提供,借助redis等消息队列。
Celery使用
2.1 安装
pip install celery
使用redis作为消息队列
pip install redis
如果是Windows系统还需要安装eventlet
pip install eventlet
2.2 使用
创建main.py文件
import timefrom celery import Celery# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
# 结果消息队列
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend)# 编写任务
@app.task # 被装饰器装饰了,才是celery任务
def add(a, b):print("a+b的结果是", a + b)time.sleep(1) # 模拟耗时return a + b
创建add_task.py文件编写消费者代码
"""这个程序用来提交任务 producer"""
from main import add# # 同步任务
# res = add(1, 2)
# print(res)
# 异步任务
# 像消息队列中提交了一个任务,计算1+5的任务,但是没有执行 ceec680b-e0fb-4636-9244-1fa7ca0c570c
res = add.delay(1, 5) # 没有耗时,直接返回,但是没有返回值,而是返回一个uuid号
print(res)
# 启动worker 再终端使用命令启动,执行完成后会把结果存到redis的2库中
# win :celery -A main worker -l info -P eventlet
# mac/linux:celery -A main worker -l info
启动worker需要再终端下方进行启动
win :celery -A main worker -l info -P eventlet
mac/linux:celery -A main worker -l info
如果报错celery库找不到的问题,使用python -m celery -A main worker -l info -P eventlet进行启动
2.3 包结构
后续的项目越来越大,task任务越来越多,希望把任务拆分再多个py文件中
目录结构
celery.py
import timefrom celery import Celery# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend,include=['celery_task.crawl_task', 'celery_task.user_task', 'celery_task.order_task'])# 任务分到不同的py文件中
user_task.py
import time
from .celery import app@app.task
def send_email(to):print("发送邮件")time.sleep(3)print(f"向{to}发送邮件成功")return f"向{to}发送邮件成功"
order_task.py
import time
from .celery import app@app.task
def pay_order():print("开始下单")time.sleep(5)print("下单完成")return "下单完成"
crawl_task.py
import time
from .celery import app@app.task
def crawl_baidu():print("开始爬虫百度")time.sleep(2)print("爬虫完毕百度")return "爬虫完毕百度"@app.task
def crawl_dewu():print("开始爬虫得物")time.sleep(2)print("爬虫完毕百度")return "爬虫完毕得物"
add_task.py
from celery_task.crawl_task import crawl_baidures = crawl_baidu.delay() # 没有参数,这里就不传
print(res) # 得到uuid
get_result.py(查询是否被执行)
from .celery_task.celery import app
from celery.result import AsyncResultid = "你的任务uuid"
if __name__ == '__main__':result = AsyncResult(id=id, app=app)if result.successful():result = result.get()print(result)elif result.failed():print("任务失败")elif result.status == "PENDING":print("任务等待被执行")elif result.status == "RETRY":print("任务异常后正在重试")elif result.status == "STARTED":print("任务已经开始被执行")
启动worker命令
celery -A celery_task(包名) worker -l info -P eventlet
异步任务-延时任务-定时任务
异步任务
上述介绍的均为异步任务
使用delay()
延时任务
from celery_task.user_task import send_email
from datetime import datetime, timedeltaeta = datetime.utcnow() + timedelta(seconds=5) # 默认时区为utc时区
res = send_email.apply_async(args=['邮箱'], eta=eta)
print(res)
apply_async(args=['参数'],eta=延时时间)
如果延迟任务提交了,但是worker没启动,等延迟的时间,worker再启动,任务会立马启动
定时任务
在celery.py中
import timefrom celery import Celery# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend,include=['celery_task.crawl_task', 'celery_task.user_task', 'celery_task.order_task'])# 任务分到不同的py文件中# 加入定时任务
# 指定了时区,中国时区,以后延时任务
app.conf.timezone = "Asia/Shanghai"
app.conf.enable_utc = False
# 每隔5s爬取百度
from datetime import datetime, timedeltaapp.conf.beat_schedule = {'low-task': {'task': 'celery_task.crawl_task.crawl_baidu','schedule': timedelta(seconds=5), # 每5秒发送一次'args': () # 参数}
}
# 必须启动beat
启动beat命令
celery -A celery_task beat -l info