BaseSchedule类
基础调度类,它定义了一些调度任务的基本属性和方法。以下是该类的主要部分的解释:
__init__(self, nowfun: Callable | None = None, app: Celery | None = None)
:初始化方法,接受两个可选参数,nowfun
表示返回当前时间的函数,app
表示Celery应用程序的实例。now(self) -> datetime
:返回当前时间的方法。它使用了nowfun
属性或者Celery应用程序的now
方法。remaining_estimate(self, last_run_at: datetime) -> timedelta
:估计下一次运行的时间间隔。这是一个抽象方法,需要在子类中实现。is_due(self, last_run_at: datetime) -> tuple[bool, datetime]
:判断任务是否到期的方法。这也是一个抽象方法,需要在子类中实现。maybe_make_aware(self, dt: datetime, naive_as_utc: bool = True) -> datetime
:将时间转换为可能的时区感知时间。它使用了Celery应用程序的时区信息。app
属性:获取或设置与调度相关联的Celery应用程序实例。tz
属性:获取或设置调度任务的时区信息。utc_enabled
属性:获取或设置UTC是否启用的信息。to_local(self, dt: datetime) -> datetime
:将时间转换为本地时间。如果UTC启用,则将时间转换为本地时间;否则,使用回退方式将时间转换为本地时间。__eq__(self, other: Any) -> bool
:判断两个调度任务是否相等的方法。比较的依据是nowfun
属性。
这个类提供了一些基本的调度任务功能,并定义了一些抽象方法,具体的调度任务类需要继承这个基类并实现这些抽象方法。
schedule类
用于表示Celery中的周期性任务调度
-
构造函数 (
__init__
方法):- 通过构造函数接受一些参数,包括
run_every
(任务的运行间隔时间)、relative
(一个布尔值,表示是否将运行时间舍入到间隔的分辨率)、nowfun
(返回当前日期和时间的函数)、app
(Celery应用实例)。 - 使用
super().__init__(nowfun=nowfun, app=app)
调用父类(BaseSchedule
)的构造函数。
- 通过构造函数接受一些参数,包括
-
remaining_estimate
方法:- 计算距离下一次运行任务的剩余时间。
- 使用
remaining
函数,传递上次运行时间、运行间隔、当前时间以及relative
标志。 relative
=True,当前时间是9.36,则会舍入到最近的30分的倍数10:00第一次执行,然后10:30,11:30…relative
=False则不做舍入,当前时间是9.36,则9.36第一次执行,然后10:06,11:36…
-
is_due
方法:- 根据上次运行时间确定任务是否应该运行。
- 返回一个包含布尔值(
is_due
,表示任务是否应该运行)和下一次检查的时间(以秒为单位)的元组。
-
__repr__
方法:- 返回对象的字符串表示,指示任务的频率以人类可读的格式显示。
-
__eq__
方法:- 实现相等性比较,允许对此类的实例进行比较。
-
__reduce__
方法:- 用于序列化。返回一个元组,可用于重新创建对象。
-
seconds
属性:- 返回运行间隔的总秒数。
-
human_seconds
属性:- 返回表示运行间隔秒数的人类可读字符串。
这个类看起来设计得很好,用于定义Celery框架中任务的周期性调度。它包括用于估算剩余时间、检查任务是否应该运行以及提供可读的调度表示的方法。
crontab_parser类
用于解析类似于Crontab表达式的字符串。Crontab表达式通常用于指定计划任务(定时任务)运行的时间规则。下面是对该类的主要解释:
-
构造函数 (
__init__
):- 初始化
crontab_parser
对象。可以通过构造函数指定最大值和最小值,默认为max_=60
和min_=0
。 - 构造函数还设置了一系列正则表达式和相应的处理函数,这些正则表达式用于匹配不同的 Crontab 表达式模式。
- 初始化
-
parse
方法:- 接受一个 Crontab 表达式字符串,返回一个包含时间单位的集合。例如,
parse('*/15')
返回[0, 15, 30, 45]
。
- 接受一个 Crontab 表达式字符串,返回一个包含时间单位的集合。例如,
-
私有方法 (
_parse_part
,_expand_range
,_range_steps
,_star_steps
,_expand_star
,_expand_number
):- 这些方法用于解析和展开不同类型的 Crontab 表达式部分。
_parse_part
方法遍历已解析的 Crontab 表达式部分,使用正则表达式匹配相应的模式,并调用相应的处理函数。_expand_range
方法用于展开范围,例如1-5
展开为[1, 2, 3, 4, 5]
。_range_steps
方法处理带步长的范围,例如1-5/2
展开为[1, 3, 5]
。_star_steps
方法处理带步长的星号,例如*/2
展开为[0, 2, 4, ..., max_]
。_expand_star
方法展开星号,例如*
展开为[min_, min_+1, ..., max_]
。_expand_number
方法用于展开数字,同时检查数字的有效性。
这个类的目的是解析 Crontab 表达式中的不同部分,将其展开为对应的时间单位集合。通过这样的解析,可以获取计划任务运行的时间规则。
crontab类
Celery 中 Crontab 调度的实现,用于定义基于类似于 cron
的时间表的任务执行规则,主要负责解析 Crontab 表达式,计算下一次任务运行的时间,以及判断任务是否应该运行。Crontab 表达式中的星号和数字表示通配符,可以非常灵活地定义任务的运行时间。
class crontab(BaseSchedule):# ...(文档字符串中有详细的描述)def __init__(self, minute: str = '*', hour: str = '*', day_of_week: str = '*',day_of_month: str = '*', month_of_year: str = '*', **kwargs: Any) -> None:# 初始化 Crontab 实例,接受分钟、小时、星期几、每月的第几天和每年的第几月等参数# 参数可以是数字、字符串(Crontab 表达式)或可迭代对象(例如列表、集合)# 使用 crontab_parser 来解析字符串表达式# _expand_cronspec 方法用于展开 cronspec,确保值在合理范围内def _expand_cronspec(cronspec: int | str | Iterable,max_: int, min_: int = 0) -> set[Any]:# 展开 cron 规范,将字符串表达式转换为一个集合,表示 Crontab 触发的所有时间单位值def _delta_to_next(self, last_run_at: datetime, next_hour: int,next_minute: int) -> ffwd:# 找到下一个 delta(时间间隔),用于计算下一次调度的时间# 主要用于在限制任务执行的 day_of_month 和/或 month_of_year cronspec 时def __repr__(self) -> str:# 返回 Crontab 的字符串表示形式def remaining_delta(self, last_run_at: datetime, tz: tzinfo | None = None,ffwd: type = ffwd) -> tuple[datetime, Any, datetime]:# 计算距离下一次执行的时间间隔# 用于 day_of_month 和/或 month_of_year cronspec 的任务调度def remaining_estimate(self, last_run_at: datetime, ffwd: type = ffwd) -> timedelta:# 估算下一次运行时间,返回 timedeltadef is_due(self, last_run_at: datetime) -> tuple[bool, datetime]:# 返回一个元组,表示任务是否应该运行和下一次运行的时间# 考虑了 beat_cron_starting_deadline 配置,确保在 deadline 内执行def __eq__(self, other: Any) -> bool:# 判断两个 Crontab 实例是否相等
maybe_schedule函数
用于将输入参数转换为 Celery 中的调度对象(BaseSchedule
类的实例),输入参数可以是整数(表示秒数)、浮点数(表示秒数)、timedelta
对象(表示时间间隔)或者已有的调度对象。如果输入是整数或浮点数,会先将其转换为 timedelta
对象,然后创建一个调度对象。如果输入已经是 timedelta
对象,则直接创建一个调度对象。如果输入是已有的调度对象,则设置其 Celery app 属性后返回。
def maybe_schedule(s: int | float | timedelta | BaseSchedule, relative: bool = False,app: Celery | None = None) -> float | timedelta | BaseSchedule:"""Return schedule from number, timedelta, or actual schedule."""if s is not None:if isinstance(s, (float, int)):# 如果输入是整数或浮点数,将其转换为 timedelta 对象s = timedelta(seconds=s)if isinstance(s, timedelta):# 如果输入是 timedelta 对象,创建一个 schedule 实例return schedule(s, relative, app=app)else:# 如果输入是 BaseSchedule 的实例,设置其 Celery app 属性s.app = appreturn s
solar类
用于处理太阳事件的 Celery 调度器的类。允许你创建一个周期性任务,该任务将根据特定的太阳事件进行调度。太阳事件的种类包括黎明、日出、日中、日落和黄昏等,你可以选择其中一个事件作为任务的触发条件。在初始化时,你需要指定事件类型(event)、观察者的纬度(lat)和经度(lon),以及其他一些可选参数。类中的方法包括 remaining_estimate
(返回下一次运行的时间估计)和 is_due
(返回任务是否应该运行及下一次运行的时间)。此外,还有一些辅助性的属性和方法,用于表示太阳事件的集合、地平线高度、计算方法等。
class solar(BaseSchedule):"""Solar event.A solar event can be used as the ``run_every`` value of aperiodic task entry to schedule based on certain solar events....Arguments:event (str): Solar event that triggers this task.See note for available values.lat (float): The latitude of the observer.lon (float): The longitude of the observer.nowfun (Callable): Function returning the current date and timeas a class:`~datetime.datetime`.app (Celery): Celery app instance."""_all_events = {...} # 一组表示所有可能太阳事件的字符串集合_horizons = {...} # 一组表示各种太阳事件时,地平线的高度_methods = {...} # 一组表示用于计算太阳事件的方法_use_center_l = {...} # 一组表示是否使用太阳中心计算的布尔值def __init__(self, event: str, lat: int | float, lon: int | float, **kwargs: Any) -> None:# 初始化太阳事件调度器...def remaining_estimate(self, last_run_at: datetime) -> timedelta:"""Return estimate of next time to run.Returns:~datetime.timedelta: when the periodic task shouldrun next, or if it shouldn't run today (e.g., the sun doesnot rise today), returns the time when the next checkshould take place."""# 返回下一次运行的时间估计...def is_due(self, last_run_at: datetime) -> tuple[bool, datetime]:"""Return tuple of ``(is_due, next_time_to_run)``.Note:next time to run is in seconds.See Also::meth:`celery.schedules.schedule.is_due` for more information."""# 返回是否应该运行及下一次运行的时间...def __eq__(self, other: Any) -> bool:if isinstance(other, solar):return (other.event == self.event andother.lat == self.lat andother.lon == self.lon)return NotImplemented