本文翻译整理自:https://apscheduler.readthedocs.io/en/3.x/userguide.html
文章目录
- 一、安装 APScheduler
- 二、代码示例
- 三、基本概念
- 四、选择合适的 scheduler, job store(s), executor(s) and trigger(s)
- 五、配置调度器
- 方法 1:
- 方法 2:
- 方法 3:
- 六、启动调度器
- 七、添加任务
- 八、移除作业
- 九、暂停和恢复作业
- 十、获取计划任务列表
- 十一、修改 jobs
- 十二、关闭调度器
- 十三、暂停/恢复作业处理
- 十四、限制同一时间 执行的工作实例数量
- 十五、丢失的工作执行和合并
- 十六、调度器事件
- 十七、导出和导入作业
- 十八、故障排除
- 十九、联系和反馈
- 报告错误
- 获取帮助
一、安装 APScheduler
首选的安装方法是使用 pip:
$ pip install apscheduler
如果您没有安装pip,您可以通过下载并运行 get-pip.py 来轻松安装它。
如果由于某些原因,pip无法工作,您可以从PyPI手动下载APScheduler发行版,解压然后安装它:
$ pip install .
二、代码示例
源代码分发中包含了一个名为 examples
的目录,其中您可以找到 许多使用 APScheduler 的不同方式的工作示例。这些示例也可以在网络上 浏览。
三、基本概念
APScheduler 有四种组件:
- 触发器 triggers
- 任务存储 job stores
- 执行器 executors
- 调度器 schedulers
触发器 包含调度逻辑。每个作业都有自己的触发器,它决定了作业应该何时再次运行。除了它们的初始配置之外,触发器是完全无状态的。
任务存储 存储已计划的作业。默认的任务存储只是将作业保存在内存中,但其他存储将它们保存在各种类型的数据库中。
当作业保存到持久性任务存储中时,其数据被序列化;当从其中加载回来时,数据被反序列化。
任务存储(除了默认的那个之外)不保留作业数据在内存中,而是作为中间人在后端保存、加载、更新和搜索作业。任务存储绝不应该在调度器之间共享。
执行器 是处理作业运行的部分。它们通常通过将作业中的指定可调用对象提交到线程或进程池来完成此操作。
当作业完成时,执行器通知调度器,然后调度器发出适当的事件。
调度器 是将所有这些部分联系在一起的部分。在您的应用程序中,通常只有一个调度器正在运行。
应用程序开发人员通常不直接处理任务存储、执行器或触发器。相反,调度器提供了适当的接口来处理所有这些。
配置任务存储和执行器是通过调度器完成的,就像添加、修改和删除作业一样。
四、选择合适的 scheduler, job store(s), executor(s) and trigger(s)
您选择的调度器主要取决于您的编程环境以及您打算使用 APScheduler 做什么。以下是一个选择调度器的快速指南:
- BlockingScheduler:当调度器是您进程中的唯一运行项时使用
- BackgroundScheduler:当您不使用以下任何框架,并希望调度器在您的应用程序中作为后台运行时使用
- AsyncIOScheduler:当您的应用程序使用 asyncio 模块时使用
- GeventScheduler:当您的应用程序使用 gevent 时使用
- TornadoScheduler:当您正在构建 Tornado 应用程序时使用
- TwistedScheduler:当您正在构建 Twisted 应用程序时使用
- QtScheduler:当您正在构建 Qt 应用程序时使用
简单到这个程度,对吧?
选择合适的作业存储,您需要确定是否需要作业持久化。如果您总是在应用程序开始时重新创建作业,那么您可能可以使用默认的 (MemoryJobStore
)。
但是,如果您需要作业在调度程序重启或应用程序崩溃后仍然保持,那么您的选择通常取决于您的编程环境中使用的工具。
然而,如果您处于可以自由选择的位置,那么由于它的强大数据完整性保护,PostgreSQL 后端的 SQLAlchemyJobStore
是推荐的选择。
同样,如果您使用上述框架之一,执行器的选择通常是由您决定的。
否则,默认的ThreadPoolExecutor
对于大多数用途来说已经足够好了。
如果您的负载涉及CPU密集型操作,您应该考虑使用ProcessPoolExecutor
来利用多个CPU核心。
您甚至可以同时使用两者,将进程池执行器作为次要执行器添加。
当你安排一个作业时,你需要为它选择一个 触发器。触发器决定了计算作业运行日期/时间的逻辑。APScheduler 包含三种内置触发器类型:
- date: 当您想在某特定时间点仅运行一次作业时使用
interval
: 使用于您想要以固定时间间隔运行作业时cron
: 在您想要在一天中的特定时间(们)定期运行作业时使用
它还可能将多个触发器组合成一个,该触发器在所有参与触发器同意的时间触发,或者在任一触发器将触发时触发。有关更多信息,请参阅组合触发器
的文档。
您可以在每个作业存储、执行器和触发类型对应的API文档页面上找到它们的插件名称。
五、配置调度器
APScheduler 提供了多种配置调度器的方式。您可以使用配置字典,或者以关键字参数的形式传入选项。
您还可以首先实例化调度器,然后添加作业并配置调度器。这样,您可以为任何环境获得最大的灵活性。
调度器级别的完整配置选项列表可以在 BaseScheduler
类的 API 参考中找到。BaseScheduler
。
调度器的子类也可能有额外的选项,这些选项在各自的 API 参考中进行了说明。对于单个作业存储和执行器的配置选项也可以在它们的 API 参考页面中找到。
假设您想在应用程序中使用默认的作业存储和默认的执行器来运行 BackgroundScheduler:
from apscheduler.schedulers.background import BackgroundSchedulerscheduler = BackgroundScheduler()# Initialize the rest of the application here, or before the scheduler initialization
这将为您提供一个名为“default”的 BackgroundScheduler,它具有一个名为“default”的MemoryJobStore和一个名为“default”的ThreadPoolExecutor,其默认最大线程数为10。
现在,假设你想要更多。你想要使用 两个 职位存储和 两个 执行器,并且你还想调整新任务的默认值以及设置不同的时区。以下三个示例完全等效,并将为你提供:
- 一个名为“mongo”的MongoDBJobStore
- 一个名为“default”的 SQLAlchemyJobStore(使用 SQLite)
- 一个名为“default”的 ThreadPoolExecutor,具有20个工作线程
- 一个名为“processpool”的 ProcessPoolExecutor,具有5个工作进程
- UTC 作为调度器的时区
- 默认情况下关闭了新工作的合并
- 新作业的默认最大实例限制为3
方法 1:
from pytz import utcfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutorjobstores = {'mongo': MongoDBJobStore(),'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': ThreadPoolExecutor(20),'processpool': ProcessPoolExecutor(5)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方法 2:
from apscheduler.schedulers.background import BackgroundScheduler# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({'apscheduler.jobstores.mongo': {'type': 'mongodb'},'apscheduler.jobstores.default': {'type': 'sqlalchemy','url': 'sqlite:///jobs.sqlite'},'apscheduler.executors.default': {'class': 'apscheduler.executors.pool:ThreadPoolExecutor','max_workers': '20'},'apscheduler.executors.processpool': {'type': 'processpool','max_workers': '5'},'apscheduler.job_defaults.coalesce': 'false','apscheduler.job_defaults.max_instances': '3','apscheduler.timezone': 'UTC',
})
方法 3:
from pytz import utcfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutorjobstores = {'mongo': {'type': 'mongodb'},'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': {'type': 'threadpool', 'max_workers': 20},'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
scheduler = BackgroundScheduler()# .. do something else here, maybe add jobs etc.scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
六、启动调度器
启动调度器只需调用调度器的 start()
方法。对于除了 BlockingScheduler
之外的调度器,这个调用将立即返回,您可以继续应用程序的初始化过程,可能向调度器添加作业。
对于 BlockingScheduler
,您只能在完成所有初始化步骤后调用 start()
方法。
注意
调度器启动后,您将无法更改其设置。
七、添加任务
向调度器添加任务有两种方式:
- 通过调用
add_job()
方法(查看文档) - 通过装饰器
scheduled_job()
装饰一个函数(查看文档)
第一种方式是最常见的添加任务的方法。第二种方式主要是为了方便声明在应用程序运行期间不会改变的任务。add_job()
方法返回一个 apscheduler.job.Job
实例,您可以使用该实例稍后修改或删除任务。
您可以在 任何时间 在调度器上安排任务。如果任务添加时调度器尚未运行,该任务将临时安排,并且其第一次运行时间仅当调度器启动时才会计算。
需要注意的是,如果您使用的是序列化任务的执行器或任务存储,它将对您的任务增加一些要求:
- 目标可调用必须全局可访问
- 传递给可调用函数的任何参数必须是可序列化的
在预定义的任务存储中,只有 MemoryJobStore 不序列化任务。在预定义的执行器中,只有 ProcessPoolExecutor 会序列化任务。
重要:如果在应用程序初始化期间在持久任务存储中安排任务,必须为任务定义一个明确的 ID,并使用 replace_existing=True
,否则每次应用程序重新启动时都会得到任务的新副本!
技巧:要立即运行一个任务,在添加任务时省略 trigger
参数。
八、移除作业
当您从调度器中移除一个作业时,它将从相关的作业存储中移除,并且将不再执行。有两种方式可以实现这一点:
- 通过调用
remove_job()
方法 并传入作业的 ID 和作业存储别名 - 通过调用
remove()
方法 在您从add_job()
方法 获取的作业实例上
后一种方法可能更方便,但它要求您将添加作业时收到的 Job
实例 存储在某个地方。
对于通过 scheduled_job()
方法 安排的作业,第一种方法是唯一的方式。
如果作业的调度结束(即其触发器不再产生任何运行时间),它将自动被移除。
Example:
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
同样,使用一个显式的作业ID:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
九、暂停和恢复作业
您可以通过 Job
实例或调度器本身轻松地暂停和恢复作业。当一个作业被暂停时,其下一次运行时间会被清除,并且直到作业被恢复,不会再为其计算进一步的运行时间。要暂停一个作业,可以使用以下任一方法:
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
要恢复:
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()
十、获取计划任务列表
要获取一个机器可处理的计划任务列表,您可以使用get_jobs()
方法。它将返回一个包含Job
实例的列表。如果您只对特定作业存储中的作业感兴趣,那么请将作业存储别名作为第二个参数提供。
为了方便起见,您可以使用print_jobs()
方法,该方法将以格式化的列表打印出作业、它们的触发器和下一次运行时间。
十一、修改 jobs
您可以通过调用 apscheduler.job.Job.modify()
或 modify_job()
来修改任何作业属性。您可以修改除 id
以外的任何作业属性。
示例:
job.modify(max_instances=6, name='Alternate name')
如果您想重新安排作业——即更改其触发器,您可以使用 apscheduler.job.Job.reschedule()
或 reschedule_job()
。这些方法为作业构建一个新的触发器,并基于新的触发器重新计算其下一次运行时间。
Example:
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
十二、关闭调度器
要关闭调度器:
scheduler.shutdown()
默认情况下,调度器会关闭其作业存储和执行器,并等待所有当前正在执行的任务完成。如果您不想等待,可以这样做:
scheduler.shutdown(wait=False)
这仍然会关闭作业存储和执行器,但不会等待任何正在运行的任务完成。
十三、暂停/恢复作业处理
可以暂停处理计划中的作业:
scheduler.pause()
这将导致调度器在处理恢复之前不会唤醒:
scheduler.resume()
它也可以在暂停状态下启动调度器,即没有第一个唤醒调用:
scheduler.start(paused=True)
这在你需要在不让它们运行之前修剪不需要的工作时很有用。
十四、限制同一时间 执行的工作实例数量
默认情况下,每个工作只能同时运行一个实例。这意味着,如果工作即将运行,但之前的运行尚未完成,则最新的运行被视为错过。可以通过在添加工作时分发使用 max_instances
关键字参数来设置调度器允许同时运行特定工作的最大实例数。
十五、丢失的工作执行和合并
有时调度器可能无法在预定运行时间执行计划中的工作。最常见的情况是当一个工作被安排在持久化工作存储中,而调度器在作业应该执行之后被关闭并重新启动。
当这种情况发生时,作业被认为是“错过”了。
然后调度器会检查每个错过执行的时间与作业的 misfire_grace_time
选项(可以在每个作业的基础上设置或在调度器中全局设置)进行比较,以查看是否应该触发执行。这可能导致作业连续执行多次。
如果这种行为不适合您的特定用例,您可以使用 coalescing
将所有这些错过执行合并成一次。
换句话说,如果作业启用了合并,并且调度器看到一个或多个该作业的排队执行,它只会触发一次。
对于“跳过”的运行不会发送错过事件。
注意:如果由于池中没有可用线程或进程而导致作业执行延迟,执行器可能会因为执行得太晚(与最初指定的运行时间相比)而跳过它。
如果这种情况可能发生在您的应用程序中,您可能需要增加执行器中的线程/进程数量,或者将 misfire_grace_time
设置调整为更高的值。
十六、调度器事件
可以向调度器附加事件监听器。在特定情况下会触发调度器事件,事件中可能包含有关该特定事件详细信息的附加信息。
可以通过给 add_listener()
函数提供适当的 mask
参数,或使用 OR
操作组合不同的常量来仅监听特定类型的事件。
监听器可调用函数会接收一个参数,即事件对象。
有关可用的事件及其属性的具体信息,请参阅 events
模块的文档。events
模块。
示例:
def my_listener(event):if event.exception:print('The job crashed :(')else:print('The job worked :)')scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
十七、导出和导入作业
如果您需要将作业迁移到不同的作业存储,您需要首先将作业导出到一个JSON文档,然后再次导入它们。
以下是从所有调度器的作业存储中导出作业的示例:
# The scheduler has to be initialized, but can be paused
scheduler.export_jobs("/tmp/jobs.json")
然后您将在目标调度程序中导入作业:
# Again, the scheduler needs to be either running or paused
scheduler.import_jobs("/tmp/jobs.json")
两种方法都接受一个 jobstore
参数,该参数可以限制导出时使用的源作业存储,或者在导入时指定非默认目标作业存储。
这两种方法的第一参数可以是 打开的文件、一个 Path 或作为字符串的文件系统路径。
十八、故障排除
如果调度器没有按预期工作,增加 apscheduler
日志记录器的日志级别到 DEBUG
级别将很有帮助。
如果您最初就没有启用日志记录,可以这样做:
import logginglogging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
这应该提供有关调度器内部正在发生什么的许多有用信息。
也要确保您查看常见问题解答部分,看看您的问题是否已经有了解决方案。
十九、联系和反馈
报告错误
GitHub 提供了一个 错误跟踪器。
获取帮助
如果您遇到问题或有其他疑问,您可以:
- 在 Gitter 上的 apscheduler 房间中提问
- 在 APScheduler GitHub 讨论论坛 中提问,或
- 在 StackOverflow 上提问,并使用
apscheduler
标签标记您的问题
简化基础设施 使用 MongoDB Atlas,领先的开发者数据平台
Ads by EthicalAds
2025-03-19(三)