Python 定时任务框架 apscheduler

github 地址:https://github.com/agronholm/apscheduler

apscheduler 基本概念介绍

        说到定时任务,会想起 linux 自带的 crontab ,windows 自带的任务计划,都可以实现守时任务。操作系统基本都会提供定时任务的实现,但是如果想要更加精细化的控制,或者说任务程序需要跨平台运行,最好还是自己实现定时任务框架,Python中定时任务的解决方案,总体来说有四种,分别是: crontab、 scheduler、 Celery、 APScheduler,其中 crontab 不适合多台服务器的配置、 scheduler 太过于简单、 Celery 依赖的软件比较多,比较耗资源。最好的解决方案就是 APScheduler。Python 的 apscheduler 提供了非常丰富而且方便易用的定时任务接口。

        apscheduler ( advance python scheduler ) 是 Python 中的任务调度库,使用起来十分方便 。提供了基于 日期、固定时间间隔、crontab 类型的任务,可以在主程序的运行过程中快速增加新作业或删除旧作业,如果把作业存储在数据库中,那么作业的状态会被保存,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。apscheduler 可以当作一个跨平台的调度工具来使用,可以做为 linux 系统 crontab 工具或 windows 计划任务程序的替换。注意,apscheduler 不是一个守护进程或服务,它自身不带有任何命令行工具。它主要是要在现有的应用程序中运行,也就是说,apscheduler 为我们提供了构建专用调度器或调度服务的基础模块。

安装:pip install apschedule

调度器的工作流程

APScheduler 组件介绍

APScheduler 由5个部分组成:触发器、调度器、任务存储器、执行器、任务事件

  • 任务 job:任务id 和 任务执行 func
  • 触发器(triggers)确定任务何时开始执行。触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
  • 作业存储器(job stores): 也叫 "任务存储器",保存任务的状态。作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
  • 执行器(executors)确定任务怎么执行。执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
  • 调度器(schedulers):任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的,调度器会自动完成。调度器串联任务的整个生命周期,添加编辑任务到任务存储器,在任务的执行时间到来时,把任务交给执行器执行返回结果;同时发出事件监听,监控任务事件 。
  • 任务事件 event:监控任务执行异常情况

 

触发器

触发器决定何时执行任务,APScheduler 支持的触发器有3种

  •  trigger='interval':按固定时间周期执行,支持weeks,days,hours,minutes, seconds, 还可指定时间范围:sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00') 
  • trigger='date':固定时间,执行一次。sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text']) 
  • trigger='cron': 支持 crontab 方式执行任务。也可指定时间范围。
            year (int|str) – 4-digit year  
            month (int|str) – month (1-12)  
            day (int|str) – day of the (1-31)  
            week (int|str) – ISO week (1-53)  
            day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)  
            hour (int|str) – hour (0-23)  
            minute (int|str) – minute (0-59)  
            second (int|str) – second (0-59)  
            start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)  
            end_date (datetime|str) – latest possible date/time to trigger on (inclusive) 
            
            # 星期一到星期五,5点30执行任务 job_function,直到2014-05-30 00:00:00  
            sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')  
            # 按照crontab格式执行, 格式为:分钟 小时 天 月 周,*表示所有  
            # 5月到8月的1号到15号,0点0分执行任务job_function  
            sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *')) 

触发器 的 参数

date 定时,作业只执行一次。

  • run_date (datetime|str) – the date/time to run the job at
  • timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# 2019年7月6号16时30分5s执行
sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])
from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingSchedulerdef job(text):print(text)scheduler = BlockingScheduler()
# 在 2019-8-30 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1'])
# 在 2019-8-30 01:00:00 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2'])
# 在 2019-8-30 01:00:01 运行一次 job 方法
scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3'])scheduler.start()

interval 间隔调度

  • weeks (int) – number of weeks to wait
  • days (int) – number of days to wait
  • hours (int) – number of hours to wait
  • minutes (int) – number of minutes to wait
  • seconds (int) – number of seconds to wait
  • start_date (datetime|str) – starting point for the interval calculation
  • end_date (datetime|str) – latest possible date/time to trigger on
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
scheduler.add_job(job_function, 'interval', hours=2)
import time
from apscheduler.schedulers.blocking import BlockingSchedulerdef job(text):t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))print('{} --- {}'.format(text, t))scheduler = BlockingScheduler()
# 每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, args=['job1'])
# 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, seconds=30, start_date='2019-08-29 22:15:00',end_date='2019-08-29 22:17:00', args=['job2'])scheduler.start()'''
运行结果:
job2 --- 2019-08-29 22:15:00
job1 --- 2019-08-29 22:15:46
job2 --- 2019-08-29 22:16:30
job1 --- 2019-08-29 22:16:46
job1 --- 2019-08-29 22:17:46
...余下省略...
'''

cron 调度

  • year (int|str) – 4-digit year
  • month (int|str) – month (1-12)
  • day (int|str) – day of the (1-31)
  • week (int|str) – ISO week (1-53)
  • day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
  • hour (int|str) – hour (0-23)
  • minute (int|str) – minute (0-59)
  • second (int|str) – second (0-59)
  • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
  • end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
# 6-8,11-12月第三个周五 00:00, 01:00, 02:00, 03:00运行
scheduler.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 每周一到周五运行 直到2024-05-30 00:00:00
scheduler.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'

也可以用表达式类型,可以用以下方式:

表达式字段描述
*任何在每个值都触发
*/a任何每隔 a触发一次
a-b任何在 a-b区间内任何一个时间触发( a必须小于 b)
a-b/c任何在 a-b区间内每隔 c触发一次
xth yday第 x个星期 y触发
lastxday最后一个星期 x触发
lastday一个月中的最后一天触发
x,y,z任何可以把上面的表达式进行组合

示例如下:

from apscheduler.schedulers.blocking import BlockingSchedulerdef tick():passscheduler = BlockingScheduler
scheduler.add_job(tick, "cron", day="4th sun", hour=20, minute=1)
scheduler.start()
import time
from apscheduler.schedulers.blocking import BlockingSchedulerdef job(text):t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))print('{} --- {}'.format(text, t))scheduler = BlockingScheduler()
# 在每天22点,每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1'])
# 在每天22和23点的25分,运行一次 job 方法
scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])
# 在每天 8 点,运行一次 job 方法
scheduler.add_job(job, 'cron', hour='8', args=['job2'])
# 在每天 8 点 20点,各运行一次 job 方法    设置最大运行实例数
scheduler.add_job(job, 'cron', hour='8, 20', minute=30, max_instances=4)scheduler.start()'''
运行结果:
job1 --- 2019-08-29 22:25:00
job2 --- 2019-08-29 22:25:00
job1 --- 2019-08-29 22:26:00
job1 --- 2019-08-29 22:27:00
...余下省略...
'''

任务存储器

任务存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。不同的任务存储器可以在调度器的配置中进行配置(见调度器)。APScheduler 支持的任务存储器有:

  •  apscheduler.jobstores.memory:内存
  •  apscheduler.jobstores.mongodb:存储在 mongodb
  •  apscheduler.jobstores.redis:存储在 redis
  •  apscheduler.jobstores.rethinkdb:存储在 rethinkdb
  •  apscheduler.jobstores.sqlalchemy:支持 sqlalchemy 的数据库如 mysql,sqlite 等
  •  apscheduler.jobstores.zookeeper:zookeeper

执行器  

执行器决定如何执行任务;常用的有 pool(线程/进程) 和 gevent(io多路复用,支持高并发),默认为 pool 中线程池, 不同的执行器可以在调度器的配置中进行配置(见调度器)。执行器的选择取决于应用场景。通常默认的 ThreadPoolExecutor 已经在大部分情况下是可以满足我们需求的。如果我们的任务涉及到一些 CPU密集计算的操作。那么应该考虑 ProcessPoolExecutor。然后针对每种程序, apscheduler也设置了不同的 executor:

  •  apscheduler.executors.asyncio:同步 io,阻塞
  •  apscheduler.executors.gevent:io 多路复用,非阻塞
  •  apscheduler.executors.pool:  线程ThreadPoolExecutor进程ProcessPoolExecutor
  •  apscheduler.executors.twisted:基于事件驱动

调度器

调度器的主循环其实就是反复检查是不是有到时需要执行的任务,分以下 2 步进行:

  • 询问自己的每一个作业存储器,有没有到期需要执行的任务,如果有,计算这些作业中每个作业需要运行的时间点如果时间点有多个,做 coalesce 检查。设置 coalesce 为 False :设置这个目的是,比如由于某个原因导致某个任务积攒了很多次没有执行(比如有一个任务是1分钟跑一次,但是系统原因断了5分钟),如果 coalesce = True ,那么下次恢复运行的时候,会只执行一次,而如果设置 coalesce = False ,那么就不会合并,会5次全部执行。
  • 提交给执行器按时间点运行。

在配置调度器前,首先要选取合适应用环境场景的 调度器,存储器 执行器。APScheduler 支持的调度器方式如下,比较常用的为 BlockingScheduler 和 BackgroundScheduler,下面是各调度器的适用场景,可以满足绝大多数的应用环境:

  • BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
  • BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start 后主线程不会阻塞。
  •  AsyncIOScheduler:适用于使用了 asyncio 模块的应用程序。
  •  GeventScheduler:适用于使用 gevent 模块的应用程序。
  •  TwistedScheduler:适用于构建 Twisted 的应用程序。
  •  QtScheduler:适用于构建 Qt 的应用程序。

调度器可以操作任务(并为任务指定触发器、任务存储器和执行器)和监控任务。

scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)

作业存储器 的选择有两种:

  • 一 是内存,也是默认的配置适用场景:重启整个应用程序后,作业会被重新添加到调度器中,此时简单的选取内存作为作业存储器即简单又高效。
  • 二 是 数据库适用场景:当调度器重启或应用程序崩溃时需要作业从中断位置恢复正常运行,那么可以选择将作业存储在数据库中,至于适用什么数据库,可以自由选择,PostgreSQL 是推荐的选择,因为它具有强大的数据完整性保护。

执行器 的选择也取决于应用场景。

  • 通常默认的 ThreadPoolExecutor 已经足够好。
  • 如果作业负载涉及CPU 密集型操作,那么应该考虑使用 ProcessPoolExecutor,甚至可以同时使用这两种执行器,将 ProcessPoolExecutor 行器添加为二级执行器。

调度器 的 配置

调度器配置:在 add_job 我们看到 jobstore 和 executor 都是 default,APScheduler 在定义调度器时可以指定不同的任务存储和执行器,以及初始的参数   

from pytz import utc
import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor# 通过dict方式执行不同的 job_stores、executors和默认的参数  
job_stores = {'mongo': MongoDBJobStore(),'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': ThreadPoolExecutor(20),'process_pool': ProcessPoolExecutor(5)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
# 定义调度器  
scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors,job_defaults=job_defaults, timezone=utc
)def job_func(job_id):print(f'job {job_id} is run at {datetime.datetime.now().replace(microsecond=0)}')# 添加任务  scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='process_pool', seconds=10
)
# 启动调度器  
scheduler.start()

操作任务:调度器可以增加,删除,暂停,恢复和修改任务。需要注意的是这里的操作只是对未执行的任务起作用,已经执行和正在执行的任务不受这些操作的影响。

  •   add_job:添加任务
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10
)
  • remove_job:通过任务唯一的 id,删除的时候对应的任务存储器里记录也会删除 
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  
scheduler.remove_job('my_job_id') 
  • Pausing and resuming jobs:暂停和重启任务       
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  
scheduler.pause_job('my_job_id')  
scheduler.resume_job('my_job_id') 
  • Modifying jobs:修改任务的配置       
job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10
)  
# 修改任务的属性  
job.modify(max_instances=6, name='Alternate name')  
# 修改任务的触发器  
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5') 

获取作业 (job) 列表。通过 get_jobs() 返回所有 job 实例或者 print_jobs() 输出格式化的作业列表

scheduler.get_jobs()
scheduler.print_jobs()

关闭调度器

默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将wait选项设置为False。

scheduler.shutdown() 
scheduler.shutdown(wait=False)

监控任务事件类型,比较常用的类型有:

  •   EVENT_JOB_ERROR: 表示任务在执行过程的出现异常触发
  •   EVENT_JOB_EXECUTED:任务执行成功时
  •   EVENT_JOB_MAX_INSTANCES:调度器上执行的任务超过配置的参数时
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

apscheduler 提供了许多不同的方法来配置调度器。可以使用字典,也可以使用关键字参数传递。首先实例化调度程序,添加作业,然后配置调度器,获得最大的灵活性。

如果调度程序在应用程序的后台运行,选择 BackgroundScheduler,并使用默认的 jobstore 和默认的executor,则以下配置即可:

from apscheduler.schedulers.background import BackgroundSchedulerscheduler = BackgroundScheduler()

假如我们想配置更多信息:设置两个执行器、两个作业存储器、调整新作业的默认值,并设置不同的时区。下述三个方法是完全等同的。
配置需求

  • 配置名为 “mongo” 的 MongoDBJobStore 作业存储器
  • 配置名为 “default” 的 SQLAlchemyJobStore (使用SQLite)
  • 配置名为 “default” 的 ThreadPoolExecutor,最大线程数为20
  • 配置名为 “processpool” 的 ProcessPoolExecutor,最大进程数为5
  • UTC 作为调度器的时区
  • coalesce 默认情况下关闭
  • 作业的默认最大运行实例限制为 3

方法 1:

from pytz import utcfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutorjob_stores = {'mongo': MongoDBJobStore(),'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': ThreadPoolExecutor(20),'process_pool': ProcessPoolExecutor(5)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults, timezone=utc
)

方法二

from apscheduler.schedulers.background import BackgroundSchedulerscheduler = BackgroundScheduler({'apscheduler.jobstores.mongo': {'type': 'mongodb'},'apscheduler.jobstores.default': {'type': 'sqlalchemy','url': 'sqlite:///jobs.sqlite'},'apscheduler.executors.default': {'class': 'apscheduler.executors.pool:ThreadPoolExecutor','max_workers': '20'},'apscheduler.executors.processpool': {'type': 'processpool','max_workers': '5'},'apscheduler.job_defaults.coalesce': 'false','apscheduler.job_defaults.max_instances': '3','apscheduler.timezone': 'UTC',
})

方法三:

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutorjob_stores = {'mongo': {'type': 'mongodb'},'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': {'type': 'threadpool', 'max_workers': 20},'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
scheduler = BackgroundScheduler()# .. do something else here, maybe add jobs etc.

以上涵盖了大多数情况的调度器配置,在实际运行时可以试试不同的配置会有怎样不同的效果。

启动 调度器

启动调度器前需要先添加作业,有两种方法向调度器添加作业:

  • 一是通过接口 add_job(),
  • 二是通过使用 函数装饰器,其中 add_job() 返回一个 apscheduler.job.Job 类 的实例,用于后续修改或删除作业。

我们可以随时在调度器上调度作业。如果在添加作业时,调度器还没有启动,那么任务将不会运行,并且第一次运行时间在调度器启动时计算。

注意:如果使用的是序列化作业的执行器或作业存储器,那么要求被调用的作业(函数)必须是全局可访问的,被调用的作业的参数是可序列化的,作业存储器中,只有 MemoryJobStore 不会序列化作业。执行器中,只有 ProcessPoolExecutor 将序列化作业。

启用调度器只需要调用调度器的 start() 方法

使用 add_job 添加 job

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingSchedulerimport datetime
import time
import loggingdef job_function():print("Hello World" + " " + str(datetime.datetime.now()))if __name__ == '__main__':log = logging.getLogger('apscheduler.executors.default')log.setLevel(logging.INFO)  # DEBUG fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')h = logging.StreamHandler()h.setFormatter(fmt)log.addHandler(h)print('start to do it')sched = BlockingScheduler()# Schedules job_function to be run on the third Friday # of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour='0-9', minute="*", second="*/4")sched.start()

使用 装饰器 添加 job

使用装饰器来展示一个调度的使用:

from apscheduler.schedulers.blocking import BlockingSchedulerblocking_scheduler = BlockingScheduler()@blocking_scheduler.scheduled_job('interval', seconds=3)
def timed_job():print('This job is run every three minutes.')@blocking_scheduler.scheduled_job('cron', day_of_week='mon-fri', hour='0-9', minute='30-59', second='*/3')
def scheduled_job():print('This job is run every weekday at 5pm.')print('before the start function')
blocking_scheduler.start()
print("let us figure out the situation")

调度器 的 事件监听

如果程序有异常抛出会影响整个调度任务吗?请看下面的代码,运行一下看看会发生什么情况:

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetimedef aps_test(x):print(1 / 0)print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')scheduler.start()

每 5 秒抛出一次报错信息。任何代码都可能抛出异常,关键是,发生导常事件,如何第一时间知道,这才是我们最关心的,apscheduler 已经为我们想到了这些,提供了事件监听来解决这一问题。将上述代码稍做调整,加入日志记录和事件监听,如下所示。

import datetime
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERRORlogging.basicConfig(level=logging.INFO,format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',datefmt='%Y-%m-%d %H:%M:%S',filename='log1.txt',filemode='a'
)def aps_test(x):print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)def date_test(x):print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)print(1 / 0)def my_listener(event):if event.exception:print('任务出错了!!!!!!')else:print('任务照常运行...')scheduler = BlockingScheduler()
scheduler.add_job(func=date_test, args=('一次性任务,会出错',),next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task'
)
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task'
)
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler._logger = loggingscheduler.start()

示例

示例 1 --- 间隔性任务 

# -*- coding: utf-8 -*-from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerdef task_func():print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__':scheduler = BlockingScheduler()scheduler.add_job(task_func, 'interval', seconds=3)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C    '))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass

说明:

  • 导入调度器模块 BlockingScheduler,这是最简单的调度器,调用 start 方阻塞当前进程,如果你的程序只用于调度,除了调度进程外没有其他后台进程,那么请用 BlockingScheduler 非常有用,此时调度进程相当于守护进程。
  • 定义一个函数 task_func 代表我们要调度的作业程序。
  • 实例化一个 BlockingScheduler 类,不带参数表明使用默认的作业存储器-内存,默认的执行器是线程池执行器,最大并发线程数默认为 10 个(另一个是进程池执行器)。
  • add_job 是添加一个作业 task_func,触发器为 interval,每隔 3 秒执行一次,另外的触发器为 date,cron。date 按特定时间点触发,cron 则按固定的时间间隔触发。加入捕捉用户中断执行和解释器退出异常,pass 关键字,表示什么也不做。

程序执行结果是每 3 秒打印出了当前时间。

示例 2 --- cron 任务

# -*- coding: utf-8 -*-from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerdef task_func():print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__':scheduler = BlockingScheduler()scheduler.add_job(task_func, 'cron', hour=19, minute=23)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C    '))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass

定时 cron 任务也非常简单,直接给触发器 trigger 传入 "cron" 即可。hour =19,minute =23 这里表示每天的 19:23 分 执行任务。这里可以填写数字,也可以填写字符串

示例:设置后台任务

import time
import os
from datetime import datetime
from apscheduler.schedulers.background import BackgroundSchedulerdef tick():print('Tick! The time is: %s' % datetime.now())# 非阻塞,cron类型触发器
scheduler = BackgroundScheduler()
scheduler.add_job(tick, 'cron', day_of_week='mon-fri', hour=9, minute=30)if __name__ == '__main__':# 运行任务调度scheduler.start()print(scheduler.get_jobs())print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))try:while True:time.sleep(5)print('sleep!')except (KeyboardInterrupt, SystemExit):# 关闭调度scheduler.shutdown(wait=False)print('Exit The Job!')

示例 :设置任务监听


import datetime
from loguru import logger
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR# 任务执行函数  
def job_func(job_id):print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))# 事件监听  
def job_exception_listener(event):if event.exception:# todo:异常处理, 告警等  print('The job crashed :(')else:print('The job worked :)')# 日志  # 定义一个后台任务非阻塞调度器  
scheduler = BackgroundScheduler()
# 添加一个任务到内存中   
# 触发器:trigger='interval' seconds=10 每10s触发执行一次  
# 执行器:executor='default' 线程执行  
# 任务存储器:jobstore='default' 默认内存存储  
# 最大并发数:max_instances  
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10
)
# 设置任务监听  
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 启动调度器  
scheduler.start()

示例:使用默认的作业存储器:

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutordef my_job(id='my_job'):print(id, '-->', datetime.datetime.now())job_stores = {'default': MemoryJobStore()}
executors = {'default': ThreadPoolExecutor(20),'processpool': ProcessPoolExecutor(10)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults
)
scheduler.add_job(my_job, args=['job_interval', ], id='job_interval', trigger='interval', seconds=5, replace_existing=True
)
scheduler.add_job(my_job, args=['job_cron', ], id='job_cron', trigger='cron', month='4-8,11-12', hour='7-11', second='*/10', end_date='2021-05-30'
)
scheduler.add_job(my_job, args=['job_once_now', ], id='job_once_now')
scheduler.add_job(my_job, args=['job_date_once', ], id='job_date_once', trigger='date', run_date='2018-04-05 07:48:05'
)
try:scheduler.start()
except SystemExit:print('exit')exit()

上述代码使用内存作为作业存储器,操作比较简单,重启程序相当于第一次运行。

示例:使用数据库作为存储器:

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStoredef my_job(id='my_job'):print(id, '-->', datetime.datetime.now())job_stores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': ThreadPoolExecutor(20),'processpool': ProcessPoolExecutor(10)
}
job_defaults = {'coalesce': False,'max_instances': 3
}scheduler = BlockingScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults
)
scheduler.add_job(my_job, args=['job_interval', ], id='job_interval',trigger='interval', seconds=5, replace_existing=True
)
scheduler.add_job(my_job, args=['job_cron', ], id='job_cron',trigger='cron', month='4-8,11-12', hour='7-11', second='*/10',end_date='2018-05-30'
)
scheduler.add_job(my_job, args=['job_once_now', ], id='job_once_now')
scheduler.add_job(my_job, args=['job_date_once', ], id='job_date_once',trigger='date', run_date='2018-04-05 07:48:05'
)
try:scheduler.start()
except SystemExit:print('exit')exit()

运行程序时,提示有作业本应在 2018-04-05 07:48:05 运行的作业没有运行,因为现在的时间为 2018-04-05 08:06:34,错过了 0:18:28 的时间。
如果将过期 job 注释掉,重新运行本程序,则四种类型的作业仍会运行,结果如下:

Run time of job "my_job (trigger: cron[month='4-8,11-12', hour='7-11', second='*/10'], next run at: 2018-04-05 08:14:40 CST)" was missed by 0:00:23.680603
Run time of job "my_job (trigger: cron[month='4-8,11-12', hour='7-11', second='*/10'], next run at: 2018-04-05 08:14:40 CST)" was missed by 0:00:13.681604
Run time of job "my_job (trigger: cron[month='4-8,11-12', hour='7-11', second='*/10'], next run at: 2018-04-05 08:14:40 CST)" was missed by 0:00:03.681604
……
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST)" was missed by 0:00:15.687917
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST)" was missed by 0:00:10.687917
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST)" was missed by 0:00:05.687917
job_interval --> 2018-04-05 08:14:33.821645
job_interval --> 2018-04-05 08:14:38.529167
job_cron --> 2018-04-05 08:14:40.150080
job_interval --> 2018-04-05 08:14:43.296188
job_interval --> 2018-04-05 08:14:48.327317

作业仍会运行,说明作业被添加到数据库中,程序中断后重新运行时会自动从数据库读取作业信息,而不需要重新再添加到调度器中,如果不注释 21-25 行添加作业的代码,则作业会重新添加到数据库中,这样就有了两个同样的作业,避免出现这种情况可以在 add_job 的参数中增加 replace_existing=True,如:scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval',seconds=3,replace_existing=True)

如果我们想运行错过运行的作业,使用 misfire_grace_time,如:scheduler.add_job(my_job,args = ['job_cron',] ,id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11',second='*/15',coalesce=True,misfire_grace_time=30,replace_existing=True,end_date='2018-05-30')

说明:misfire_grace_time,假如一个作业本来 08:00 有一次执行,但是由于某种原因没有被调度上,现在 08:01 了,这个 08:00 的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的 30 秒限制,那么这个运行实例不会被执行。最常见的情形是 scheduler 被 shutdown 后重启,某个任务会积攒了好几次没执行如 5 次,下次这个作业被提交给执行器时,执行 5 次。设置 coalesce=True 后,只会执行一次。
其他操作如下:

scheduler.remove_job(job_id,jobstore=None)#删除作业
scheduler.remove_all_jobs(jobstore=None)#删除所有作业
scheduler.pause_job(job_id,jobstore=None)#暂停作业
scheduler.resume_job(job_id,jobstore=None)#恢复作业
scheduler.modify_job(job_id, jobstore=None, **changes)#修改单个作业属性信息
scheduler.reschedule_job(job_id, jobstore=None, trigger=None,**trigger_args)#修改单个作业的触发器并更新下次运行时间
scheduler.print_jobs(jobstore=None, out=sys.stdout)#输出作业信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/495150.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一文看懂人工智能产业链,未来10年2000亿美元市场

来源:传感器技术摘要:据腾讯研究院统计,截至2017年6月,全球人工智能初创企业共计2617家。美国占据1078家居首,中国以592家企业排名第二,其后分别是英国,以色列,加拿大等国家。根据艾…

如何跟机器人“抢”工作?专家:新的分工将形成

来源:经济日报摘要:随着人工智能技术的深度发展和机器人的广泛应用,人们会从许多传统生产活动中解放出来,有了更多闲暇时间,更强大的支持手段,让生活更有趣和丰富多彩。创新、创意会成为生活和工作中的必需…

互联网让我们变笨了吗:过去10年关于大脑的11个有趣发现

来源:资本实验室摘要:人类大脑,长期以来被认为科学和宇宙中最复杂的事物之一。鉴于其复杂性,受制于技术限制,过去科学家很难解开其内部运作的秘密,但目前的研究成果表明我们离秘密又近了一些。聚焦前沿科技…

美研究人员公布“盲动”机器人技术细节

来源:新华网摘要:7月7日美国麻省理工学院近日发布公报称,该校研究人员最新公布了一种“盲动”机器人的技术细节。这种机器人不需要借助视觉系统,可在崎岖地形中穿行跳跃,有望在危险工作环境中得…

AutoJs 4.1.1 实战教程

Auto.js 中文文档:https://hyb1996.github.io/AutoJs-Docs/#/?id综述 pro 版本支持 Node.js AutoJs Pro 7.0.4-1 实战教程---史上最全快手、抖音极速版 :https://blog.csdn.net/zy0412326/article/details/107180887/:https://blog.csdn.n…

人工智能军备竞赛:一文尽览全球主要国家AI战略

来源:网络大数据摘要:人工智能的迅速发展将深刻改变人类社会和世界的面貌,为了抓住 AI 发展的战略机遇,越来越多的国家和组织已争相开始制定国家层面的发展规划。人工智能的迅速发展将深刻改变人类社会和世界的面貌,为…

flex和js进行参数传递

来着&#xff1a;http://www.cnblogs.com/Cnol/archive/2009/09/20/1570365.html 方法一&#xff1a;flex接收网页传值&#xff01;~ 1<?xml version"1.0" encoding"utf-8"?> 2<mx:Application xmlns:mx"http://www.adobe.com/2006/mxml&q…

师法自然,仿生技术是如何改变世界的?

来源&#xff1a;36Kr摘要&#xff1a;“向自然学习”&#xff0c;这并非是句空话。本文介绍了科学家如何借鉴大自然&#xff0c;在材料科学&#xff0c;信息技术等领域实现创新。希望能为您带来启发。当今世界最伟大的创新者&#xff0c;非大自然莫属。大自然经过45亿年的演变…

Auto.JS 开发

From&#xff1a;https://blog.csdn.net/a6892255/article/details/107302369 autojs 代码大全(实战演练)&#xff1a;https://blog.csdn.net/qq_30931547/article/details/106459765 &#xff1a;https://github.com/snailuncle/autojsCommonFunctions/blob/master/autojsCo…

【研究】大脑如何在“知道”与“无知”之间做出决定

来源&#xff1a;中国生物技术网摘要&#xff1a;我们时而会对“求知欲”如饥似渴&#xff0c;时而又会觉得“无知是福”而享受放空&#xff0c;那么问题来了&#xff0c;在特定的时间里&#xff0c;我们是如何在这两种心态之间进行选择的呢&#xff1f;英国伦敦大学学院(UCL)的…

js 逆向分析的神器 --- v_jstools

From&#xff1a;https://mp.weixin.qq.com/s/LisYhDKK_6ddF-19m1gvzg 1、下载和安装插件 这是一款浏览器插件&#xff0c;功能非常的nice 工具地址&#xff1a;https://github.com/cilame/v_jstools 浏览器打开上面的网站后&#xff0c;点击 code 按钮&#xff0c;选择 Down…

《中国人工智能开源软件发展白皮书(2018)》(附下载及解读PPT)

来源&#xff1a;走向智能论坛摘要&#xff1a;近日&#xff0c;中国人工智能开源软件发展联盟召集中国电子技术标准化研究院等企事业单位&#xff0c;编撰并正式发布《中国人工智能开源软件发展白皮书&#xff08;2018&#xff09;》&#xff0c;白皮书研究梳理人工智能开源软…

把 charles,Fiddler 证书安装到安卓根目录,解决安卓微信 7.0 版本以后安装证书也无法抓包问题,需要 root

From&#xff1a;https://testerhome.com/topics/21956 OpenSSL &#xff1a;https://slproweb.com/products/Win32OpenSSL.html 谷歌在安卓7.0修改了安全策略&#xff0c;安卓系统 大于 7.0 时&#xff0c; 应用不在信任用户安装的证书文件。用户添加的 CA 证书不能再用于安全…

科学家发现跨越生命的重要门槛或许没那么难

来源&#xff1a;中国科学报将团藻&#xff08;拥有数百个细胞的藻类&#xff09;与其相对简单的亲缘物种——单细胞衣藻&#xff08;左上&#xff09;和拥有4~16个细胞的盘藻&#xff08;右上&#xff09;作对比&#xff0c;揭示了向多细胞生命发展的步骤。数十亿年前&#xf…

windows 的 wsl 命令

​wsl 文档&#xff1a;https://docs.microsoft.com/zh-cn/windows/wsl/ From &#xff1a;https://blog.csdn.net/weixin_34101784/article/details/88729575 From &#xff1a;https://www.cnblogs.com/Flat-White/p/13501639.html 玩转 WLS&#xff1a;Windows 10 Ubuntu子系…

人机工程学/人因工程学的定义

来源&#xff1a;人机与认知实验室摘要&#xff1a;人机工程学&#xff08;Human Machine Environment&#xff09;和人因工程学&#xff08;Human Factor Environment&#xff09;国际百科全书的标题表明&#xff0c;人机工程学和人因工程学可能是两个独立的学科领域。人机工程…

Google Pixel 解锁BL、刷入Twrp、magisk Root、安装 Xposed

Google Pixel 解锁 BL、刷入Twrp及Root &#xff1a;http://www.itfanr.cc/2018/10/16/google-pixel-unlock-bl-and-root/ Pixel 安装 Xposed 框架&#xff1a;https://blog.csdn.net/someby/article/details/110388712 自己动手刷 pixel 镜像&#xff08; 原生镜像、自己编译…

全球人脸识别精度一年提高75.6%,拉动全球安防市场超高增长

来源&#xff1a;AI 科技评论摘要&#xff1a;有「工业界黄金标准」之称的美国国家标准与技术研究院&#xff08;National Institute of Standards and Technology&#xff0c;NIST&#xff09;最近公布了全球人脸识别算法测试&#xff08;FRVT&#xff09;结果 FRVT 2018&…

Android 手机的高级终端 Termux 安装、使用

From&#xff1a;https://www.sqlsec.com/2018/05/termux.html Termux 高级终端安装使用配置教程 &#xff1a;https://www.cnblogs.com/cutesnow/p/11430833.html 神器Termux 的使用记录&#xff1a;https://cloud.tencent.com/developer/article/1609398 adb shell 下使用 te…

#ifdef,#ifndef,#define,#endif解析(原)

我们在看一些开源的源代码的时候&#xff0c;经常会看到如下情景&#xff1a; # if defined(_PTHREADS) && !defined(_NOTHREADS) # define __STL_PTHREADS # endif # if defined(_UITHREADS) && !defined(_PTHREADS) && !defined(_NOTHREADS) # …