django-celery-beat自动调度异步任务

        Celery是一个简单、灵活且可靠的分布式系统,专门用于处理大量消息的实时任务调度。它支持使用任务队列的方式在分布的机器、进程、线程上执行任务调度。Celery不仅支持异步任务(如发送邮件、文件上传、图像处理等耗时操作),还支持定时任务,即需要在特定时间执行的任务。Celery本身不提供消息服务,需要借助RabbitMQ、Redis等消息中间件,本案例使用的是Redis。

        Celery Beat则是Celery的一个组件,专门用于处理定时任务调度。它包含一个调度器,负责根据配置的时间表计划任务的执行。这些任务通常是Celery任务,即异步执行的函数或方法。Celery Beat将计划的任务发送到Celery任务队列,由Celery Worker处理并执行队列中的任务。此外,Celery Beat还支持任务的持久性,即使在系统重启后也能够保持已计划的周期性任务。

开发环境:Python3 + MySQL + Redis  + PyCharm专业版

一、创建Django项目

参考 Python框架Django入门教程-CSDN博客 前三步

二、安装celery、mysql、redis等依赖包

eventlet 是一个python协程模板,celery 4版本以上在windows环境进行测试需要安装此依赖

django_celery_results 是任务执行结果的依赖

mysqlclient
redis
celery
eventlet
django-celery-beat
django_celery_results

三、初始化Celery数据库

打开PyCharm的终端,执行以下命令

python manage.py makemigrations
python manage.py migrate

打开数据库查看,执行命令后自动创建了一些表

django_celery_beat_clockedschedule  # 以指定时间执行任务,例如:2024-05-22 09:22:10
django_celery_beat_crontabschedule  # 以crontab格式时间执行任务,某月某天星期几某时某分
django_celery_beat_intervalschedule  # 以间隔时间执行任务,例如:每5秒、每2小时
django_celery_beat_periodictask  # 存储要执行的任务。
django_celery_beat_periodictasks  # 索引和跟踪任务更改状态
django_celery_beat_solarschedule  # 以天文时间执行任务,例如:日出、日落
django_celery_results_chordcounter  # 存储Celery的chord任务的状态
django_celery_results_groupresult  # 存储Celery的group任务的结果
django_celery_results_taskresult  # 存储Celery任务的执行结果

四、配置Celery

修改(注意是修改,不是添加!!!)settings.py,找到 INSTALLED_APPS变量(约31行),将celery注册到django的应用管理中

在settings.py末尾添加celery配置:

CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # 结果存储也使用Redis
# 配置 celery 定时任务使用的调度器,使用django_celery_beat插件用来动态配置任务
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 配置celery自动存储任务执行结果
CELERY_RESULT_BACKEND = 'django_celery_results.backends:DatabaseBackend'
CELERY_TIMEZONE = 'Asia/Shanghai'  # 设置时区
# 是否启用UTC
CELERY_ENABLE_UTC = False
# 是否开启时间感知
DJANGO_CELERY_BEAT_TZ_AWARE = False

在settings.py同级目录下创建celery.py,然后添加以下内容:

import osfrom celery import Celery
from django.conf import settings# djangoDemo是项目名,大家根据自己的情况进行替换!!!
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoDemo.settings')
# djangoDemo是项目名,大家根据自己的情况进行替换!!!
app = Celery('djangoDemo')
# 从django的设置中读取配置信息
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现app下的任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)@app.task(bind=True)
def debug_task(self):print(f"Request: {self.request!r}")

五、创建应用模块,进行测试

打开PyCharm终端执行命令,创建应用模块,这里命名为celeryapp,将新的应用模块注册到django的应用管理中

python manage.py startapp celeryapp

 

在新的应用模块中创建tasks.py,添加异步函数,文件名必须是tasks.py,否则后面启动Celery的时候监听不到

@shared_task
def task_one():print("------------------------- 000 <<<")# 业务逻辑...print("------------------------- 111 <<<")return "222"    

在新的应用模块中修改views.py,添加一个测试接口。这里说一个坑:在settings.py中配置了TIME_ZONE = 'Asia/Shanghai' 和  USE_TZ = True 之后,通过datetime.now()获取的是亚洲上海时间,但是把这个时间存到数据库,就会自动减少8小时,变成了UTC时间,就很无语,于是我把USE_TZ的值改为False,发现存到数据库中的时间变成正常的亚洲上海时间。然而Celery定时执行任务的时区是UTC,经过多次测试,配置了CELERY_TIMEZONE等时区相关的配置,发现好像并没什么用,Celery定时执行任务的时区依然是UTC,无奈只能把任务执行的时间减少8小时

import json
from datetime import datetime, timedeltafrom django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django_celery_beat.models import ClockedSchedule, PeriodicTaskdef celery_test(request):# 任务名task_name = 'celery_test'# 任务执行的时间,设置为下一分钟的10秒,celery执行时间的时区是UTC,要保证数据库中的时间是UTC时间,这里获取的是亚洲上海的时间,所以减了8小时time = (datetime.now() - timedelta(hours=8) + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:10")# 创建任务的执行时间clock = ClockedSchedule.objects.get_or_create(clocked_time=time)# 创建指定时间执行的celery任务PeriodicTask.objects.update_or_create(name=task_name,  # 任务名,尽量保证唯一性,若该任务已存在,则更新该任务task='celeryapp.tasks.task_one',  # 要执行的异步函数的全路径defaults={'clocked': clock[0],  # 使用clocked在指定时间执行该任务'one_off': True,  # 在任务执行完一次后关闭该任务'enabled': True,  # 开启任务'args': json.dumps([]),  # 参数列表,必须是json格式的数组})return JsonResponse({'message': '200'})

修改urls.py,在urlpatterns中添加路由

from celeryapp import viewsurlpatterns = [# ......path('celery/test/', views.celery_test),
]

六、启动项目进行测试

先启动Django项目,然后在分别两个终端中执行命令启动Celery和Celery Beat,命令中的djangoDemo是项目名,大家根据自己的情况进行替换

celery -A djangoDemo worker -P eventlet -l info  # 启动worker监听异步任务
celery -A djangoDemo beat -l info  # 启动beat任务调度器

这里的woker已经监听到我们创建的celeryapp.tasks.task_one异步函数了

这里说明beat启动成功了:

浏览器输入请求地址:http://127.0.0.1:8000/celery/test/ ,创建Celery任务

接口请求成功后查看数据库,django_celery_beat_periodictask表新增了一条异步任务,其中的clocked_id字段指向django_celery_beat_clockedschedule表,该表新增了一条任务的执行时间。celery.backend_cleanup是Celery自动创建的,不用管

在到达任务执行时间后观察woker和beat的终端日志

查看beat终端日志,红框第一行是我们发送请求成功创建异步任务之后,CeleryBeat已经检测到数据库中有任务发生变化(CeleryBeat每5秒检测一次,使用debug级日志可查看到);第二行CeleryBeat将一个名为celery_test的任务发送给worker,让woker执行celeryapp.tasks.task_one异步函数,消费该任务

在woker终端的日志中可以看到任务执行的结果:

七、 Celery Beat常用的三种时间控制器

clockedSchedule:指定某个时间执行任务,例如:2024-05-22 09:22:10,对应的表是django_celery_beat_clockedschedule,该表仅有id和clocked_time两个字段

crontabSchedule:指定crontab格式的时间执行任务,某月某天星期几某时某分,与linux的定时任务规则一致,可参考Linux定时任务-CSDN博客,对应的表是django_celery_beat_crontabschedule,该表有7个字段

  `id` `minute` 分钟`hour` 小时`day_of_week`  星期几`day_of_month`  每月的哪些天`month_of_year`  每年的哪些月份`timezone`  时区

intervalSchedule:间隔指定时间执行任务,对应的表是django_celery_beat_intervalschedule,该表有3个字段id、every(间隔时长)、period(时间单位,可选时、分、秒、微秒、天)

代码示例,clockedSchedule上面已经演示过了,这里只演示另外两种:

# crontabSchedule
def celery_test2(request):task_name = 'celery_test2'crontab = CrontabSchedule.objects.get_or_create(minute='*/1',  # 每1分钟hour='*',  # 每小时day_of_week='*',  # 一周中的哪几天,*表示每天day_of_month='*',  # 月份中的哪一天,*表示每一天month_of_year='*',  # 年中的哪一月,*表示每个月timezone='Asia/Shanghai')PeriodicTask.objects.update_or_create(name=task_name,task='celeryapp.tasks.task_one',  # Celery任务的全路径defaults={'crontab': crontab[0],  # 使用crontab格式时间执行该任务'one_off': False,  # 在任务执行完一次后关闭该任务'enabled': True,  # 开启任务'args': json.dumps([]),})return JsonResponse({'message': '200'})# intervalSchedule
def celery_test3(request):task_name = 'celery_test3'interval = IntervalSchedule.objects.get_or_create(every=1,  # 间隔时间period=IntervalSchedule.MINUTES,  # 周期单位,这里是分钟)PeriodicTask.objects.update_or_create(name=task_name,task='celeryapp.tasks.task_one',  # Celery任务的全路径defaults={'interval': interval[0],  # 使用interval间隔指定时间执行该任务'one_off': False,  # 在任务执行完一次后关闭该任务'enabled': True,  # 开启任务'args': json.dumps([]),})return JsonResponse({'message': '200'})

其实只要创建不同的时间控制器,然后在创建任务的时候作为参数放进去即可,注意一个任务只能使用一种时间控制器

参考文献:

https://blog.csdn.net/wuwei_201/article/details/129650089

https://blog.51cto.com/u_15703497/6252757

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/842617.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024.05.27学习记录

1、面经复习&#xff1a; 实际工作经验章节 2、代码随想录刷题&#xff1a;动态规划剩下部分和单调栈 3、rosebush 组件库完成Input 和 AutoComplete部分内容

2024甘肃省三支一扶报名流程详细图解

预计报名时间&#xff1a;2024年5月27日9:00至5月31日18&#xff1a;00 2024甘肃省三支一扶报名流程 登录甘肃人力人力资源考试中心&#xff0c;选择网上报名 进入账户登录&#xff0c;首次登录同学请先注册账号。 注册账号&#xff0c;认真填写&#xff0c;仔细核对信息。…

惠海 H6901B升压恒流3.7V 7.4V 12V 24V 30V 36V 48V 60V 80V 100V调光无频闪细腻顺滑

H6901B是一款升压型LED恒流驱动芯片&#xff0c;具有良好稳定性的特点。H6901B的主要特点包括宽输入电压范围&#xff08;2.7V-100V&#xff09;、高工作频率&#xff08;1MHz&#xff09;以及多种保护功能&#xff08;如芯片供电欠压保护、过温保护、软启动等&#xff09;。此…

美颜技术揭秘:美颜SDK与美颜接口的开发实践

一、美颜技术的基本原理 1.1面部检测与特征点识别 面部检测是美颜技术的第一步&#xff0c;通过计算机视觉算法检测图像中的人脸位置。常用的方法有Haar特征、卷积神经网络&#xff08;CNN&#xff09;等。 1.2图像增强与美化 -磨皮 -美白 -眼部增强 -脸型优化 1.3实时处…

【爬虫软件】2024最新短视频评论区抓取工具

一、背景说明 1.0 采集目标 采集DOU音评论数据对引流截流和获客有很多好处。首先&#xff0c;通过分析DOU音评论数据&#xff0c;我们可以更好地了解用户对于产品或内容的喜好和需求&#xff0c;从而调整营销策略&#xff0c;吸引更多用户关注和点击。其次&#xff0c;评论数据…

解密MySQL中的临时表:探究临时表的神奇用途

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 解密MySQL中的临时表&#xff1a;探究临时表的神奇用途 前言临时表的定义与分类创建与使用临时表临时表的操作与管理优化与性能提升注意事项与最佳实践 前言 在数据库管理中&#xff0c;临时表是一个…

PGP安装以及汉化

目录 1.安装 2.汉化 1.安装 (1&#xff09;进入setup目录&#xff0c;双击安装包开始安装 (2&#xff09;选择默认语言English (3&#xff09;接受安装协议 I accept the license agreement (4&#xff09;选择第二项 Do not display the Release Notes (5&#xff09;选择“…

第十四 Elasticsearch介绍和安装

docker-compose安装 kibana: image: docker.elastic.co/kibana/kibana:7.5.1 container_name: kibana ports: - "5601:5601" environment: ELASTICSEARCH_HOSTS: http://elasticsearch:9200 depends_on: - elasticsearch…

按尺寸筛选轮廓图中的轮廓

1.按短边筛选 原始轮廓图&#xff1a; import cv2 import numpy as np# 读取轮廓图 contour_image cv2.imread(..\\IMGS\\pp_edge.png, cv2.IMREAD_GRAYSCALE)# 使用cv2.findContours()函数获取所有轮廓 contours, _ cv2.findContours(contour_image, cv2.RETR_EXTERNAL, cv2…

学习Uni-app开发小程序Day17

今天开始&#xff0c;就把uni-app前期使用的全部学完了&#xff0c;现在就把以前学习的&#xff0c;做成一案例&#xff0c;中间有未讲的&#xff0c;在进行补充&#xff0c;这里是根据老师视频进行项目案例编写的。 先弄出效果图&#xff0c;然后在根据效果图进行代码的编辑 …

uni-app 微信 支付宝 小程序 使用 longpress 实现长按删除功能,非常简单 只需两步

1、先看效果 2、直接上代码 ui结构 <view class"bind" longpress"deleteImage" :data-index"index"><view class"bind_left">绑定设备</view><view class"bind_right"><view class"bind_t…

Raven2掠夺者2渡鸦2角色创建、游戏预下载、账号怎么注册教程

《渡鸦2》&#xff08;Raven 2&#xff09;是由韩国开发的一款大型多人在线角色扮演游戏&#xff08;MMORPG&#xff09;类型的手游&#xff0c;作为前作《Raven》的续集&#xff0c;继承并发展了其黑暗奇幻世界观&#xff0c;同时在游戏设计和内容上进行了大量创新。游戏预计于…

Spring:IoC容器(基于注解管理bean)

1. HelloWorld * 引入依赖* 开启组件扫描* 使用注解定义 Bean* 依赖注入 2.开启组件扫描 <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframework.org/schema/beans"xmlns:xsi"http://www.w3.org/20…

阿里云OSS文件上传和下载完整指南

目录 前言 一、前期准备 二、文件上传 上传进度条 三、文件下载 1.流式下载 2.下载到本地 3.进度条 前言 文件上传是常见需求&#xff0c;一般我们不会把文件直接上传到应用服务器&#xff0c;因为单台服务器存储空间是有限的&#xff0c;不好扩展。阿里云对象存储&…

Diffusion Model 和 Stable Diffusion 详解

文章目录 Diffusion Model 基础生成模型DDPM概述向前扩散过程前向扩散的逐步过程前向扩散的整体过程 反向去噪过程网络结构训练和推理过程训练过程推理过程优化目标 详细数学推导数学基础向前扩散过程反向去噪过程 Stable Diffusion组成结构运行流程网络结构VAE 模型文本编码器…

MyBatis的基础操作

目录 一.什么是MyBatis? 二.使用MyBatis的准备工作 1.引入依赖: 2.配置数据库连接字符串(建立MaBatis和MySQL的连接) 3.在model包中建立数据库对应的实体类UserInfo 三.通过注解的方式实现MyBatis的开发 1.插入语句(Insert) 2.删除语句(Delete) 3.更新语句(Update) 4…

突破乙肝治疗瓶颈新希望!恒瑞医药小核酸疗法领跑进入II期临床试验

近日&#xff0c;恒瑞医药的针对慢性乙型肝炎的小核酸疗法要准备开启一项多中心、随机、开放、平行设计的 II 期研究,旨在评估 HRS-5635 注射液单独或与其他药物联合治疗慢性乙型肝炎患者的疗效和安全性二期临床实验。去年开启的1期&#xff0c;今年就要准备2期实验了。 咱们国…

Java核心: Stream流的实现原理

Java 8之后我们对Stream的使用都已经习以为常了&#xff0c;它帮助我们从怎么做的细节里脱身&#xff0c;只要告诉它做什么即可。这一篇文章我们主要讲Java Stream的实现原理&#xff0c;手写一个Stream框架&#xff0c;然后再来讲解Java Stream的核心类&#xff0c;做到知其然…

vue-3d-loader 加载多个模型

需求 1、在使用three.js进行开发的过程中&#xff0c;需要列表加载多个模型&#xff0c;并根据需要多模型进行加载。 2、当鼠标移动到图片上去的时候&#xff0c;开始加载模型&#xff0c; 模型进行加载和展示。 3、在制作3d沉浸式商城时&#xff0c;需要根据需求&#xff0…

字典推导式

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 使用字典推导式可以快速生成一个字典&#xff0c;它的表现形式和列表推导式类似。例如&#xff0c;我们可以使用下面的代码生成一个包含4个随机数的字…