github 地址:https://github.com/agronholm/apscheduler
apscheduler 基本概念介绍
说到定时任务,会想起 linux 自带的 crontab ,windows 自带的任务计划,都可以实现守时任务。操作系统基本都会提供定时任务的实现,但是如果想要更加精细化的控制,或者说任务程序需要跨平台运行,最好还是自己实现定时任务框架,Python中定时任务的解决方案,总体来说有四种,分别是: crontab、 scheduler、 Celery、 APScheduler,其中 crontab 不适合多台服务器的配置、 scheduler 太过于简单、 Celery 依赖的软件比较多,比较耗资源。最好的解决方案就是 APScheduler。Python 的 apscheduler 提供了非常丰富而且方便易用的定时任务接口。
apscheduler ( advance python scheduler ) 是 Python 中的任务调度库,使用起来十分方便 。提供了基于 日期、固定时间间隔、crontab 类型的任务,可以在主程序的运行过程中快速增加新作业或删除旧作业,如果把作业存储在数据库中,那么作业的状态会被保存,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。apscheduler 可以当作一个跨平台的调度工具来使用,可以做为 linux 系统 crontab 工具或 windows 计划任务程序的替换。注意,apscheduler 不是一个守护进程或服务,它自身不带有任何命令行工具。它主要是要在现有的应用程序中运行,也就是说,apscheduler 为我们提供了构建专用调度器或调度服务的基础模块。
安装:pip install apschedule
调度器的工作流程
APScheduler 组件介绍
APScheduler 由5个部分组成:触发器、调度器、任务存储器、执行器、任务事件。
- 任务 job:任务id 和 任务执行 func
- 触发器(triggers):确定任务何时开始执行。触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
- 作业存储器(job stores): 也叫 "任务存储器",保存任务的状态。作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
- 执行器(executors):确定任务怎么执行。执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
- 调度器(schedulers):任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的,调度器会自动完成。调度器串联任务的整个生命周期,添加编辑任务到任务存储器,在任务的执行时间到来时,把任务交给执行器执行返回结果;同时发出事件监听,监控任务事件 。
- 任务事件 event:监控任务执行异常情况
触发器
触发器决定何时执行任务,APScheduler 支持的触发器有3种
- trigger='interval':按固定时间周期执行,支持weeks,days,hours,minutes, seconds, 还可指定时间范围:sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
- trigger='date':固定时间,执行一次。sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
- trigger='cron': 支持 crontab 方式执行任务。也可指定时间范围。
year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of the (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
# 星期一到星期五,5点30执行任务 job_function,直到2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
# 按照crontab格式执行, 格式为:分钟 小时 天 月 周,*表示所有
# 5月到8月的1号到15号,0点0分执行任务job_function
sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))
触发器 的 参数
date 定时,作业只执行一次。
- run_date (datetime|str) – the date/time to run the job at
- timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# 2019年7月6号16时30分5s执行
sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])
from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingSchedulerdef job(text):print(text)scheduler = BlockingScheduler()
# 在 2019-8-30 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1'])
# 在 2019-8-30 01:00:00 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2'])
# 在 2019-8-30 01:00:01 运行一次 job 方法
scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3'])scheduler.start()
interval 间隔调度
- weeks (int) – number of weeks to wait
- days (int) – number of days to wait
- hours (int) – number of hours to wait
- minutes (int) – number of minutes to wait
- seconds (int) – number of seconds to wait
- start_date (datetime|str) – starting point for the interval calculation
- end_date (datetime|str) – latest possible date/time to trigger on
- timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
scheduler.add_job(job_function, 'interval', hours=2)
import time
from apscheduler.schedulers.blocking import BlockingSchedulerdef job(text):t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))print('{} --- {}'.format(text, t))scheduler = BlockingScheduler()
# 每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, args=['job1'])
# 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, seconds=30, start_date='2019-08-29 22:15:00',end_date='2019-08-29 22:17:00', args=['job2'])scheduler.start()'''
运行结果:
job2 --- 2019-08-29 22:15:00
job1 --- 2019-08-29 22:15:46
job2 --- 2019-08-29 22:16:30
job1 --- 2019-08-29 22:16:46
job1 --- 2019-08-29 22:17:46
...余下省略...
'''
cron 调度
- year (int|str) – 4-digit year
- month (int|str) – month (1-12)
- day (int|str) – day of the (1-31)
- week (int|str) – ISO week (1-53)
- day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
- hour (int|str) – hour (0-23)
- minute (int|str) – minute (0-59)
- second (int|str) – second (0-59)
- start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
- end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
- timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
# 6-8,11-12月第三个周五 00:00, 01:00, 02:00, 03:00运行
scheduler.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 每周一到周五运行 直到2024-05-30 00:00:00
scheduler.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'
也可以用表达式类型,可以用以下方式:
表达式 | 字段 | 描述 |
* | 任何 | 在每个值都触发 |
*/a | 任何 | 每隔 a触发一次 |
a-b | 任何 | 在 a-b区间内任何一个时间触发( a必须小于 b) |
a-b/c | 任何 | 在 a-b区间内每隔 c触发一次 |
xth y | day | 第 x个星期 y触发 |
lastx | day | 最后一个星期 x触发 |
last | day | 一个月中的最后一天触发 |
x,y,z | 任何 | 可以把上面的表达式进行组合 |
示例如下:
from apscheduler.schedulers.blocking import BlockingSchedulerdef tick():passscheduler = BlockingScheduler
scheduler.add_job(tick, "cron", day="4th sun", hour=20, minute=1)
scheduler.start()
import time
from apscheduler.schedulers.blocking import BlockingSchedulerdef job(text):t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))print('{} --- {}'.format(text, t))scheduler = BlockingScheduler()
# 在每天22点,每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1'])
# 在每天22和23点的25分,运行一次 job 方法
scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])
# 在每天 8 点,运行一次 job 方法
scheduler.add_job(job, 'cron', hour='8', args=['job2'])
# 在每天 8 点 20点,各运行一次 job 方法 设置最大运行实例数
scheduler.add_job(job, 'cron', hour='8, 20', minute=30, max_instances=4)scheduler.start()'''
运行结果:
job1 --- 2019-08-29 22:25:00
job2 --- 2019-08-29 22:25:00
job1 --- 2019-08-29 22:26:00
job1 --- 2019-08-29 22:27:00
...余下省略...
'''
任务存储器
任务存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。不同的任务存储器可以在调度器的配置中进行配置(见调度器)。APScheduler 支持的任务存储器有:
- apscheduler.jobstores.memory:内存
- apscheduler.jobstores.mongodb:存储在 mongodb
- apscheduler.jobstores.redis:存储在 redis
- apscheduler.jobstores.rethinkdb:存储在 rethinkdb
- apscheduler.jobstores.sqlalchemy:支持 sqlalchemy 的数据库如 mysql,sqlite 等
- apscheduler.jobstores.zookeeper:zookeeper
执行器
执行器决定如何执行任务;常用的有 pool(线程/进程) 和 gevent(io多路复用,支持高并发),默认为 pool 中线程池, 不同的执行器可以在调度器的配置中进行配置(见调度器)。执行器的选择取决于应用场景。通常默认的 ThreadPoolExecutor 已经在大部分情况下是可以满足我们需求的。如果我们的任务涉及到一些 CPU密集计算的操作。那么应该考虑 ProcessPoolExecutor。然后针对每种程序, apscheduler也设置了不同的 executor:
- apscheduler.executors.asyncio:同步 io,阻塞
- apscheduler.executors.gevent:io 多路复用,非阻塞
- apscheduler.executors.pool: 线程ThreadPoolExecutor 和 进程ProcessPoolExecutor
- apscheduler.executors.twisted:基于事件驱动
调度器
调度器的主循环其实就是反复检查是不是有到时需要执行的任务,分以下 2 步进行:
- 询问自己的每一个作业存储器,有没有到期需要执行的任务,如果有,计算这些作业中每个作业需要运行的时间点如果时间点有多个,做 coalesce 检查。设置 coalesce 为 False :设置这个目的是,比如由于某个原因导致某个任务积攒了很多次没有执行(比如有一个任务是1分钟跑一次,但是系统原因断了5分钟),如果 coalesce = True ,那么下次恢复运行的时候,会只执行一次,而如果设置 coalesce = False ,那么就不会合并,会5次全部执行。
- 提交给执行器按时间点运行。
在配置调度器前,首先要选取合适应用环境场景的 调度器,存储器 和 执行器。APScheduler 支持的调度器方式如下,比较常用的为 BlockingScheduler 和 BackgroundScheduler,下面是各调度器的适用场景,可以满足绝大多数的应用环境:
- BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
- BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start 后主线程不会阻塞。
- AsyncIOScheduler:适用于使用了 asyncio 模块的应用程序。
- GeventScheduler:适用于使用 gevent 模块的应用程序。
- TwistedScheduler:适用于构建 Twisted 的应用程序。
- QtScheduler:适用于构建 Qt 的应用程序。
调度器可以操作任务(并为任务指定触发器、任务存储器和执行器)和监控任务。
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
作业存储器 的选择有两种:
- 一 是内存,也是默认的配置。适用场景:重启整个应用程序后,作业会被重新添加到调度器中,此时简单的选取内存作为作业存储器即简单又高效。
- 二 是 数据库。适用场景:当调度器重启或应用程序崩溃时需要作业从中断位置恢复正常运行,那么可以选择将作业存储在数据库中,至于适用什么数据库,可以自由选择,PostgreSQL 是推荐的选择,因为它具有强大的数据完整性保护。
执行器 的选择也取决于应用场景。
- 通常默认的 ThreadPoolExecutor 已经足够好。
- 如果作业负载涉及CPU 密集型操作,那么应该考虑使用 ProcessPoolExecutor,甚至可以同时使用这两种执行器,将 ProcessPoolExecutor 行器添加为二级执行器。
调度器 的 配置
调度器配置:在 add_job 我们看到 jobstore 和 executor 都是 default,APScheduler 在定义调度器时可以指定不同的任务存储和执行器,以及初始的参数
from pytz import utc
import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor# 通过dict方式执行不同的 job_stores、executors和默认的参数
job_stores = {'mongo': MongoDBJobStore(),'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': ThreadPoolExecutor(20),'process_pool': ProcessPoolExecutor(5)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
# 定义调度器
scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors,job_defaults=job_defaults, timezone=utc
)def job_func(job_id):print(f'job {job_id} is run at {datetime.datetime.now().replace(microsecond=0)}')# 添加任务 scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='process_pool', seconds=10
)
# 启动调度器
scheduler.start()
操作任务:调度器可以增加,删除,暂停,恢复和修改任务。需要注意的是这里的操作只是对未执行的任务起作用,已经执行和正在执行的任务不受这些操作的影响。
- add_job:添加任务
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10
)
- remove_job:通过任务唯一的 id,删除的时候对应的任务存储器里记录也会删除
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
- Pausing and resuming jobs:暂停和重启任务
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
- Modifying jobs:修改任务的配置
job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10
)
# 修改任务的属性
job.modify(max_instances=6, name='Alternate name')
# 修改任务的触发器
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
获取作业 (job) 列表。通过 get_jobs() 返回所有 job 实例或者 print_jobs() 输出格式化的作业列表
scheduler.get_jobs()
scheduler.print_jobs()
关闭调度器
默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将wait选项设置为False。
scheduler.shutdown()
scheduler.shutdown(wait=False)
监控任务事件类型,比较常用的类型有:
- EVENT_JOB_ERROR: 表示任务在执行过程的出现异常触发
- EVENT_JOB_EXECUTED:任务执行成功时
- EVENT_JOB_MAX_INSTANCES:调度器上执行的任务超过配置的参数时
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
apscheduler 提供了许多不同的方法来配置调度器。可以使用字典,也可以使用关键字参数传递。首先实例化调度程序,添加作业,然后配置调度器,获得最大的灵活性。
如果调度程序在应用程序的后台运行,选择 BackgroundScheduler,并使用默认的 jobstore 和默认的executor,则以下配置即可:
from apscheduler.schedulers.background import BackgroundSchedulerscheduler = BackgroundScheduler()
假如我们想配置更多信息:设置两个执行器、两个作业存储器、调整新作业的默认值,并设置不同的时区。下述三个方法是完全等同的。
配置需求
- 配置名为 “mongo” 的 MongoDBJobStore 作业存储器
- 配置名为 “default” 的 SQLAlchemyJobStore (使用SQLite)
- 配置名为 “default” 的 ThreadPoolExecutor,最大线程数为20
- 配置名为 “processpool” 的 ProcessPoolExecutor,最大进程数为5
- UTC 作为调度器的时区
- coalesce 默认情况下关闭
- 作业的默认最大运行实例限制为 3
方法 1:
from pytz import utcfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutorjob_stores = {'mongo': MongoDBJobStore(),'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': ThreadPoolExecutor(20),'process_pool': ProcessPoolExecutor(5)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults, timezone=utc
)
方法二
from apscheduler.schedulers.background import BackgroundSchedulerscheduler = BackgroundScheduler({'apscheduler.jobstores.mongo': {'type': 'mongodb'},'apscheduler.jobstores.default': {'type': 'sqlalchemy','url': 'sqlite:///jobs.sqlite'},'apscheduler.executors.default': {'class': 'apscheduler.executors.pool:ThreadPoolExecutor','max_workers': '20'},'apscheduler.executors.processpool': {'type': 'processpool','max_workers': '5'},'apscheduler.job_defaults.coalesce': 'false','apscheduler.job_defaults.max_instances': '3','apscheduler.timezone': 'UTC',
})
方法三:
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutorjob_stores = {'mongo': {'type': 'mongodb'},'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': {'type': 'threadpool', 'max_workers': 20},'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
scheduler = BackgroundScheduler()# .. do something else here, maybe add jobs etc.
以上涵盖了大多数情况的调度器配置,在实际运行时可以试试不同的配置会有怎样不同的效果。
启动 调度器
启动调度器前需要先添加作业,有两种方法向调度器添加作业:
- 一是通过接口 add_job(),
- 二是通过使用 函数装饰器,其中 add_job() 返回一个 apscheduler.job.Job 类 的实例,用于后续修改或删除作业。
我们可以随时在调度器上调度作业。如果在添加作业时,调度器还没有启动,那么任务将不会运行,并且第一次运行时间在调度器启动时计算。
注意:如果使用的是序列化作业的执行器或作业存储器,那么要求被调用的作业(函数)必须是全局可访问的,被调用的作业的参数是可序列化的,作业存储器中,只有 MemoryJobStore 不会序列化作业。执行器中,只有 ProcessPoolExecutor 将序列化作业。
启用调度器只需要调用调度器的 start() 方法
使用 add_job 添加 job
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingSchedulerimport datetime
import time
import loggingdef job_function():print("Hello World" + " " + str(datetime.datetime.now()))if __name__ == '__main__':log = logging.getLogger('apscheduler.executors.default')log.setLevel(logging.INFO) # DEBUG fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')h = logging.StreamHandler()h.setFormatter(fmt)log.addHandler(h)print('start to do it')sched = BlockingScheduler()# Schedules job_function to be run on the third Friday # of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour='0-9', minute="*", second="*/4")sched.start()
使用 装饰器 添加 job
使用装饰器来展示一个调度的使用:
from apscheduler.schedulers.blocking import BlockingSchedulerblocking_scheduler = BlockingScheduler()@blocking_scheduler.scheduled_job('interval', seconds=3)
def timed_job():print('This job is run every three minutes.')@blocking_scheduler.scheduled_job('cron', day_of_week='mon-fri', hour='0-9', minute='30-59', second='*/3')
def scheduled_job():print('This job is run every weekday at 5pm.')print('before the start function')
blocking_scheduler.start()
print("let us figure out the situation")
调度器 的 事件监听
如果程序有异常抛出会影响整个调度任务吗?请看下面的代码,运行一下看看会发生什么情况:
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetimedef aps_test(x):print(1 / 0)print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')scheduler.start()
每 5 秒抛出一次报错信息。任何代码都可能抛出异常,关键是,发生导常事件,如何第一时间知道,这才是我们最关心的,apscheduler 已经为我们想到了这些,提供了事件监听来解决这一问题。将上述代码稍做调整,加入日志记录和事件监听,如下所示。
import datetime
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERRORlogging.basicConfig(level=logging.INFO,format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',datefmt='%Y-%m-%d %H:%M:%S',filename='log1.txt',filemode='a'
)def aps_test(x):print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)def date_test(x):print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)print(1 / 0)def my_listener(event):if event.exception:print('任务出错了!!!!!!')else:print('任务照常运行...')scheduler = BlockingScheduler()
scheduler.add_job(func=date_test, args=('一次性任务,会出错',),next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task'
)
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task'
)
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler._logger = loggingscheduler.start()
示例
示例 1 --- 间隔性任务
# -*- coding: utf-8 -*-from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerdef task_func():print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__':scheduler = BlockingScheduler()scheduler.add_job(task_func, 'interval', seconds=3)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass
说明:
- 导入调度器模块 BlockingScheduler,这是最简单的调度器,调用 start 方阻塞当前进程,如果你的程序只用于调度,除了调度进程外没有其他后台进程,那么请用 BlockingScheduler 非常有用,此时调度进程相当于守护进程。
- 定义一个函数 task_func 代表我们要调度的作业程序。
- 实例化一个 BlockingScheduler 类,不带参数表明使用默认的作业存储器-内存,默认的执行器是线程池执行器,最大并发线程数默认为 10 个(另一个是进程池执行器)。
- add_job 是添加一个作业 task_func,触发器为 interval,每隔 3 秒执行一次,另外的触发器为 date,cron。date 按特定时间点触发,cron 则按固定的时间间隔触发。加入捕捉用户中断执行和解释器退出异常,pass 关键字,表示什么也不做。
程序执行结果是每 3 秒打印出了当前时间。
示例 2 --- cron 任务
# -*- coding: utf-8 -*-from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerdef task_func():print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__':scheduler = BlockingScheduler()scheduler.add_job(task_func, 'cron', hour=19, minute=23)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass
定时 cron 任务也非常简单,直接给触发器 trigger 传入 "cron" 即可。hour =19,minute =23 这里表示每天的 19:23 分 执行任务。这里可以填写数字,也可以填写字符串
示例:设置后台任务
import time
import os
from datetime import datetime
from apscheduler.schedulers.background import BackgroundSchedulerdef tick():print('Tick! The time is: %s' % datetime.now())# 非阻塞,cron类型触发器
scheduler = BackgroundScheduler()
scheduler.add_job(tick, 'cron', day_of_week='mon-fri', hour=9, minute=30)if __name__ == '__main__':# 运行任务调度scheduler.start()print(scheduler.get_jobs())print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))try:while True:time.sleep(5)print('sleep!')except (KeyboardInterrupt, SystemExit):# 关闭调度scheduler.shutdown(wait=False)print('Exit The Job!')
示例 :设置任务监听
import datetime
from loguru import logger
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR# 任务执行函数
def job_func(job_id):print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))# 事件监听
def job_exception_listener(event):if event.exception:# todo:异常处理, 告警等 print('The job crashed :(')else:print('The job worked :)')# 日志 # 定义一个后台任务非阻塞调度器
scheduler = BackgroundScheduler()
# 添加一个任务到内存中
# 触发器:trigger='interval' seconds=10 每10s触发执行一次
# 执行器:executor='default' 线程执行
# 任务存储器:jobstore='default' 默认内存存储
# 最大并发数:max_instances
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10
)
# 设置任务监听
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 启动调度器
scheduler.start()
示例:使用默认的作业存储器:
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutordef my_job(id='my_job'):print(id, '-->', datetime.datetime.now())job_stores = {'default': MemoryJobStore()}
executors = {'default': ThreadPoolExecutor(20),'processpool': ProcessPoolExecutor(10)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults
)
scheduler.add_job(my_job, args=['job_interval', ], id='job_interval', trigger='interval', seconds=5, replace_existing=True
)
scheduler.add_job(my_job, args=['job_cron', ], id='job_cron', trigger='cron', month='4-8,11-12', hour='7-11', second='*/10', end_date='2021-05-30'
)
scheduler.add_job(my_job, args=['job_once_now', ], id='job_once_now')
scheduler.add_job(my_job, args=['job_date_once', ], id='job_date_once', trigger='date', run_date='2018-04-05 07:48:05'
)
try:scheduler.start()
except SystemExit:print('exit')exit()
上述代码使用内存作为作业存储器,操作比较简单,重启程序相当于第一次运行。
示例:使用数据库作为存储器:
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStoredef my_job(id='my_job'):print(id, '-->', datetime.datetime.now())job_stores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': ThreadPoolExecutor(20),'processpool': ProcessPoolExecutor(10)
}
job_defaults = {'coalesce': False,'max_instances': 3
}scheduler = BlockingScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults
)
scheduler.add_job(my_job, args=['job_interval', ], id='job_interval',trigger='interval', seconds=5, replace_existing=True
)
scheduler.add_job(my_job, args=['job_cron', ], id='job_cron',trigger='cron', month='4-8,11-12', hour='7-11', second='*/10',end_date='2018-05-30'
)
scheduler.add_job(my_job, args=['job_once_now', ], id='job_once_now')
scheduler.add_job(my_job, args=['job_date_once', ], id='job_date_once',trigger='date', run_date='2018-04-05 07:48:05'
)
try:scheduler.start()
except SystemExit:print('exit')exit()
运行程序时,提示有作业本应在 2018-04-05 07:48:05 运行的作业没有运行,因为现在的时间为 2018-04-05 08:06:34,错过了 0:18:28 的时间。
如果将过期 job 注释掉,重新运行本程序,则四种类型的作业仍会运行,结果如下:
Run time of job "my_job (trigger: cron[month='4-8,11-12', hour='7-11', second='*/10'], next run at: 2018-04-05 08:14:40 CST)" was missed by 0:00:23.680603
Run time of job "my_job (trigger: cron[month='4-8,11-12', hour='7-11', second='*/10'], next run at: 2018-04-05 08:14:40 CST)" was missed by 0:00:13.681604
Run time of job "my_job (trigger: cron[month='4-8,11-12', hour='7-11', second='*/10'], next run at: 2018-04-05 08:14:40 CST)" was missed by 0:00:03.681604
……
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST)" was missed by 0:00:15.687917
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST)" was missed by 0:00:10.687917
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST)" was missed by 0:00:05.687917
job_interval --> 2018-04-05 08:14:33.821645
job_interval --> 2018-04-05 08:14:38.529167
job_cron --> 2018-04-05 08:14:40.150080
job_interval --> 2018-04-05 08:14:43.296188
job_interval --> 2018-04-05 08:14:48.327317
作业仍会运行,说明作业被添加到数据库中,程序中断后重新运行时会自动从数据库读取作业信息,而不需要重新再添加到调度器中,如果不注释 21-25 行添加作业的代码,则作业会重新添加到数据库中,这样就有了两个同样的作业,避免出现这种情况可以在 add_job 的参数中增加 replace_existing=True,如:scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval',seconds=3,replace_existing=True)
如果我们想运行错过运行的作业,使用 misfire_grace_time,如:scheduler.add_job(my_job,args = ['job_cron',] ,id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11',second='*/15',coalesce=True,misfire_grace_time=30,replace_existing=True,end_date='2018-05-30')
说明:misfire_grace_time,假如一个作业本来 08:00 有一次执行,但是由于某种原因没有被调度上,现在 08:01 了,这个 08:00 的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的 30 秒限制,那么这个运行实例不会被执行。最常见的情形是 scheduler 被 shutdown 后重启,某个任务会积攒了好几次没执行如 5 次,下次这个作业被提交给执行器时,执行 5 次。设置 coalesce=True 后,只会执行一次。
其他操作如下:
scheduler.remove_job(job_id,jobstore=None)#删除作业
scheduler.remove_all_jobs(jobstore=None)#删除所有作业
scheduler.pause_job(job_id,jobstore=None)#暂停作业
scheduler.resume_job(job_id,jobstore=None)#恢复作业
scheduler.modify_job(job_id, jobstore=None, **changes)#修改单个作业属性信息
scheduler.reschedule_job(job_id, jobstore=None, trigger=None,**trigger_args)#修改单个作业的触发器并更新下次运行时间
scheduler.print_jobs(jobstore=None, out=sys.stdout)#输出作业信息