基于sanic的服务使用celery完成动态修改定时任务

首先声明一下
考虑到celery目前和asyncio的不兼容性,协程任务需要转换为非异步的普通方法才能被当做task加入定时,并且celery和asyncio使用可能会带来预想不到的问题,在celery官方第二次承诺的6.0版本融合asyncio之前,需要慎重考虑一下
如果你的项目是融合了asyncio的项目,而且并不需要像celery文档中描述的那么多的复杂的定时功能,一个轻量级的包APScheduler完全可以满足你的需求,而且兼容asyncio框架

功能实现介绍

这是一个基于Sanic服务和Celery定时任务操作的功能,实现的原理大致如下图
在这里插入图片描述

  • Server:是我们的sanic服务,负责接收和响应请求,接收任务请求之后会异步非阻塞地将预警的定时任务交给celery处理
  • Beat(Scheduler): 定期触发任务(提前设置好的周期性或定时任务),有可用worker时,任务将会被执行,这里我们的服务使用redis作为Beat Scheduler
  • Queue: 接收的任务的队列,使任务有序的进出,是celery本身实现
  • Worker: 执行任务
  • Result Store(Result backend ):
    存储任务的位置,有需要时可召回任务的结果,但是任务的结果会设置一个过期时间,这里我们的服务使用redis作为Result Store

运行和使用的示例

sanic-celery server示例的目录结构
在这里插入图片描述

主要关注的内容在celery_app, query和第一层的sanic_server.py和结构,settings.py保存的是项目的根目录

import os
import sysCELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))sys.path.insert(0, CELERY_BASE_DIR)

celery

celery app启动:

  • 创建celery app,并将celery app启动的配置信息加入(配置信息在执行命令行启动celery之前加入都可以)
  • 配置文件的内容,可参考官方文档,这里给出了简单示例的配置内容和说明,注意4.x之后的celery配置变量要用小写的

在这里插入图片描述

# -*- coding:utf-8 -*-
from celery import Celeryfrom . import config
app = Celery("app_name")
app.config_from_object(config)config.pybroker_url = 'redis://localhost:6379/1'
result_backend = 'redis://localhost:6379/2'
redbeat_redis_url = 'redis://localhost:6379/3'
redbeat_key_prefix = 'roiq_redbeat_key_prefix:'
# 任务运行结果过期时间,默认一天,传入秒数或者timedelta对象,参考https://docs.celeryq.dev/en/stable/userguide/configuration.html#result-expires
result_expires = 600task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
enable_utc = True# (!)所有的tasks都要提前在这里imports
imports = ("query.tasks","send_email.tasks"
)

关于参数的更多详细说明,可参考官方文档

Beat Scheduler是针对周期性任务和延时任务需求的,非Django的celery默认不支持celery服务运行的时候修改任务状态的,针对我们的业务需求,我们需要在服务运行的时候增加、修改和查看任务,因此引入了支持redis作为beat scheduler的模块redbeat,redbeat的使用参考链接,只需要使用其中的创建、更新和删除等常用操作方法

参考redbeat入门链接安装好redbeat之后,以redbeat作为celery的beat启动celery,不配置redbeat_redis_url时默认broker也是beat

celery启动命令

在windows环境下,beat要和worker、broker分开启动

指定readbeat作为beat启动celery

在命令行执行:celery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval=10

  • -A是celery app的位置,这里celery_app的__init__.py中包含celery app
  • beat指定需要启动beat(默认不启动)
  • -S指定beat的Scheduler对象
  • -l是loglevel,打印日志的信息等级,支持info, debug等关键字
  • –max-interval指定beat检查新修改的任务的间隔时间,默认5分钟,这里为了方便调试设置为10秒钟,比较实时地看到结果

启动worker

在命令行执行:celery -A worker -l debug -P gevent,为了支持windows上运行,需要先安装gevent(pip install gevent),在linux不需要-P选项

更多参数和详情可以用celery --help,celery worker --help, celery beat --help查看

启动celery服务之后,测试celery运行时的修改操作

redbeat在celery运行时修改任务的操作

使用redbeat支持在celery运行时修改任务的操作,执行时确保celery的app、worker、beat服务和redis等存储服务都在运行

一个模拟的定时任务:

query/tasks.py

# -*- coding:utf-8 -*-import asyncio
import timeimport pandas as pdfrom celery_app import appasync def countdown_task(a, b):"""以一个简单的方法代替sql查询的task"""await asyncio.sleep(1)for i in range(3):print(f"-------{i}---------")time.sleep(1)return a+b@app.task
def sync_countdown_task(a, b):return asyncio.get_running_loop().run_until_complete(countdown_task(a, b))

由于项目中使用的全都是异步协程方法,需要将协程转换为普通的任务,才能够注册为celery的task

sanic_server.py

# -*- coding:utf-8 -*-
import asyncio
from datetime import timedeltafrom celery.schedules import crontab, schedule
from redbeat import RedBeatSchedulerEntry
from sanic import Sanic
from sanic import responsefrom celery_app import app as celery_app
from celery_app.config import redbeat_key_prefix
from query.tasks import sync_countdown_tasksanic_app = Sanic("sanic_celery")loop = asyncio.get_event_loop()# 开始定时任务,需要在不重启celery服务的情况下将任务添加到beat
async def query_task_create(request):"""通过此api创建周期性的查询任务"""tasks = f"query.tasks"            # 任务所在的模块(具体到.py文件)sche = schedule(timedelta(seconds=5))task_name = sync_countdown_task.__name__task = f"{tasks}.{task_name}"entry = RedBeatSchedulerEntry(task_name, task, sche, args=(1, 2), app=celery_app)print(entry)key = entry.key       # key存到数据库...entry.save()        return response.text(f"schedule2 created..., task key is: {key}")async def schedule_disable(request):task_name = sync_countdown_task.__name__key = redbeat_key_prefix + task_name        # key 可以entry = RedBeatSchedulerEntry.from_key(key, celery_app)entry.enabled = Falseentry.save()print(entry)return response.text("schedule disabled..")async def schedule_enable(request):task_name = sync_countdown_task.__name__key = redbeat_key_prefix + task_nameentry = RedBeatSchedulerEntry.from_key(key, celery_app)entry.enabled = Trueentry.save()print(entry)return response.text("schedule enabled..")async def schedule_delete(request):task_name = sync_countdown_task.__name__     # 请求时获得(最开始也是用数据库存储和获取)task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)print(entry)entry.delete()print("删除后的entry: ", entry)return response.text(task_name+" deleted")async def schedule_update(request):task_name = sync_countdown_task.__name__     # 请求时获得(最开始也是用数据库存储和获取)task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"# 获取task keyentry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)        # (!)要考虑任务已经删除,key不存在的情况print(entry)# 修改scheduleentry.schedule = schedule(timedelta(seconds=3))# 修改参数entry.args = (3, 4)entry.save()print(entry)return response.text(task_name+" updated")async def schedule_info(request):task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)return response.text(f"{entry}")sanic_app.add_route(query_task_create, "/create2")
sanic_app.add_route(schedule_update, "/update")
sanic_app.add_route(schedule_delete, "/delete")
sanic_app.add_route(schedule_disable, "/disable")
sanic_app.add_route(schedule_enable, "/enable")
sanic_app.add_route(schedule_info, "/info")if __name__ == '__main__':sanic_app.run(port=4321)

注:更新和删除等操作的key/task_key的获取,在上线时需要从数据库中存储和获取

设置定时任务的运作流程

  • 设定celery配置,存放于config.py中(也可以用其他方式存储)
  • 创建app,导入配置的内容
  • 编写好task和server调用的api
  • celery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval=10类似的命令运行beat,celery -A celery_app worker -l debug -P gevent -E类似的命令运行worker
  • 运行sanic服务
  • 根据api传入的参数使用redbeat.RedBeatSchedulerEntry创建定时任务,使用RedBeatSchedulerEntry.from_key()获取并修改定时任务
  • 根据api用户和产品返回已设定的定时任务列表供用户查看和操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/469398.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

shell 中的ifeq

libs_for_gcc -lgnunormal_libs foo: $(objects)ifeq ($(CC),gcc)$(CC) -o foo $(objects) $(libs_for_gcc)else$(CC) -o foo $(objects) $(normal_libs)endif 可见,在上面示例的这个规则中,目标“foo”可以根据变量“$(CC)”值来选取不同的函数库来编…

第一篇unity

在网上找的学习资料,做了点简单的效果。 半成品 http://files.cnblogs.com/files/buzhidaojiaoshenme/unity.rar 第二个游戏,方向键和“W”,”S“键移动方块,碰撞到最右边的方块过关。 http://files.cnblogs.com/files/buzhidaoji…

报错:OMP: Error #15: Initializing libomp.dylib, but found libiomp5.dylib already initialized.

问题描述: OMP: Error #15: Initializing libiomp5.dylib, but found libiomp5.dylib already initialized. OMP: Hint This means that multiple copies of the OpenMP runtime have been linked into the program. That is dangerous, since it can degrade perf…

Pyscript,使用Python编写前端脚本

介绍 Anaconda的CEO Peter Wang在前两个月的时候发布了Pyscript,实现了在HTML支持Python的使用,整个引用过程甚至不需要安装任何环境,只需要使用link和script标签即可引用实现Python在HTML中运行的功能,在HTML中也可以运行和使用…

如何把应用程序app编译进android系统

转载:http://ywxiao66.blog.163.com/blog/static/175482055201152710441106/------------------------------------------------------------------把常用的应用程序编译到img文件中,就成了系统的一部分,用户不必自己安装,当然也卸…

【Pytorch神经网络实战案例】08 识别黑白图中的服装图案(Fashion-MNIST)

1 Fashion-MNIST简介 FashionMNIST 是一个替代 MNIST 手写数字集 的图像数据集。 它是由 Zalando(一家德国的时尚科技公司)旗下的研究部门提供。其涵盖了来自 10 种类别的共 7 万个不同商品的正面图片。 FashionMNIST 的大小、格式和训练集/测试集划分与…

PHP list的赋值

List右边的赋值对象是一个以数值为索引的数组,左边的变量的位置和赋值对象的键值一一对应,有些位置的变量可以省略不写。非末尾的被赋值变量省略时,分隔的逗号不能省略。左边变量被赋值的顺序是从右到左的。 1 list($a, ,$b,$c[],$c[]) [1,2…

Pyscript,创建一个能执行crud操作的网页应用

目录 实现一个添加邀请客人名单的功能 循序渐进,逐步实现: 输入客人名称,按下enter键添加客人名单点击客人名单在名单上添加或者取消添加删除线,表示已经检查客人到场或未到场 checkbox,点击客人名单或者点击checkb…

爬虫实战学习笔记_1 爬虫基础+HTTP原理

1 爬虫简介 网络爬虫(又被称作网络蜘蛛、网络机器人,在某些社区中也经常被称为网页追逐者)可以按照指定的规则(网络爬虫的算法)自动浏览或抓取网络中的信息。 1.1 Web网页存在方式 表层网页指的是不需要提交表单,使…

LeetCode | HouseCode 算法题

题目: You are a professional robber planning to rob houses along a street. Each house has a certain amount of money stashed, the only constraint stopping you from robbing each of them is that adjacent houses have security system connected and it…

爬虫实战学习笔记_2 网络请求urllib模块+设置请求头+Cookie+模拟登陆

1 urllib模块 1.1 urllib模块简介 Python3中将urib与urllib2模块的功能组合,并且命名为urllib。Python3中的urllib模块中包含多个功能的子模块,具体内容如下。 urllib.request:用于实现基本HTTP请求的模块。urlb.error:异常处理…

Python解决多个进程服务重复运行定时任务的问题

记录多实例服务定时任务出现运行多次的问题 问题:web项目运行多个实例时,定时任务会被执行多次的问题 举例来说 我使用库APScheduler排定了一个定时任务taskA在每天的晚上9点需要执行一次,我的web服务使用分布式运行了8个实例,于…

java----IO和NIO的区别

概念:NIO即New IO,这个库是在JDK1.4中才引入的。NIO和IO有相同的作用和目的,但实现方式不同,NIO主要用到的是块,所以NIO的效率要比IO高很多。在Java API中提供了两套NIO,一套是针对标准输入输出NIO&#xf…

【Pytorch神经网络理论篇】 11 卷积网络模型+Sobel算子原理

同学你好!本文章于2021年末编写,已与实际存在较大的偏差! 故在2022年末对本系列进行填充与更新,欢迎大家订阅最新的专栏,获取基于Pytorch1.10版本的理论代码(2023版)实现, Pytorch深度学习理论篇(2023版)…

ubuntu 14.04中文显示乱码问题

乱码显示如下&#xff1a; [<E9><97><AE><E9><A2><98><E6><8F><8F><E8><BF><B0>]:<E5><A2><9E><E5><8A><A0>tm100<E9><A1><B9><E7><…

DataFrame高效处理行列数据/倒三角型数据/处理阶梯型数据/根据列的值确定行的值

锲子 在使用pandas处理数据时&#xff0c;遇到了一种要按照留存天数来处理的数据&#xff0c;当列所对应的日期超过了最晚的“今天”那么数据就要置为0&#xff0c;举个例子&#xff1a; 在这个DataFrame中&#xff0c;需要将超过了今天2022-10-30的数据置为“-”&#xff0c…

转载 ---资深HR告诉你:我如何筛选简历与选择人员的

资深HR告诉你&#xff1a;我如何筛选简历与选择人员的 有个公司HR看简历 先直接丢掉一半 理由是不要运气不好的应聘者。 当然这可能只是某些HR面对太多的简历产生了偷懒的情绪&#xff0c;但是不论是Manager&#xff0c;亦或是Team Leader&#xff0c;都会遇到招聘的问题&#…

爬虫实战学习笔记_3 网络请求urllib模块:设置IP代理+处理请求异常+解析URL+解码+编码+组合URL+URL连接

1 设置IP代理 1.1 方法论述 使用urllib模块设置代理IP是比较简单的&#xff0c;首先需要创建ProxyHandler对象&#xff0c;其参数为字典类型的代理IP&#xff0c;键名为协议类型&#xff08;如HTTP或者HTTPS)&#xff0c;值为代理链接。然后利用ProxyHandler对象与buildopene…

vim 插件cscope 使用

&#xff11;&#xff0e;安装 sudo apt-get install cscope &#xff12;&#xff0e;初始化 cscope -Rbq 你想在哪个目录下面用这个功能&#xff0c;就在哪个目录下面运行这个命令 &#xff13;&#xff0e;vim kpd.c &#xff14;&#xff0e;输入 :cs add cscope.o…

Sanic服务启动失败,报错Cannot finalize with no routes defined

Sanic服务启动失败&#xff0c;记录解决方法 问题描述 Sanic服务启动失败&#xff0c;同样的代码和python版本在之前的win10系统上运行的好好的&#xff0c;换了台win11的机器就跑不起来了&#xff0c;不知道是系统原因还是因为换了执行pycharm等其他原因 在尝试启动时总是会…