基于sanic的服务使用celery完成动态修改定时任务

首先声明一下
考虑到celery目前和asyncio的不兼容性,协程任务需要转换为非异步的普通方法才能被当做task加入定时,并且celery和asyncio使用可能会带来预想不到的问题,在celery官方第二次承诺的6.0版本融合asyncio之前,需要慎重考虑一下
如果你的项目是融合了asyncio的项目,而且并不需要像celery文档中描述的那么多的复杂的定时功能,一个轻量级的包APScheduler完全可以满足你的需求,而且兼容asyncio框架

功能实现介绍

这是一个基于Sanic服务和Celery定时任务操作的功能,实现的原理大致如下图
在这里插入图片描述

  • Server:是我们的sanic服务,负责接收和响应请求,接收任务请求之后会异步非阻塞地将预警的定时任务交给celery处理
  • Beat(Scheduler): 定期触发任务(提前设置好的周期性或定时任务),有可用worker时,任务将会被执行,这里我们的服务使用redis作为Beat Scheduler
  • Queue: 接收的任务的队列,使任务有序的进出,是celery本身实现
  • Worker: 执行任务
  • Result Store(Result backend ):
    存储任务的位置,有需要时可召回任务的结果,但是任务的结果会设置一个过期时间,这里我们的服务使用redis作为Result Store

运行和使用的示例

sanic-celery server示例的目录结构
在这里插入图片描述

主要关注的内容在celery_app, query和第一层的sanic_server.py和结构,settings.py保存的是项目的根目录

import os
import sysCELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))sys.path.insert(0, CELERY_BASE_DIR)

celery

celery app启动:

  • 创建celery app,并将celery app启动的配置信息加入(配置信息在执行命令行启动celery之前加入都可以)
  • 配置文件的内容,可参考官方文档,这里给出了简单示例的配置内容和说明,注意4.x之后的celery配置变量要用小写的

在这里插入图片描述

# -*- coding:utf-8 -*-
from celery import Celeryfrom . import config
app = Celery("app_name")
app.config_from_object(config)config.pybroker_url = 'redis://localhost:6379/1'
result_backend = 'redis://localhost:6379/2'
redbeat_redis_url = 'redis://localhost:6379/3'
redbeat_key_prefix = 'roiq_redbeat_key_prefix:'
# 任务运行结果过期时间,默认一天,传入秒数或者timedelta对象,参考https://docs.celeryq.dev/en/stable/userguide/configuration.html#result-expires
result_expires = 600task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
enable_utc = True# (!)所有的tasks都要提前在这里imports
imports = ("query.tasks","send_email.tasks"
)

关于参数的更多详细说明,可参考官方文档

Beat Scheduler是针对周期性任务和延时任务需求的,非Django的celery默认不支持celery服务运行的时候修改任务状态的,针对我们的业务需求,我们需要在服务运行的时候增加、修改和查看任务,因此引入了支持redis作为beat scheduler的模块redbeat,redbeat的使用参考链接,只需要使用其中的创建、更新和删除等常用操作方法

参考redbeat入门链接安装好redbeat之后,以redbeat作为celery的beat启动celery,不配置redbeat_redis_url时默认broker也是beat

celery启动命令

在windows环境下,beat要和worker、broker分开启动

指定readbeat作为beat启动celery

在命令行执行:celery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval=10

  • -A是celery app的位置,这里celery_app的__init__.py中包含celery app
  • beat指定需要启动beat(默认不启动)
  • -S指定beat的Scheduler对象
  • -l是loglevel,打印日志的信息等级,支持info, debug等关键字
  • –max-interval指定beat检查新修改的任务的间隔时间,默认5分钟,这里为了方便调试设置为10秒钟,比较实时地看到结果

启动worker

在命令行执行:celery -A worker -l debug -P gevent,为了支持windows上运行,需要先安装gevent(pip install gevent),在linux不需要-P选项

更多参数和详情可以用celery --help,celery worker --help, celery beat --help查看

启动celery服务之后,测试celery运行时的修改操作

redbeat在celery运行时修改任务的操作

使用redbeat支持在celery运行时修改任务的操作,执行时确保celery的app、worker、beat服务和redis等存储服务都在运行

一个模拟的定时任务:

query/tasks.py

# -*- coding:utf-8 -*-import asyncio
import timeimport pandas as pdfrom celery_app import appasync def countdown_task(a, b):"""以一个简单的方法代替sql查询的task"""await asyncio.sleep(1)for i in range(3):print(f"-------{i}---------")time.sleep(1)return a+b@app.task
def sync_countdown_task(a, b):return asyncio.get_running_loop().run_until_complete(countdown_task(a, b))

由于项目中使用的全都是异步协程方法,需要将协程转换为普通的任务,才能够注册为celery的task

sanic_server.py

# -*- coding:utf-8 -*-
import asyncio
from datetime import timedeltafrom celery.schedules import crontab, schedule
from redbeat import RedBeatSchedulerEntry
from sanic import Sanic
from sanic import responsefrom celery_app import app as celery_app
from celery_app.config import redbeat_key_prefix
from query.tasks import sync_countdown_tasksanic_app = Sanic("sanic_celery")loop = asyncio.get_event_loop()# 开始定时任务,需要在不重启celery服务的情况下将任务添加到beat
async def query_task_create(request):"""通过此api创建周期性的查询任务"""tasks = f"query.tasks"            # 任务所在的模块(具体到.py文件)sche = schedule(timedelta(seconds=5))task_name = sync_countdown_task.__name__task = f"{tasks}.{task_name}"entry = RedBeatSchedulerEntry(task_name, task, sche, args=(1, 2), app=celery_app)print(entry)key = entry.key       # key存到数据库...entry.save()        return response.text(f"schedule2 created..., task key is: {key}")async def schedule_disable(request):task_name = sync_countdown_task.__name__key = redbeat_key_prefix + task_name        # key 可以entry = RedBeatSchedulerEntry.from_key(key, celery_app)entry.enabled = Falseentry.save()print(entry)return response.text("schedule disabled..")async def schedule_enable(request):task_name = sync_countdown_task.__name__key = redbeat_key_prefix + task_nameentry = RedBeatSchedulerEntry.from_key(key, celery_app)entry.enabled = Trueentry.save()print(entry)return response.text("schedule enabled..")async def schedule_delete(request):task_name = sync_countdown_task.__name__     # 请求时获得(最开始也是用数据库存储和获取)task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)print(entry)entry.delete()print("删除后的entry: ", entry)return response.text(task_name+" deleted")async def schedule_update(request):task_name = sync_countdown_task.__name__     # 请求时获得(最开始也是用数据库存储和获取)task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"# 获取task keyentry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)        # (!)要考虑任务已经删除,key不存在的情况print(entry)# 修改scheduleentry.schedule = schedule(timedelta(seconds=3))# 修改参数entry.args = (3, 4)entry.save()print(entry)return response.text(task_name+" updated")async def schedule_info(request):task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)return response.text(f"{entry}")sanic_app.add_route(query_task_create, "/create2")
sanic_app.add_route(schedule_update, "/update")
sanic_app.add_route(schedule_delete, "/delete")
sanic_app.add_route(schedule_disable, "/disable")
sanic_app.add_route(schedule_enable, "/enable")
sanic_app.add_route(schedule_info, "/info")if __name__ == '__main__':sanic_app.run(port=4321)

注:更新和删除等操作的key/task_key的获取,在上线时需要从数据库中存储和获取

设置定时任务的运作流程

  • 设定celery配置,存放于config.py中(也可以用其他方式存储)
  • 创建app,导入配置的内容
  • 编写好task和server调用的api
  • celery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval=10类似的命令运行beat,celery -A celery_app worker -l debug -P gevent -E类似的命令运行worker
  • 运行sanic服务
  • 根据api传入的参数使用redbeat.RedBeatSchedulerEntry创建定时任务,使用RedBeatSchedulerEntry.from_key()获取并修改定时任务
  • 根据api用户和产品返回已设定的定时任务列表供用户查看和操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/469398.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Pyscript,使用Python编写前端脚本

介绍 Anaconda的CEO Peter Wang在前两个月的时候发布了Pyscript,实现了在HTML支持Python的使用,整个引用过程甚至不需要安装任何环境,只需要使用link和script标签即可引用实现Python在HTML中运行的功能,在HTML中也可以运行和使用…

如何把应用程序app编译进android系统

转载:http://ywxiao66.blog.163.com/blog/static/175482055201152710441106/------------------------------------------------------------------把常用的应用程序编译到img文件中,就成了系统的一部分,用户不必自己安装,当然也卸…

【Pytorch神经网络实战案例】08 识别黑白图中的服装图案(Fashion-MNIST)

1 Fashion-MNIST简介 FashionMNIST 是一个替代 MNIST 手写数字集 的图像数据集。 它是由 Zalando(一家德国的时尚科技公司)旗下的研究部门提供。其涵盖了来自 10 种类别的共 7 万个不同商品的正面图片。 FashionMNIST 的大小、格式和训练集/测试集划分与…

PHP list的赋值

List右边的赋值对象是一个以数值为索引的数组,左边的变量的位置和赋值对象的键值一一对应,有些位置的变量可以省略不写。非末尾的被赋值变量省略时,分隔的逗号不能省略。左边变量被赋值的顺序是从右到左的。 1 list($a, ,$b,$c[],$c[]) [1,2…

Pyscript,创建一个能执行crud操作的网页应用

目录 实现一个添加邀请客人名单的功能 循序渐进,逐步实现: 输入客人名称,按下enter键添加客人名单点击客人名单在名单上添加或者取消添加删除线,表示已经检查客人到场或未到场 checkbox,点击客人名单或者点击checkb…

爬虫实战学习笔记_1 爬虫基础+HTTP原理

1 爬虫简介 网络爬虫(又被称作网络蜘蛛、网络机器人,在某些社区中也经常被称为网页追逐者)可以按照指定的规则(网络爬虫的算法)自动浏览或抓取网络中的信息。 1.1 Web网页存在方式 表层网页指的是不需要提交表单,使…

爬虫实战学习笔记_2 网络请求urllib模块+设置请求头+Cookie+模拟登陆

1 urllib模块 1.1 urllib模块简介 Python3中将urib与urllib2模块的功能组合,并且命名为urllib。Python3中的urllib模块中包含多个功能的子模块,具体内容如下。 urllib.request:用于实现基本HTTP请求的模块。urlb.error:异常处理…

java----IO和NIO的区别

概念:NIO即New IO,这个库是在JDK1.4中才引入的。NIO和IO有相同的作用和目的,但实现方式不同,NIO主要用到的是块,所以NIO的效率要比IO高很多。在Java API中提供了两套NIO,一套是针对标准输入输出NIO&#xf…

【Pytorch神经网络理论篇】 11 卷积网络模型+Sobel算子原理

同学你好!本文章于2021年末编写,已与实际存在较大的偏差! 故在2022年末对本系列进行填充与更新,欢迎大家订阅最新的专栏,获取基于Pytorch1.10版本的理论代码(2023版)实现, Pytorch深度学习理论篇(2023版)…

DataFrame高效处理行列数据/倒三角型数据/处理阶梯型数据/根据列的值确定行的值

锲子 在使用pandas处理数据时,遇到了一种要按照留存天数来处理的数据,当列所对应的日期超过了最晚的“今天”那么数据就要置为0,举个例子: 在这个DataFrame中,需要将超过了今天2022-10-30的数据置为“-”&#xff0c…

Sanic服务启动失败,报错Cannot finalize with no routes defined

Sanic服务启动失败,记录解决方法 问题描述 Sanic服务启动失败,同样的代码和python版本在之前的win10系统上运行的好好的,换了台win11的机器就跑不起来了,不知道是系统原因还是因为换了执行pycharm等其他原因 在尝试启动时总是会…

【Pytorch神经网络理论篇】 12 卷积神经网络实现+卷积计算的图解

同学你好!本文章于2021年末编写,获得广泛的好评! 故在2022年末对本系列进行填充与更新,欢迎大家订阅最新的专栏,获取基于Pytorch1.10版本的理论代码(2023版)实现, Pytorch深度学习理论篇(2023版)目录地址…

【Pytorch神经网络实战案例】09 使用卷积提取图片的轮廓信息(手动模拟Sobel算子)

1 载入图片并显示 import matplotlib.pyplot as plt import matplotlib.image as mpimg import torch import torchvision.transforms as transforms import os os.environ["KMP_DUPLICATE_LIB_OK"]"TRUE" ### 1 载入图片并显示 myimg mpimg.imread(img.…

【Pytorch神经网络理论篇】 13 深层卷积神经网络介绍+池化操作+深层卷积神经网络实战

同学你好!本文章于2021年末编写,获得广泛的好评! 故在2022年末对本系列进行填充与更新,欢迎大家订阅最新的专栏,获取基于Pytorch1.10版本的理论代码(2023版)实现, Pytorch深度学习理论篇(2023版)目录地址…

【Pytorch神经网络实战案例】10 搭建深度卷积神经网络

识别黑白图中的服装图案(Fashion-MNIST)https://blog.csdn.net/qq_39237205/article/details/123379997基于上述代码修改模型的组成 1 修改myConNet模型 1.1.1 修改阐述 将模型中的两个全连接层,变为全局平均池化层。 1.1.2 修改结果 ### 1.5 定义模型类 class m…

【Pytorch神经网络理论篇】 14 过拟合问题的优化技巧(一):基本概念+正则化+数据增大

同学你好!本文章于2021年末编写,获得广泛的好评! 故在2022年末对本系列进行填充与更新,欢迎大家订阅最新的专栏,获取基于Pytorch1.10版本的理论代码(2023版)实现, Pytorch深度学习理论篇(2023版)目录地址…

MTK 8127平台使用busybox

一、什么是BusyBox ? BusyBox 是标准 Linux 工具的一个单个可执行实现。BusyBox 包含了一些简单的工具,例如 cat 和 echo,还包含了一些更大、更复杂的工具,例如 grep、find、mount 以及 telnet。有些人将 BusyBox 称为 Linux 工具…

【Pytorch神经网络理论篇】 15 过拟合问题的优化技巧(二):Dropout()方法

同学你好!本文章于2021年末编写,获得广泛的好评! 故在2022年末对本系列进行填充与更新,欢迎大家订阅最新的专栏,获取基于Pytorch1.10版本的理论代码(2023版)实现, Pytorch深度学习理论篇(2023版)目录地址…

【Pytorch神经网络理论篇】 16 过拟合问题的优化技巧(三):批量归一化

同学你好!本文章于2021年末编写,获得广泛的好评! 故在2022年末对本系列进行填充与更新,欢迎大家订阅最新的专栏,获取基于Pytorch1.10版本的理论代码(2023版)实现, Pytorch深度学习理论篇(2023版)目录地址…