APScheduler定时器使用:django中使用apscheduler,使用mysql做存储后端

一、基本环境

python版本:3.8.5

APScheduler==3.10.4
Django==3.2.7
djangorestframework==3.15.1
SQLAlchemy==2.0.29
PyMySQL==1.1.0

二、django基本设置

2.1、新增一个app

该app用来写apscheduler相关的代码

python manage.py startapp gs_scheduler

2.2、修改配置文件settings.py

#使用pymysql做客户端
import pymysql
pymysql.install_as_MySQLdb()INSTALLED_APPS = ['rest_framework', #注册restful 应用'gs_scheduler', #注册新增的app
]#配置mysql
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'admin-root'
MYSQL_NAME = 'study_websocket'DATABASES = {'default': {'ENGINE': 'django.db.backends.mysql','HOST': MYSQL_HOST,'PORT': MYSQL_PORT,'USER': MYSQL_USER,'PASSWORD': MYSQL_PASSWORD,'NAME': MYSQL_NAME,}
}

2.3、gs_scheduler创建urls.py

1、gs_scheduler/urls.py

from django.urls import path
from . import views
urlpatterns = []

2、根路由urls.py

from django.contrib import admin
from django.urls import path,includeurlpatterns = [path('admin/', admin.site.urls),path('api/scheduler/',include('gs_scheduler.urls')),
]

三、配置gs_scheduler应用

3.1、配置api接口

这些接口,用来展示定时任务的运行情况

1、urls.py

from django.urls import path
from . import views
urlpatterns = [path('run_next/', views.JobNextRunTimeAPIView.as_view()),#定时任务下次运行path('run_history/',views.JobRunTimeHistory.as_view()),#定时任务运行历史path('run_error/',views.JobRunErrorHistory.as_view()), #定时任务最近运行错误
]

2、models.py

from django.db import models# Create your models here.
import sqlite3
from gs_scheduler.management.commands.config import MYSQL_HOST,MYSQL_NAME,MYSQL_PORT,MYSQL_PASSWORD,MYSQL_USER,MYSQL_CHARSET
from datetime import datetime,timedelta
from django.conf import settingsimport pymysql
from concurrent.futures import ThreadPoolExecutor#时间戳转时间字符串
def timestamp_to_time_str(timestamp):# 使用 datetime 模块将时间戳转换为 datetime 对象dt = datetime.fromtimestamp(timestamp)# 将 datetime 对象格式化为时间字符串time_str = dt.strftime('%Y-%m-%d %H:%M:%S')return time_strclass MysqlDB:DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'# 创建数据库连接池def __init__(self):self.conn = pymysql.connect(host=MYSQL_HOST,port=MYSQL_PORT,user=MYSQL_USER,password=MYSQL_PASSWORD,db=MYSQL_NAME,charset=MYSQL_CHARSET,cursorclass=pymysql.cursors.DictCursor)# 执行数据库增删改查def _execute_sql(self,query):sql,args = querytry:with self.conn.cursor() as cursor:# 执行sql语句if args:if isinstance(args,(tuple,list)):# args = (value1,value2)cursor.execute(sql,args)else:# args = value1cursor.execute(sql,(args,))else:cursor.execute(sql)# 不同类型sql,设置不同返回值if sql.strip().lower()[:6] == 'select':#查询语句rows = cursor.fetchall() or []return rowselif sql.strip().lower()[:4] == 'show':#查看表是否存在result = cursor.fetchall()return resultelse:#增删改语句self.conn.commit()return Trueexcept Exception as e:print('sql执行失败',e)pass# 线程池执行sql语句def start_sql_with_pool(self, sql:str,args=None):with ThreadPoolExecutor() as executor:result = executor.submit(self._execute_sql, (sql,args)).result()return result#创建记录定时任务历史表(调度器使用)def create_apscheduler_history(self):#创建表语句:表名不能是占位符sql = """CREATE TABLE apscheduler_history (id INTEGER AUTO_INCREMENT PRIMARY KEY,job_id VARCHAR(128),run_time DATETIME,is_error TINYINT,error_msg VARCHAR(256) NULL)"""#判断表存在不,不存在再创建results = self.start_sql_with_pool('SHOW TABLES')is_exist = Falsefor dic in results:if dic['Tables_in_{}'.format(MYSQL_NAME,)] == 'apscheduler_history':is_exist = True#表不存在才创建if not is_exist:# print('表不存在,执行创建表')create = self.start_sql_with_pool(sql)# 创建索引self.start_sql_with_pool(db.conn.cursor().execute("CREATE INDEX run_time_index ON apscheduler_history (run_time)"))return True# 将任务运行的结果记录到数据库中(调度器使用)def insert_into_apscheduler_history(self, data_list: list):if len(data_list) == 4:  # 异常执行的任务sql = """INSERT INTO apscheduler_history (job_id,run_time,is_error,error_msg) VALUES (%s,%s,%s,%s)"""else:  # 正常执行的任务sql = """INSERT INTO apscheduler_history (job_id,run_time,is_error) VALUES (%s,%s,%s)"""#使用线程池执行插入语句self.start_sql_with_pool(sql,data_list)# 删除8小时前的历史记录(调度器使用)def delete_8hour_before_history(self):before_8hour = (datetime.now() - timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')sql = '''DELETE FROM apscheduler_history WHERE run_time < %s'''self.start_sql_with_pool(sql,(before_8hour,))# 查询每个任务的下次运行时间(api展示)def fetch_all_next_run_time(self):sql = 'SELECT id,next_run_time FROM apscheduler_jobs'# 获取查询结果rows = self.start_sql_with_pool(sql)for row in rows:row['next_run_time'] = timestamp_to_time_str(row['next_run_time'])return rows# 查询所有任务最近10次运行记录(api展示)def fetch_all_run_history(self):''':param query:'SELECT id,next_run_time,job_state FROM apscheduler_jobs':return:'''#获取所有定时任务iddata_list = self.fetch_all_next_run_time()id_list = (dic.get('id') for dic in data_list)# 创建一个游标对象sql = """SELECT id,job_id,run_time FROM apscheduler_history WHERE is_error=0 AND job_id = %s ORDER BY run_time DESC LIMIT 10"""ret_list = []for id in id_list:# 执行查询rows = self.start_sql_with_pool(sql,(id,)) or []dic = {'id': id, 'last_run_time': [], 'msg': '最近10次运行时间'}for row in rows:run_time = row.get('run_time')run_time = run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_timedic['last_run_time'].append(run_time)ret_list.append(dic)return ret_list# 获取任务最近运行失败情况(api展示)def fetch_all_error_history(self):data_list = self.fetch_all_next_run_time()id_list = (dic.get('id') for dic in data_list)sql = """SELECT run_time,error_msg FROM apscheduler_history WHERE is_error=1 AND job_id = %s ORDER BY id DESC LIMIT 5"""ret_list = []for id in id_list:# 获取查询结果rows = self.start_sql_with_pool(sql, (id,)) or []dic = {'id': id, 'last_run_time': [], 'msg': '最近5次执行失败'}for row in rows:run_time = row.get('run_time')run_time = run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_timeerror = row.get('error_msg')dic['last_run_time'].append({'run_time': run_time, 'error': error})ret_list.append(dic)return ret_list# 关闭连接def close(self):self.conn.close()if __name__ == '__main__':db = MysqlDB()db.create_apscheduler_history()# for i in range(1,10):#     db.insert_into_apscheduler_history(['send_to_big_data','2024-05-01 13:{}:12'.format(str(i).zfill(2)),1,'执行失败了'])# db.delete_8hour_before_history()# ret = db.fetch_all_run_history()# print(ret,'history')# ret = db.fetch_all_next_run_time()# print(ret,'next')# ret = db.fetch_all_error_history()# print(ret,'error')db.close()

3、views.py

from django.shortcuts import render# Create your views here.
from rest_framework.views import APIView
from rest_framework.response import Response
from .models import MysqlDB#任务下次运行时间
class JobNextRunTimeAPIView(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_next_run_time()db.close()ret = {'code':200,'status':'success','data':data,}return Response(ret)#任务最近运行历史
class JobRunTimeHistory(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_run_history()db.close()ret = {'code':200,'status':'success','data':data}return Response(ret)#任务最近运行错误
class JobRunErrorHistory(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_error_history()db.close()ret = {'code': 200,'status': 'success','data': data}return Response(ret)

3.2、在gs_scheduler创建

1、config.py 代码

该文件存放的是启动APScheduler调度器的一些配置数据

import os
from apscheduler.jobstores.memory import MemoryJobStore #内存做后端存储
#from apscheduler.jobstores.redis import RedisJobStore #redis做后端存储
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore #mysql等做后端存储
# from django.conf import settings
from study_apscheduler import settings
#mysql://root:ldc-root@127.0.0.1:3306/jobs?charset=utf8
MYSQL_CONFIG = settings.DATABASES.get('default')
MYSQL_USER = MYSQL_CONFIG.get('USER')
MYSQL_PASSWORD = MYSQL_CONFIG.get('PASSWORD')
MYSQL_HOST = MYSQL_CONFIG.get('HOST')
MYSQL_PORT = MYSQL_CONFIG.get('PORT')
MYSQL_NAME = MYSQL_CONFIG.get('NAME')
MYSQL_CHARSET = 'utf8mb4'
URL = 'mysql://{}:{}@{}:{}/{}?charset={}'.format(MYSQL_USER,MYSQL_PASSWORD,MYSQL_HOST,MYSQL_PORT,MYSQL_NAME,MYSQL_CHARSET)
#时区
TIME_ZONE = 'Asia/Shanghai'
#job的默认配置
JOB_DEFAULTS =  {'coalesce': True, #系统挂掉,任务积攒多次为执行,True是合并成一次执行,False是执行所有的次数。 持久化存储才有效'max_instances': 3 # 同一个任务同一时间最多只能有3个实例在运行。}
#job的存储后端
JOB_STORE = {'default': SQLAlchemyJobStore(url=URL)
}#监听事件对应的情况
LISTENER={1:'调度程序启动',2:'调度程序关闭',4:'调度程序中任务处理暂停',64:'将任务存储添加到调度程序中',8192:'任务在执行期间引发异常',4096:'任务执行成功',
}

2、task.py

所有的定时任务都存放在这里

import os
from datetime import datetime, timedelta, date
from gs_scheduler.models import MysqlDB
from utils.log_util import info_log
from utils.send_monitor_data import SendData#推送到数据仓的告警信息:5分钟执行一次
send_to_big_data = SendData().send#清除定时任务历史运行记录
def delete_apscheduler_history():db = MysqlDB()db.delete_8hour_before_history()if __name__ == '__main__':print(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

3、crontab.py

实例化调度器

# 导入所需的调度器类和触发器类
from apscheduler.schedulers.background import BackgroundScheduler #后台运行
from apscheduler.schedulers.blocking import BlockingScheduler  #主进程运行,需要单独运行
from apscheduler.triggers.interval import IntervalTrigger #时间间隔
from apscheduler.triggers.cron import CronTrigger #复杂的定时任务
from apscheduler.triggers.date import DateTrigger #一次性定时任务
from django.core.management.base import BaseCommand
from apscheduler import events
from pytz import timezone
from threading import RLock
from datetime import datetime, timedelta
from gs_scheduler.models import MysqlDB
#定时任务
from .task import delete_apscheduler_history
from .task import send_to_big_data
#日志
from utils.log_util import info_log
from .config import LISTENER
from .config import TIME_ZONE,JOB_DEFAULTS,JOB_STORE#脚本运行:python manage.py crontab
class Command(BaseCommand):TIME_FORMAT = '%Y-%m-%d %H:%M:%S'#初始化调度器def _scheduler_obj(self):scheduler = BlockingScheduler()scheduler.configure(timezone = TIME_ZONE, #时区job_defaults=JOB_DEFAULTS, #job的默认配置jobstores=JOB_STORE, #job的存储后端)return scheduler#添加任务def _add_job(self,scheduler:BlockingScheduler):#每5分钟执行一次推送告警到大数据仓scheduler.add_job(send_to_big_data,trigger=IntervalTrigger(minutes=1),id='send_to_big_data',replace_existing=True,coalesce=True,)#每隔8个小时,清除历史的记录scheduler.add_job(delete_apscheduler_history,trigger=IntervalTrigger(hours=8),id='delete_apscheduler_history',replace_existing=True,coalesce=True,)#添加监听器def _listener(self,event:events):code = event.coderun_time = datetime.now().strftime(self.TIME_FORMAT)msg = LISTENER.get(code)db = MysqlDB()if msg:if code == 4096:#成功运行job_id = event.job_id#记录到数据库中db.insert_into_apscheduler_history([job_id,run_time,0])elif code == 8192:#运行异常了job_id = event.job_id#记录到数据库中db.insert_into_apscheduler_history([job_id,run_time,1,msg])else:info_log(msg)db.close()def start(self):scheduler = self._scheduler_obj()# 创建记录运行记录db = MysqlDB()db.create_apscheduler_history()db.close()# 设置监听器scheduler.add_listener(self._listener)# 设置定时任务self._add_job(scheduler)try:# print('{},定时器启动成功,等待定时任务执行...'.format(datetime.now().strftime(self.TIME_FORMAT)))scheduler.start()except KeyboardInterrupt:scheduler.shutdown()# python manage.py crontab运行 就是调用该方法def handle(self, *args, **options):self.start()#伴随django,在后台运行的。 在wsgi.py 文件中,调用BackRunScheduler().start()
class BackRunScheduler(Command):# 初始化调度器def _scheduler_obj(self):scheduler = BackgroundScheduler()scheduler.configure(timezone=TIME_ZONE,  # 时区job_defaults=JOB_DEFAULTS,  # job的默认配置jobstores=JOB_STORE,  # job的存储后端)return scheduler

3.3、启动方式:

方式一: python manage.py  crontab    (生产环境,推荐使用此方式,单独运行)

方式二:在settings.py中,调用BackRunScheduler().start()  , 后台运行

四、测试

4.1、启动

启动django项目:python manage.py runserver 8080

启动定时器:python manage.py crontab

4.2、等待一段时间

1、查询任务下次执行时间

请求:http://127.0.0.1:8080/api/scheduler/run_next/

2、查询任务最近运行情况

请求:http://127.0.0.1:8080/api/scheduler/run_next/

3、查询任务最近异常情况

请求:http://127.0.0.1:8080/api/scheduler/run_error/

五、源代码下载

码云地址:

django应用定时器: django下使用定时器的方法icon-default.png?t=N7T8https://gitee.com/liuhaizhang/django-application-timer/

目前有两套代码:一个以mysql存储任务,一个用sqlite存储任务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/831026.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ollamallama

Olllama 直接下载ollama程序&#xff0c;安装后可在cmd里直接运行大模型&#xff1b; llama 3 meta 开源的最新llama大模型&#xff1b; 下载运行 1 ollama ollama run llama3 2 github 下载仓库&#xff0c;需要linux环境&#xff0c;windows可使用wsl&#xff1b; 接…

C++浮点数format时的舍入问题

C浮点数format时的舍入问题 首先有这样一段代码&#xff1a; #include <iostream> #include <stdio.h> using namespace std;int main() {cout << " main begin : " << endl;printf("%.0f \r\n", 1.5);printf("%.0f \r\n&…

吴恩达2022机器学习专项课程(一)8.2 解决过拟合

目录 解决过拟合&#xff08;一&#xff09;&#xff1a;增加数据解决过拟合&#xff08;二&#xff09;&#xff1a;减少特征特征选择缺点 解决过拟合&#xff08;三&#xff09;&#xff1a;正则化总结 解决过拟合&#xff08;一&#xff09;&#xff1a;增加数据 收集更多训…

【c++】模板编程解密:C++中的特化、实例化和分离编译

&#x1f525;个人主页&#xff1a;Quitecoder &#x1f525;专栏&#xff1a;c笔记仓 朋友们大家好&#xff0c;本篇文章我们来学习模版的进阶部分 目录 1.非类型模版参数按需实例化 2.模版的特化函数模版特化函数模版的特化类模版全特化偏特化 3.分离编译模版分离编译 1.非类…

综合性练习(后端代码练习4)——图书管理系统

目录 一、准备工作 二、约定前后端交互接口 1、需求分析 2、接口定义 &#xff08;1&#xff09;登录接口 &#xff08;2&#xff09;图书列表接口 三、服务器代码 &#xff08;1&#xff09;创建一个UserController类&#xff0c;实现登录验证接口 &#xff…

网络应用层之(6)L2TP协议详解

网络应用层之(6)L2TP协议 Author: Once Day Date: 2024年5月1日 一位热衷于Linux学习和开发的菜鸟&#xff0c;试图谱写一场冒险之旅&#xff0c;也许终点只是一场白日梦… 漫漫长路&#xff0c;有人对你微笑过嘛… 全系列文档可参考专栏&#xff1a;通信网络技术_Once-Day的…

Apollo Dreamview+之播放离线数据包

前提条件 完成 Dreamview 插件安装&#xff0c;参见 Studio 插件安装 。 操作步骤 您可以通过包管理和源码两种方式快速体验离线数据包播放操作。其中进入 docker 环境和启动 dreamview 的命令有所区别&#xff0c;请您按照命令进行操作。 步骤一&#xff1a;启动并打开 Dr…

C++学习第十四课:运算符类型与运算符重载

C学习第十四课&#xff1a;运算符类型与运算符重载 在C中&#xff0c;运算符重载是一种使得自定义类型&#xff08;如类对象&#xff09;能够使用C内建运算符的能力。运算符重载允许程序员定义运算符对用户定义类型的特殊行为&#xff0c;这增加了程序的可读性和自然表达能力。…

PaLmTac嵌入软体手手掌的视触觉传感器

触觉是感知和操作之间的桥梁。触觉信息对于手部行为反馈和规划具有重要意义。软体手的柔性特性在人机交互、生物医学设备和假肢等方面具有潜在应用的优势。本文提出了一种名为 PaLmTac的嵌入软体手手掌的视触觉传感器&#xff08;vision-based tactile sensor, VBTS&#xff09…

LeetCode 198—— 打家劫舍

阅读目录 1. 题目2. 解题思路3. 代码实现 1. 题目 2. 解题思路 此题使用动态规划求解&#xff0c;假设 d p [ i ] [ 0 ] dp[i][0] dp[i][0] 代表不偷窃第 i i i 个房屋可以获得的最高金额&#xff0c;而 d p [ i ] [ 1 ] dp[i][1] dp[i][1] 代表偷窃第 i i i 个房屋可以获…

Bluetooth Profile 蓝牙协议栈总结

GAP-Generic Access Profile 控制设备广播和连接 GAP profile 的目的是描述&#xff1a; Profile rolesDiscoverability modes and proceduresConnection modes and proceduresSecurity modes and procedures 设备连接过程 LE中GAP有4种角色&#xff1a;BroadcasterObserv…

RTMP 直播推流 Demo(二)—— 音频推流与视频推流

音视频编解码系列目录&#xff1a; Android 音视频基础知识 Android 音视频播放器 Demo&#xff08;一&#xff09;—— 视频解码与渲染 Android 音视频播放器 Demo&#xff08;二&#xff09;—— 音频解码与音视频同步 RTMP 直播推流 Demo&#xff08;一&#xff09;—— 项目…

暴雨服务器引领信创算力新潮流

去年大模型的空前发展&#xff0c;人工智能也终于迎来了属于自己的“文艺复兴”&#xff0c;众多的模型相继发布&#xff0c;继而催生了整个行业对于智能算力需求的激增。 市场需求与技术驱动仿佛现实世界的左右脚&#xff0c;催动着世界文明的齿轮向前滚动。在全球经济角逐日…

力扣:61. 旋转链表(Java,双指针)

目录 题目描述&#xff1a;输入&#xff1a;输出&#xff1a;代码描述&#xff1a; 题目描述&#xff1a; 给你一个链表的头节点 head &#xff0c;旋转链表&#xff0c;将链表每个节点向右移动 k 个位置。 输入&#xff1a; head [1,2,3,4,5], k 2 输出&#xff1a; [4,5,1,…

leetcode-滑动窗口的最大值-95

题目要求 思路 1.这个题是可以暴力求解的&#xff0c;但是时间复杂度比较高&#xff0c;因此&#xff0c;这里说一个时间复杂度为O(n)的方法 2.因为这个代码是优化后的结果&#xff0c;第一次写如果直接写成这样着实不容易&#xff0c;因此&#xff0c;我直接讲每一行的含义。…

【网络基础】深入理解TCP协议:协议段、可靠性、各种机制

文章目录 1. TCP协议段格式1.1. 如何解包 / 向上交付1.1.1. 交付1.1.2. 解包 1.2. 如何理解可靠性1.2.1. 确认应答机制&#xff08;ACK&#xff09;1.2.2. 序号 与 确认序号 2. TCP做到全双工的原因2.1. 16位窗口大小2.2. 6个标记位 3. 如何理解连接3.1 连接管理机制3.1.1. 三次…

44. UE5 RPG 初始化敌人的属性

在正常的游戏中&#xff0c;我们应该考虑如何去初始化角色属性&#xff0c;并且要给角色分好类型。比如&#xff0c;在我们游戏中&#xff0c;我们如何去初始化小兵的属性&#xff0c;并且还要实现小兵随着等级的增长而增加属性。而且就是小兵也有类型的区分&#xff0c;比如我…

PhpAdmin-getshell

PhpAdmin-getshell 通过未授权成功写入&#xff0c;然后getshell 路径&#xff1a;C:\phpstudy_pro\Extensions\MySQL5.7.26\ 写入木马&#xff1a; into写入文件&#xff1a; 使用需看要secure_file_priv的值。 当value为“null”时&#xff0c;不允许读取任意文件 当value为…

Android 文件传输

经常写adb命令传文件&#xff0c;结果发现Android studio有自带的文件管理器&#xff0c;可以上传下载文件。

高扬程消防水泵在火灾中的关键作用/恒峰智慧科技

在火灾这一无情的灾难面前&#xff0c;每一秒都至关重要。而在这一分一秒的较量中&#xff0c;高扬程消防水泵无疑扮演着举足轻重的角色。它不仅是灭火战斗的得力助手&#xff0c;更是保障人民生命财产安全的守护神。 高扬程消防水泵&#xff0c;顾名思义&#xff0c;其扬程远超…