【fastapi】定时任务管理

在FastApi框架搭建的WBE系统中如何实现定时任务的管理?
Python中常见的定时任务框架包括Celery、APScheduler和Huey。以下是每个框架的简单对比和示例代码。
1.Celery: 分布式任务队列,适合处理长时间运行的任务。

# 安装celery
# pip install celery# celery_task.py
from celery import Celeryapp = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y# 使用
# celery -A celery_task worker -l info
# 执行任务
# result = add.delay(4, 4)

2.APScheduler: 定时任务调度,内置多种触发器。

# 安装apscheduler
# pip install apschedulerfrom apscheduler.schedulers.blocking import BlockingSchedulerdef my_job():print("执行任务...")scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=5)  # 每5秒执行一次
scheduler.start()

3.Huey: 轻量级的任务队列,使用Redis作为数据存储。

# 安装huey
# pip install hueyfrom huey import Hueyhuey = Huey('my_app')@huey.task()
def task_a():print('Task A is running')# 使用
if __name__ == '__main__':huey.run()# 或者在生产环境中使用huey.poll()

Celery适合处理长任务,需要消息队列和分布式的场景;Huey轻量但需要其他Redis做存储。
所以我们选择APScheduler集成到我们的web系统中。

环境配置
pip install apscheduler fastapi[all]

APScheduler的基本组件
APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。

这里有个注意事项,很多博主都没讲的地方,在Web项目中集成APScheduler,调度器不能选择BlockingScheduler,这样会阻塞WEB系统的进程,导致定时框架启动而,web系统无法运行。

不多说直接上干货:

定时框架基础配置

from pytz import timezone
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutorjobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}executors = {'default': ThreadPoolExecutor(30),  # 线程池数量'processpool': ProcessPoolExecutor(3),  # 进程池数量
}job_defaults = {'coalesce': False,  # 默认情况下关闭新的作业'max_instances': 10  # 设置调度程序将同时运行的特定作业的最大实例数10
}
# scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults,
#                               timezone=timezone('Asia/Shanghai'))
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults,timezone=timezone('Asia/Shanghai'))def A():print('hello')
scheduler.add_job(A, 'interval', minutes=1)

管理接口

from fastapi.responses import JSONResponse@app.get('/list',summary='查询定时任务列表')
def task_list():'''查询定时任务列表'''data = [{'name': i.name, 'id': i.id,'start_date': i.trigger.start_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.start_date else '','interval': str(i.trigger.interval),'interval_length': i.trigger.interval_length,'next_run_time': i.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if i.next_run_time else '','end_date': i.trigger.end_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.end_date else '','status': bool(i.next_run_time),}for i in scheduler.get_jobs()]return JSONResponse(data)@app.get('/info',summary='查询定时任务详情')
def task_info(id):'''查询定时任务详情'''data = {}if i := scheduler.get_job(id):data = {'name': i.name, 'id': i.id,'start_date': i.trigger.start_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.start_date else '','interval': str(i.trigger.interval),'interval_length': i.trigger.interval_length,'next_run_time': i.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if i.next_run_time else '','end_date': i.trigger.end_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.end_date else '','status': bool(i.next_run_time),}return JSONResponse(data)@app.get('/stop',summary='停止指定任务')
def task_stop(id):'''停止指定任务'''if job := scheduler.get_job(id):job.pause()return JSONResponse('ok')@app.get('/resume',summary='恢复指定停止任务')
def task_resume(id):'''恢复指定停止任务'''if job := scheduler.get_job(id):job.resume()return JSONResponse('ok')@app.get('/stop/all',summary='停止所有任务')
def task_stop_all():'''停止所有任务'''for i in scheduler.get_jobs():i.pause()return JSONResponse('ok')@app.get('/resume/all',summary='恢复所有停止任务')
def task_resume_all():'''恢复所有停止任务'''for i in scheduler.get_jobs():i.resume()return JSONResponse('ok')@app.get('/remove/all',summary='删除所有任务')
def task_remove_all():'''删除所有任务'''scheduler.remove_all_jobs()return JSONResponse('ok')@app.get('/remove',summary='删除指定任务')
def task_remove(id):'''删除指定任务'''if job := scheduler.get_job(id):job.remove()return JSONResponse('ok')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/31016.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Axure教程】移动端多选图片上传

在移动端应用中,提供多选图片上传功能对于用户体验和功能性具有重要意义,尤其是在像微信、微博等社交媒体平台上。 例如用户可以快速上传多张图片进行分享,发布相册或创建图文并茂的动态;卖家可以一次性上传多个产品图片&#xf…

劲爆!Kimi月之暗面可以接入微信,智能升级, 打造个性多Agent(二)

前言 在当今这个快速发展的AI时代,抖音推出了一个名为“扣子Coze”的工具,帮助用户快速、低门槛地搭建属于自己的AI机器人。本文将详细介绍如何使用扣子Coze配置自己的AI Agent,并展示其在多个平台上的应用。 如何使用多个Agent 搭建更加智…

chatgpt: linux 下用纯c 编写一按钮,当按钮按下在一新窗口显示hello world

用这个程序模板,就可以告别只能在黑框框的终端中编程了。 在 Linux 环境下使用纯 C 语言编写一个按钮,当按钮按下时,在一个新窗口显示 "Hello World"。我们可以使用 GTK 库来实现这个功能。GTK 是一个用于创建图形用户界面的跨平台…

鸿蒙开发组件:【FA模型的Context】

FA模型的Context FA模型下只有一个Context。Context中的所有功能都是通过方法来提供的,它提供了一些featureAbility中不存在的方法,相当于featureAbility的一个扩展和补全。 接口说明 FA模型下使用Context,需要通过featureAbility下的接口…

大模型日报|4 篇必读的大模型论文

大家好,今日必读的大模型论文来啦! 1.ChatGLM 技术报告:从 GLM-130B 到 GLM-4 AII Tools GLM 技术团队介绍了 ChatGLM,这是一个不断发展的大语言模型系列。本报告主要关注 GLM-4 语言系列,包括 GLM-4、GLM-4-Air 和 …

r2frida:基于Frida的远程进程安全检测和通信工具

关于r2frida r2frida是一款能够将Radare2和Frida的功能合二为一的强大工具,该工具本质上是一个Radare2的自包含插件,可以帮助广大研究人员利用Frida的功能实现对目标进程的远程安全检测和通信管理。 Radare2项目提供了针对逆向工程分析的完整工具链&…

好用的抖音短视频矩阵系统推荐:筷子剪辑,超级编导。抖去推

目前短视频矩阵行业如火如荼,为大家推荐几款比较好用的短视频矩阵系统。 第一款叫做筷子剪辑,由筷子科技开发,网页版应用工具,无需下载安装 主打视频剪辑,支持一键成片,视频发布等,&#xff0…

为什么要把ip和mac地址绑定

IP地址和MAC地址绑定是一种网络安全措施,主要用于以下几个方面: 1. **防止IP地址冲突**:在局域网中,如果两个设备被分配了相同的IP地址,将会导致IP地址冲突,影响网络的正常使用。通过将IP地址与MAC地址绑定…

RS-232协议详解:深入理解与实际应用

RS-232协议详解 RS-232协议,也称为推荐标准232,是一种用于串行通信的标准协议。它在计算机和外围设备之间的通信中广泛应用。本文将详细介绍RS-232协议的各个方面,包括其历史、工作原理、信号类型、连接方式、应用场景等。希望通过这篇文章&a…

Linux 远程使用 Nvidia 显卡加速桌面

(首发地址:学习日记 https://www.learndiary.com/2024/06/nvidia-remote-desktop/) 朋友们,大家好!我是来自淘宝网学习日记小店的 learndiary,专注于 Linux 服务领域。今天,我想和大家分享一些…

靠这套车载测试面试题系列成功哪些20k!

HFP测试内容与测试方法 2.3 接听来电:测试手机来电时,能否从车载蓝牙设备和手机侧正常接听】拒接、通话是否正常。 1、预置条件:待测手机与车载车载设备处于连接状态 2、测试步骤: 1)用辅助测试机拨打待测手机&…

24年计算机等级考试22个常见问题解答❗

24年9月计算机等级考试即将开始,整理了报名中容易遇到的22个问题,大家对照入座,避免遇到了不知道怎么办? 1、报名条件 2、报名入口 3、考生报名之后后悔了,不想考了,能否退费? 4、最多能够报多少…

计网课设-发送TCP数据包

一、效果展示 二、代码实现 import nmap import socket import tkinter as tk from tkinter import messagebox,Listbox from threading import Thread#获取自身IP,从而确定当前局域网范围 def get_ip_address():#创建了一个socket对象,socket.AF_INET表…

扩散模型详细推导过程——训练与采样

扩散模型的训练与采样算法 训练目标的推导 需要使得去噪过程所产生的 x ( i ) \boldsymbol{x}^{(i)} x(i)的总体出现概率最大,先不考虑第几个样本,省略上标,即最大化 p ( x ∣ θ 1 : T ) p(\boldsymbol{x}|\theta_{1:T}) p(x∣θ1:T​)&am…

【Java】计算程序耗时多少

使用hutool自带的工具类实现 import cn.hutool.core.date.StopWatch; Slf4j public class TestApp {Testpublic void test1() {StopWatch stopWatch new StopWatch();try {// 开始计时stopWatch.start("handleReq");// 执行要测量的代码块performTask();// 停止计时…

国际版多商户商城小程序源码(Android+IOS+H5)

一站式全球购物新体验 功能介绍 精准分类、我的团队、开通会员我的返利、我的订单、快速购买 邀请返利、购物车、我的提现 一、引言:为何选择国际版多商户商城小程序? 随着全球化的步伐不断加快,越来越多的人开始追求国际化的购物体验。国…

FreeBSD在zfs挂接第二块ssd 硬盘

为FreeBSD机器新增加了一块ssd硬盘:骑尘 256G 先格式化分区硬盘 进入bsdconfig 选Disk Management 选择ada1 ,也就是新增加的硬盘 选择auto 然后选择Entire Disk 提示信息 The existing partition scheme on this disk (MBR) │ …

密码学与信息安全面试题及参考答案(2万字长文)

目录 什么是密码学?它的主要目标是什么? 请解释明文、密文、加密和解密的概念。 密码系统的安全性通常基于哪三种假设? 什么是Kerckhoffs原则?它对现代密码学设计有何意义? 简述密码学中的“混淆”和“扩散”概念。 什么是AES(高级加密标准)?AES有几种常见的密钥…

代码随想录训练营Day 64|卡码网98. 所有可达路径(深搜)

1.所有可达路径 98. 所有可达路径 | 代码随想录 代码&#xff1a; &#xff08;深搜&#xff09;邻接矩阵表示 #include <iostream> #include <vector> using namespace std; vector<int> path; vector<vector<int>> result; void dfs(const ve…

图论算法学习

图论 dfs是可一个方向去搜&#xff0c;不到黄河不回头&#xff0c;直到遇到绝境了&#xff0c;搜不下去了&#xff0c;再换方向&#xff08;换方向的过程就涉及到了回溯&#xff09;。bfs是先把本节点所连接的所有节点遍历一遍&#xff0c;走到下一个节点的时候&#xff0c;再…