在Celery在python中的应用除了实现异步任务(async task
)外也可以执行定时任务(beat
)
1.Celery定时任务是什么?
Celery默认任务单元由任务生产者触发,但有时可能需要其自动触发, 而beat
进程正是负责此类任务,能够自动触发定时/周期性任务.
只需要在配置中配置好周期任务,然后在运行一个周期任务触发器(beat)即可
2.直接上代码
目录结构如下:
celery_app.py 文件代码如下:
import os
import sys
import time
import celery
from pathlib import Path
from datetime import timedelta# 实例化celery对象
app = celery.Celery("celery_worker",backend="redis://:@127.0.0.1:6379/4",broker="redis://:@127.0.0.1:6379/5",include=["celery_worker.email.tasks"],
)# celery beat 定时任务
beat_schedule = {'periodic_task-every-minute': {# 'task': 'celery_worker.email.tasks.add','task': 'chain.send_chains','schedule': timedelta(seconds=10),'args': (11, 22)},
}# 配置文件
app.conf.update(task_serializer="json",result_serializer="json",accept_content=["json"],task_default_queue="normal",timezone="Asia/Shanghai",enable_utc=False,task_ignore_result=True,redis_max_connections=100,result_expires=3600,beat_schedule=beat_schedule
)"""
celery -A celery_worker.celery_app worker -l info
celery -A celery_worker.celery_app beat
"""
email.tasks.py 代码如下:
from loguru import logger
# 模块化之后
from celery_worker.celery_app import app@app.task(name='chain.send_chains')
def add(x, y):logger.info(f'number_add 进来了...x:{x}, y:{y}')return x + y
然后顺序启动 worker 和 beat 定时任务(记得两个都必须启动)
执行如下命令:
celery -A celery_worker.celery_app worker -l info (启动干活的人)
celery -A celery_worker.celery_app beat (启动定时任务 类似crontab)
效果如下:
其实简单的来说就是这点代码:
beat_schedule = {'periodic_task-every-minute': {# 'task': 'celery_worker.email.tasks.add','task': 'chain.send_chains','schedule': timedelta(seconds=10),'args': (11, 22)},
}
periodic_task-every-minute
这个就是定时任务的名字 ,随便起无所谓。
重点是这个 "task"
,经过实际测试,如果这个工作函数没有指定name
名字的话,默认就是 函数路径+函数名称
也就是 'celery_worker.email.tasks.add'
。
但是如果这函数添加name
属性值的话 直接用名字也是可以的,也就是'chain.send_chains'
。
好了 小伙伴们也自己实操下吧!