一、形式一:A任务完成后添加B任务
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta# 定义任务A
def task_A():print("任务A开始执行:", datetime.now())# 模拟一些耗时操作# ...print("任务A完成执行:", datetime.now())# 任务A完成后,3秒后触发任务B的执行run_date_plus_one_minute = datetime.now() + timedelta(seconds=3)scheduler.add_job(task_B, 'date', run_date=run_date_plus_one_minute)# 定义任务B
def task_B():print("任务B开始执行:", datetime.now())# 任务B的逻辑# ...print("任务B完成执行:", datetime.now())def task_C():print("任务C开始执行:", datetime.now())# 任务B的逻辑# ...print("任务C完成执行:", datetime.now())if __name__ == '__main__':# 创建后台调度器scheduler = BackgroundScheduler()# 将任务A添加到调度器中scheduler.add_job(task_A, 'interval', seconds=5) # 假设任务A每5秒执行一次# 启动调度器scheduler.start()# 为了防止脚本退出,这里使用一个无限循环try:while True:passexcept (KeyboardInterrupt, SystemExit):# 关闭调度器scheduler.shutdown()
二、形式二:任务编排
import randomfrom apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta# 定义任务A
def task_A():print("A:", datetime.now())# 任务A完成后,触发任务B的执行scheduler.add_job(random.choice([t for t in tasks if t is not task_A]), 'date',run_date=datetime.now() + timedelta(seconds=1))# 定义任务B
def task_B():print("B:", datetime.now())scheduler.add_job(random.choice([t for t in tasks if t is not task_B]), 'date',run_date=datetime.now() + timedelta(seconds=0.5))# 定义任务C
def task_C():print("C:", datetime.now())scheduler.add_job(random.choice([t for t in tasks if t is not task_C]), 'date',run_date=datetime.now() + timedelta(seconds=0.5))if __name__ == '__main__':tasks = [task_A, task_B, task_C]# 创建后台调度器scheduler = BackgroundScheduler()# 将任务A添加到调度器中scheduler.add_job(task_A, 'interval', seconds=3) # 假设任务A每10秒执行一次# 启动调度器scheduler.start()# 为了防止脚本退出,这里使用一个无限循环try:while True:passexcept (KeyboardInterrupt, SystemExit):# 关闭调度器scheduler.shutdown()