在FastApi框架搭建的WBE系统中如何实现定时任务的管理?
Python中常见的定时任务框架包括Celery、APScheduler和Huey。以下是每个框架的简单对比和示例代码。
1.Celery: 分布式任务队列,适合处理长时间运行的任务。
# 安装celery
# pip install celery# celery_task.py
from celery import Celeryapp = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y# 使用
# celery -A celery_task worker -l info
# 执行任务
# result = add.delay(4, 4)
2.APScheduler: 定时任务调度,内置多种触发器。
# 安装apscheduler
# pip install apschedulerfrom apscheduler.schedulers.blocking import BlockingSchedulerdef my_job():print("执行任务...")scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=5) # 每5秒执行一次
scheduler.start()
3.Huey: 轻量级的任务队列,使用Redis作为数据存储。
# 安装huey
# pip install hueyfrom huey import Hueyhuey = Huey('my_app')@huey.task()
def task_a():print('Task A is running')# 使用
if __name__ == '__main__':huey.run()# 或者在生产环境中使用huey.poll()
Celery适合处理长任务,需要消息队列和分布式的场景;Huey轻量但需要其他Redis做存储。
所以我们选择APScheduler集成到我们的web系统中。
环境配置
pip install apscheduler fastapi[all]
APScheduler的基本组件
APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。
这里有个注意事项,很多博主都没讲的地方,在Web项目中集成APScheduler,调度器不能选择BlockingScheduler,这样会阻塞WEB系统的进程,导致定时框架启动而,web系统无法运行。
不多说直接上干货:
定时框架基础配置
from pytz import timezone
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutorjobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}executors = {'default': ThreadPoolExecutor(30), # 线程池数量'processpool': ProcessPoolExecutor(3), # 进程池数量
}job_defaults = {'coalesce': False, # 默认情况下关闭新的作业'max_instances': 10 # 设置调度程序将同时运行的特定作业的最大实例数10
}
# scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults,
# timezone=timezone('Asia/Shanghai'))
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults,timezone=timezone('Asia/Shanghai'))def A():print('hello')
scheduler.add_job(A, 'interval', minutes=1)
管理接口
from fastapi.responses import JSONResponse@app.get('/list',summary='查询定时任务列表')
def task_list():'''查询定时任务列表'''data = [{'name': i.name, 'id': i.id,'start_date': i.trigger.start_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.start_date else '','interval': str(i.trigger.interval),'interval_length': i.trigger.interval_length,'next_run_time': i.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if i.next_run_time else '','end_date': i.trigger.end_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.end_date else '','status': bool(i.next_run_time),}for i in scheduler.get_jobs()]return JSONResponse(data)@app.get('/info',summary='查询定时任务详情')
def task_info(id):'''查询定时任务详情'''data = {}if i := scheduler.get_job(id):data = {'name': i.name, 'id': i.id,'start_date': i.trigger.start_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.start_date else '','interval': str(i.trigger.interval),'interval_length': i.trigger.interval_length,'next_run_time': i.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if i.next_run_time else '','end_date': i.trigger.end_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.end_date else '','status': bool(i.next_run_time),}return JSONResponse(data)@app.get('/stop',summary='停止指定任务')
def task_stop(id):'''停止指定任务'''if job := scheduler.get_job(id):job.pause()return JSONResponse('ok')@app.get('/resume',summary='恢复指定停止任务')
def task_resume(id):'''恢复指定停止任务'''if job := scheduler.get_job(id):job.resume()return JSONResponse('ok')@app.get('/stop/all',summary='停止所有任务')
def task_stop_all():'''停止所有任务'''for i in scheduler.get_jobs():i.pause()return JSONResponse('ok')@app.get('/resume/all',summary='恢复所有停止任务')
def task_resume_all():'''恢复所有停止任务'''for i in scheduler.get_jobs():i.resume()return JSONResponse('ok')@app.get('/remove/all',summary='删除所有任务')
def task_remove_all():'''删除所有任务'''scheduler.remove_all_jobs()return JSONResponse('ok')@app.get('/remove',summary='删除指定任务')
def task_remove(id):'''删除指定任务'''if job := scheduler.get_job(id):job.remove()return JSONResponse('ok')