APScheduler:定时任务框架
安装
文档:
https://apscheduler.readthedocs.io/en/stable/userguide.html
安装
$ pip install apscheduler
>>> import apscheduler
>>> apscheduler.version
'3.6.3'
组件
APScheduler
由一下四部分组成
triggers
:触发器,指定定时任务执行的时机,每个任务都有自己的触发器.job stores
:存储器,持久存储,默认存储在内存中.executors
:执行器,在定时任务执行时,以进程或线程方式执行scheduler
:调度器,包含BackgroundScheduler
(后台运行)和BlockingScheduler
(阻塞运行).他会合理安排作业存储器,执行器,触发器进行工作.并进行添加和删除任务等.调度器通常是只有一个的,开发人员很少直接操作触发器,存储器,执行器等.因为这些都由调度器自动来实现了.
触发器(triggers
)
1.date
在特定时间执行
示例:
from datetime import date, datetime
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
def my_job(text):
print(text)
# run_date 接受 date, datetime 数据类型
sched.add_job(my_job, 'date', run_date=datetime(2020, 11, 27, 19, 33, 30), args=['text'])
sched.start()更多:
https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html
2.interval
间隔执行
在固定的时间间隔后触发事件.参数如下:
weeks
周,整形 days
一个月中的第几天,整形 hours
时,整形 minutes
分,整形 seconds
秒,整形 start_date
起始时间 end_date
结束时间 jitter
触发的时间误差
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
sched = BlockingScheduler()
def job_funciton():
print('hello world')
sched.add_job(job_funciton, trigger='interval',seconds=5)
# 指定小时
# sched.add_job(job_funciton, trigger='interval', hours=2)
# 指定开始,结束时间
# sched.add_job(job_funciton, trigger='interval', start_date='2020-11-27 20:30:00', end_date='2010-11-27 21:30:00)
sched.start()
3.crontab
在某个确切的时间周期性触发事件.
year
年 month
1-12
月day
1-31
日week
1-53
周day_of_week
一周中的第几天( 0/Monday
)hour
0-23
minute
0-59
second
0-59
start_date
datetime
数据类型,或者字符串类型end_date
结束时间 timezone
时区 jitter
触发的误差时间 也可以用表达式类型,可以用以下方式:
表达式 字段 描述 * 任何 在每个值都触发 */a 任何 每隔 a触发一次 a-b 任何 在 a-b区间内任何一个时间触发( a必须小于 b) a-b/c 任何 在 a-b区间内每隔 c触发一次 xth y day 第 x个星期 y触发 lastx day 最后一个星期 x触发 last day 一个月中的最后一天触发 x,y,z 任何 可以把上面的表达式进行组合
from apscheduler.schedulers.blocking import BlockingScheduler
def job_function():
print "Hello World"
sched = BlockingScheduler()
# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
sched.start()
调度器(schedulers
)
BlockingScheduler
:适用于调度程序是进程中唯一运行的进程,调用start
函数会阻塞当前线程,不能立即返回。BackgroundScheduler
:适用于调度程序在应用程序的后台运行,调用start
后主线程不会阻塞。AsyncIOScheduler
:适用于使用了asyncio
模块的应用程序。GeventScheduler
:适用于使用gevent
模块的应用程序。TwistedScheduler
:适用于构建Twisted
的应用程序。QtScheduler
:适用于构建Qt
的应用程序。TornadoScheduler
:tornado
任务存储器(job stores
)
有2中方式,一种是加载在内存中(默认配置),一种是使用数据库.使用内存简单高效,但是程序出现问题,从新运行,会把以前的任务从新再执行一次.数据库则可以在中断的地方恢复正常使用.
MemoryJobStore
:使用内存MongoDBJobStore
:使用mongodb
RedisJobStore
:使用redis
SQLAlchemy
:使用SQLAlchemy
框架
1.RedisJobStore
RedisJobStore(db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times', pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args)
有2种创建的方法:
add_jobstore
:需要指定redis
的相关参数.
from datetime import datetime, timedelta
import sys
import os
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
def alarm(time):
print('Alarm! This alarm was scheduled at %s.' % time)
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times', host='192.168.0.101', port=6379, db=0)
if len(sys.argv) > 1 and sys.argv[1] == '--clear':
scheduler.remove_all_jobs()
alarm_time = datetime.now() + timedelta(seconds=300)
scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
print('To clear the alarms, run this example with the --clear argument.')
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
RedisJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
from datetime import datetime, timedelta
jobstore = {
'default' : RedisJobStore(db=0, jobs_key='myfunc', run_times_key='myfunc_time', host='192.168.0.101', port=6379)
}
def my_func(t):
print('hello world, %s' %t)
if __name__ == '__main__':
sched = BlockingScheduler(jobstores=jobstore)
alarm_time = datetime.now() + timedelta(seconds=300)
sched.add_job(my_func,run_date=alarm_time, args=['ning'])
sched.start()均可在
redis
中查询到数据.
2.SQLAlchemy
使用
ORM
框架,演示使用MySql
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta
def my_func(t):
print('hello %s' %t)
if __name__ == "__main__":
sched = BlockingScheduler()
url = 'mysql+pymysql://root:2008.Cn123@192.168.0.101:3306/test'
sched.add_jobstore('sqlalchemy', url=url,tablename='api_job')
alarm_time = datetime.now() + timedelta(seconds=300)
sched.add_job(my_func,run_date=alarm_time, args=['ning'])
sched.start()如果表不存在,会自动创建表.
tablename
用于指定表的名称.在数据库中可以查看到表
select * from api_job;
执行器executors
执行器取决于应用场景,默认是
ThreadPoolExecutor
,它可以满足大部分需求.如果是CPU
密集型计算,可以选择ProcessPoolExecutor
class apscheduler.executors.pool.ThreadPoolExecutor(max_workers=10)class apscheduler.executors.pool.ProcessPoolExecutor(max_workers=10)
# max_worker 指定最多使用线程/进程
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
'default': ThreadPoolExecutor(20),
}
conf = { # redis配置
"host":127.0.0.1,
"port":6379,
"db":15, # 连接15号数据库
"max_connections":10 # redis最大支持300个连接数
}
scheduler = BackgroundScheduler(executors=executors)
scheduler.add_jobstore(jobstore='redis', **conf) # 添加任务持久化存储方式,如果未安装redis可省略此步骤
任务操作
add_job(func, id='xxx', args=None, kwargs=None)
添加任务
# 添加任务func, func参数可以使用 '可导入模块:可调用对象'的方式引入,即可用模块来引入
#❯ tree -L 2
#├── func
#│ ├── add_func.py
#│ ├── __init__.py
#└── jobs.py
# add_func.py
def add(x,y):
print(x+y)
# jobs.py
from apscheduler.schedulers.blocking import BlockingScheduler
from func.add_func import add
shced = BlockingScheduler()
if __name__ == "__main__":
shced.add_job('func.add_func:add', args=[1,2], id='job1')
shced.start()除去使用
add_job()
,还可以使用装饰器函数scheduled_job
来添加任务.
remove_job(job_id)
:删除任务,需要指定job_id
pause_job(job_id)
:暂停任务resume_job(job_id)
:恢复任务modify_job(job_id, **changes)
:修改任务属性print_jobs()
:作业信息
# 方法1
job = scheduler.add_job(myfunc, 'interval', minutes=2) # 添加任务
job.remove() # 删除任务
job.pause() # 暂定任务
job.resume() # 恢复任务
# 方法2
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 添加任务
scheduler.remove_job('my_job_id') # 删除任务
scheduler.pause_job('my_job_id') # 暂定任务
scheduler.resume_job('my_job_id') # 恢复任务
示例
方法1
from pytz import utc
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
def tick():
print('Tick! The time is: %s' % datetime.now())
# 选择MongoDB作为任务存储数据库
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 默认使用线程池
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
# 默认参数配置
job_defaults = {
'coalesce': False, # 积攒的任务是否只跑一次,是否合并所有错过的Job
'max_instances': 3, # 默认同一时刻只能有一个实例运行,通过max_instances=3修改为3个。
'misfire_grace_time': 30 # 30秒的任务超时容错
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()方法2
from apscheduler.schedulers.background import BackgroundScheduler
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
})方法3
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler()
# ..这里可以添加任务
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
misfire_grace_time
:如果一个job本来14:00有一次执行,但是由于某种原因没有被调度上,现在14:01了,这个14:00的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的30秒限制,那么这个运行实例不会被执行。合并:最常见的情形是scheduler被shutdown后重启,某个任务会积攒了好几次没执行如5次,下次这个job被submit给executor时,执行5次。将coalesce=True后,只会执行一次
replace_existing
: 如果在程序初始化时,是从数据库读取任务的,那么必须为每个任务定义一个明确的ID,并且使用replace_existing=True,否则每次重启程序,你都会得到一份新的任务拷贝,也就意味着任务的状态不会保存。