首先声明一下
考虑到celery目前和asyncio的不兼容性,协程任务需要转换为非异步的普通方法才能被当做task加入定时,并且celery和asyncio使用可能会带来预想不到的问题,在celery官方第二次承诺的6.0版本融合asyncio之前,需要慎重考虑一下
如果你的项目是融合了asyncio的项目,而且并不需要像celery文档中描述的那么多的复杂的定时功能,一个轻量级的包APScheduler完全可以满足你的需求,而且兼容asyncio框架
功能实现介绍
这是一个基于Sanic服务和Celery定时任务操作的功能,实现的原理大致如下图
- Server:是我们的sanic服务,负责接收和响应请求,接收任务请求之后会异步非阻塞地将预警的定时任务交给celery处理
- Beat(Scheduler): 定期触发任务(提前设置好的周期性或定时任务),有可用worker时,任务将会被执行,这里我们的服务使用redis作为Beat Scheduler
- Queue: 接收的任务的队列,使任务有序的进出,是celery本身实现
- Worker: 执行任务
- Result Store(Result backend ):
存储任务的位置,有需要时可召回任务的结果,但是任务的结果会设置一个过期时间,这里我们的服务使用redis作为Result Store
运行和使用的示例
sanic-celery server示例的目录结构
主要关注的内容在celery_app, query和第一层的sanic_server.py和结构,settings.py保存的是项目的根目录
import os
import sysCELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))sys.path.insert(0, CELERY_BASE_DIR)
celery
celery app启动:
- 创建celery app,并将celery app启动的配置信息加入(配置信息在执行命令行启动celery之前加入都可以)
- 配置文件的内容,可参考官方文档,这里给出了简单示例的配置内容和说明,注意4.x之后的celery配置变量要用小写的
# -*- coding:utf-8 -*-
from celery import Celeryfrom . import config
app = Celery("app_name")
app.config_from_object(config)config.pybroker_url = 'redis://localhost:6379/1'
result_backend = 'redis://localhost:6379/2'
redbeat_redis_url = 'redis://localhost:6379/3'
redbeat_key_prefix = 'roiq_redbeat_key_prefix:'
# 任务运行结果过期时间,默认一天,传入秒数或者timedelta对象,参考https://docs.celeryq.dev/en/stable/userguide/configuration.html#result-expires
result_expires = 600task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
enable_utc = True# (!)所有的tasks都要提前在这里imports
imports = ("query.tasks","send_email.tasks"
)
关于参数的更多详细说明,可参考官方文档
Beat Scheduler是针对周期性任务和延时任务需求的,非Django的celery默认不支持celery服务运行的时候修改任务状态的,针对我们的业务需求,我们需要在服务运行的时候增加、修改和查看任务,因此引入了支持redis作为beat scheduler的模块redbeat,redbeat的使用参考链接,只需要使用其中的创建、更新和删除等常用操作方法
参考redbeat入门链接安装好redbeat之后,以redbeat作为celery的beat启动celery,不配置redbeat_redis_url时默认broker也是beat
celery启动命令
在windows环境下,beat要和worker、broker分开启动
指定readbeat作为beat启动celery
在命令行执行:celery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval=10
- -A是celery app的位置,这里celery_app的__init__.py中包含celery app
- beat指定需要启动beat(默认不启动)
- -S指定beat的Scheduler对象
- -l是loglevel,打印日志的信息等级,支持info, debug等关键字
- –max-interval指定beat检查新修改的任务的间隔时间,默认5分钟,这里为了方便调试设置为10秒钟,比较实时地看到结果
启动worker
在命令行执行:celery -A worker -l debug -P gevent
,为了支持windows上运行,需要先安装gevent(pip install gevent),在linux不需要-P选项
更多参数和详情可以用celery --help,celery worker --help, celery beat --help查看
启动celery服务之后,测试celery运行时的修改操作
redbeat在celery运行时修改任务的操作
使用redbeat支持在celery运行时修改任务的操作,执行时确保celery的app、worker、beat服务和redis等存储服务都在运行
一个模拟的定时任务:
query/tasks.py
# -*- coding:utf-8 -*-import asyncio
import timeimport pandas as pdfrom celery_app import appasync def countdown_task(a, b):"""以一个简单的方法代替sql查询的task"""await asyncio.sleep(1)for i in range(3):print(f"-------{i}---------")time.sleep(1)return a+b@app.task
def sync_countdown_task(a, b):return asyncio.get_running_loop().run_until_complete(countdown_task(a, b))
由于项目中使用的全都是异步协程方法,需要将协程转换为普通的任务,才能够注册为celery的task
sanic_server.py
# -*- coding:utf-8 -*-
import asyncio
from datetime import timedeltafrom celery.schedules import crontab, schedule
from redbeat import RedBeatSchedulerEntry
from sanic import Sanic
from sanic import responsefrom celery_app import app as celery_app
from celery_app.config import redbeat_key_prefix
from query.tasks import sync_countdown_tasksanic_app = Sanic("sanic_celery")loop = asyncio.get_event_loop()# 开始定时任务,需要在不重启celery服务的情况下将任务添加到beat
async def query_task_create(request):"""通过此api创建周期性的查询任务"""tasks = f"query.tasks" # 任务所在的模块(具体到.py文件)sche = schedule(timedelta(seconds=5))task_name = sync_countdown_task.__name__task = f"{tasks}.{task_name}"entry = RedBeatSchedulerEntry(task_name, task, sche, args=(1, 2), app=celery_app)print(entry)key = entry.key # key存到数据库...entry.save() return response.text(f"schedule2 created..., task key is: {key}")async def schedule_disable(request):task_name = sync_countdown_task.__name__key = redbeat_key_prefix + task_name # key 可以entry = RedBeatSchedulerEntry.from_key(key, celery_app)entry.enabled = Falseentry.save()print(entry)return response.text("schedule disabled..")async def schedule_enable(request):task_name = sync_countdown_task.__name__key = redbeat_key_prefix + task_nameentry = RedBeatSchedulerEntry.from_key(key, celery_app)entry.enabled = Trueentry.save()print(entry)return response.text("schedule enabled..")async def schedule_delete(request):task_name = sync_countdown_task.__name__ # 请求时获得(最开始也是用数据库存储和获取)task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)print(entry)entry.delete()print("删除后的entry: ", entry)return response.text(task_name+" deleted")async def schedule_update(request):task_name = sync_countdown_task.__name__ # 请求时获得(最开始也是用数据库存储和获取)task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"# 获取task keyentry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app) # (!)要考虑任务已经删除,key不存在的情况print(entry)# 修改scheduleentry.schedule = schedule(timedelta(seconds=3))# 修改参数entry.args = (3, 4)entry.save()print(entry)return response.text(task_name+" updated")async def schedule_info(request):task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)return response.text(f"{entry}")sanic_app.add_route(query_task_create, "/create2")
sanic_app.add_route(schedule_update, "/update")
sanic_app.add_route(schedule_delete, "/delete")
sanic_app.add_route(schedule_disable, "/disable")
sanic_app.add_route(schedule_enable, "/enable")
sanic_app.add_route(schedule_info, "/info")if __name__ == '__main__':sanic_app.run(port=4321)
注:更新和删除等操作的key/task_key的获取,在上线时需要从数据库中存储和获取
设置定时任务的运作流程
- 设定celery配置,存放于config.py中(也可以用其他方式存储)
- 创建app,导入配置的内容
- 编写好task和server调用的api
celery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval=10
类似的命令运行beat,celery -A celery_app worker -l debug -P gevent -E
类似的命令运行worker- 运行sanic服务
- 根据api传入的参数使用redbeat.RedBeatSchedulerEntry创建定时任务,使用RedBeatSchedulerEntry.from_key()获取并修改定时任务
- 根据api用户和产品返回已设定的定时任务列表供用户查看和操作