容器中apscheduler不执行_APScheduler:定时任务框架

APScheduler:定时任务框架

安装

文档: https://apscheduler.readthedocs.io/en/stable/userguide.html

安装

$ pip install apscheduler
>>> import apscheduler
>>> apscheduler.version
'3.6.3'

组件

APScheduler由一下四部分组成

  • triggers:触发器,指定定时任务执行的时机,每个任务都有自己的触发器.
  • job stores:存储器,持久存储,默认存储在内存中.
  • executors:执行器,在定时任务执行时,以进程或线程方式执行
  • scheduler:调度器,包含BackgroundScheduler(后台运行)和BlockingScheduler(阻塞运行).他会合理安排作业存储器,执行器,触发器进行工作.并进行添加和删除任务等.调度器通常是只有一个的,开发人员很少直接操作触发器,存储器,执行器等.因为这些都由调度器自动来实现了.
5b1fb0c469726e3d0a8862ce1fd53d3a.png
10334

触发器(triggers)

1.date在特定时间执行

示例:

from datetime import date, datetime

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()

def my_job(text):
   print(text)

# run_date 接受 date, datetime 数据类型
sched.add_job(my_job, 'date', run_date=datetime(2020, 11, 27, 19, 33, 30), args=['text'])

sched.start()

更多:https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html

2.interval间隔执行

在固定的时间间隔后触发事件.参数如下:

weeks周,整形
days一个月中的第几天,整形
hours时,整形
minutes分,整形
seconds秒,整形
start_date起始时间
end_date结束时间
jitter触发的时间误差
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime


sched = BlockingScheduler()

def job_funciton():
   print('hello world')


sched.add_job(job_funciton, trigger='interval',seconds=5)
# 指定小时
# sched.add_job(job_funciton, trigger='interval', hours=2)
# 指定开始,结束时间
# sched.add_job(job_funciton, trigger='interval', start_date='2020-11-27 20:30:00', end_date='2010-11-27 21:30:00)
sched.start() 

3.crontab

在某个确切的时间周期性触发事件.

yearmonth1-12day1-31
week1-53day_of_week一周中的第几天(0/Monday)hour0-23
minute0-59second0-59start_datedatetime数据类型,或者字符串类型
end_date结束时间timezone时区jitter触发的误差时间

也可以用表达式类型,可以用以下方式:

表达式字段描述
*任何在每个值都触发
*/a任何每隔 a触发一次
a-b任何在 a-b区间内任何一个时间触发( a必须小于 b)
a-b/c任何在 a-b区间内每隔 c触发一次
xth yday第 x个星期 y触发
lastxday最后一个星期 x触发
lastday一个月中的最后一天触发
x,y,z任何可以把上面的表达式进行组合
from apscheduler.schedulers.blocking import BlockingScheduler


def job_function():
   print "Hello World"

sched = BlockingScheduler()

# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

sched.start()

调度器(schedulers)

  1. BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用 start函数会阻塞当前线程,不能立即返回。
  2. BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start后主线程不会阻塞。
  3. AsyncIOScheduler:适用于使用了 asyncio模块的应用程序。
  4. GeventScheduler:适用于使用 gevent模块的应用程序。
  5. TwistedScheduler:适用于构建 Twisted的应用程序。
  6. QtScheduler:适用于构建 Qt的应用程序。
  7. TornadoScheduler: tornado

任务存储器(job stores)

有2中方式,一种是加载在内存中(默认配置),一种是使用数据库.使用内存简单高效,但是程序出现问题,从新运行,会把以前的任务从新再执行一次.数据库则可以在中断的地方恢复正常使用.

  • MemoryJobStore:使用内存
  • MongoDBJobStore:使用mongodb
  • RedisJobStore:使用redis
  • SQLAlchemy:使用SQLAlchemy框架

1.RedisJobStore

RedisJobStore(db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times', pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args)

有2种创建的方法:

  • add_jobstore:需要指定redis的相关参数.
from datetime import datetime, timedelta
import sys
import os

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore



def alarm(time):
    print('Alarm! This alarm was scheduled at %s.' % time)


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times', host='192.168.0.101', port=6379, db=0)
    if len(sys.argv) > 1 and sys.argv[1] == '--clear':
        scheduler.remove_all_jobs()

    alarm_time = datetime.now() + timedelta(seconds=300)
    scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
    print('To clear the alarms, run this example with the --clear argument.')
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
  • RedisJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
from datetime import datetime, timedelta

jobstore = {
    'default' : RedisJobStore(db=0, jobs_key='myfunc', run_times_key='myfunc_time', host='192.168.0.101', port=6379)
}

def my_func(t):
    print('hello world, %s' %t)


if __name__ == '__main__':
    sched = BlockingScheduler(jobstores=jobstore)
    alarm_time = datetime.now() + timedelta(seconds=300)
    sched.add_job(my_func,run_date=alarm_time, args=['ning'])
    sched.start()

均可在redis中查询到数据.

79962117df8c769278bfb582f072d93d.png

2.SQLAlchemy

使用ORM框架,演示使用MySql

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta

def my_func(t):
   print('hello %s' %t)

if __name__ == "__main__":
   sched = BlockingScheduler()
   url = 'mysql+pymysql://root:2008.Cn123@192.168.0.101:3306/test'
   sched.add_jobstore('sqlalchemy', url=url,tablename='api_job')

   alarm_time = datetime.now() + timedelta(seconds=300)
   sched.add_job(my_func,run_date=alarm_time, args=['ning'])
   sched.start()

如果表不存在,会自动创建表.tablename用于指定表的名称.

在数据库中可以查看到表

select * from api_job;
5a7f5459de518a1986ff343de0909352.png
10336

执行器executors

执行器取决于应用场景,默认是ThreadPoolExecutor,它可以满足大部分需求.如果是CPU密集型计算,可以选择ProcessPoolExecutor

class apscheduler.executors.pool.ThreadPoolExecutor(max_workers=10)class apscheduler.executors.pool.ProcessPoolExecutor(max_workers=10)
# max_worker 指定最多使用线程/进程
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

executors = {
   'default': ThreadPoolExecutor(20),
}
conf = { # redis配置
   "host":127.0.0.1,
   "port":6379,
   "db":15, # 连接15号数据库
   "max_connections":10 # redis最大支持300个连接数
}
scheduler = BackgroundScheduler(executors=executors)
scheduler.add_jobstore(jobstore='redis', **conf) # 添加任务持久化存储方式,如果未安装redis可省略此步骤

任务操作

  • add_job(func, id='xxx', args=None, kwargs=None)添加任务
# 添加任务func, func参数可以使用 '可导入模块:可调用对象'的方式引入,即可用模块来引入
#❯ tree -L 2
#├── func
#│   ├── add_func.py
#│   ├── __init__.py
#└── jobs.py
# add_func.py
def add(x,y):
    print(x+y)
    
# jobs.py
from apscheduler.schedulers.blocking import BlockingScheduler
from func.add_func import add

shced = BlockingScheduler()


if __name__ == "__main__":
    shced.add_job('func.add_func:add', args=[1,2], id='job1')
    shced.start()   

除去使用add_job(),还可以使用装饰器函数scheduled_job来添加任务.

  • remove_job(job_id):删除任务,需要指定job_id
  • pause_job(job_id):暂停任务
  • resume_job(job_id):恢复任务
  • modify_job(job_id, **changes):修改任务属性
  • print_jobs():作业信息
# 方法1
job = scheduler.add_job(myfunc, 'interval', minutes=2)  # 添加任务
job.remove()  # 删除任务
job.pause() # 暂定任务
job.resume()  # 恢复任务

# 方法2
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  # 添加任务    
scheduler.remove_job('my_job_id')  # 删除任务
scheduler.pause_job('my_job_id')  # 暂定任务
scheduler.resume_job('my_job_id')  # 恢复任务

示例

方法1

from pytz import utc
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
def tick():
   print('Tick! The time is: %s' % datetime.now())
# 选择MongoDB作为任务存储数据库
jobstores = {
   'mongo': MongoDBJobStore(),
   'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 默认使用线程池
executors = {
   'default': ThreadPoolExecutor(20),
   'processpool': ProcessPoolExecutor(5)
}
# 默认参数配置
job_defaults = {
   'coalesce': False,  # 积攒的任务是否只跑一次,是否合并所有错过的Job
   'max_instances': 3,  # 默认同一时刻只能有一个实例运行,通过max_instances=3修改为3个。
   'misfire_grace_time': 30  # 30秒的任务超时容错
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()

方法2

from apscheduler.schedulers.background import BackgroundScheduler
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
   'apscheduler.jobstores.mongo': {
        'type': 'mongodb'
   },
   'apscheduler.jobstores.default': {
       'type': 'sqlalchemy',
       'url': 'sqlite:///jobs.sqlite'
   },
   'apscheduler.executors.default': {
       'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
       'max_workers': '20'
   },
   'apscheduler.executors.processpool': {
       'type': 'processpool',
       'max_workers': '5'
   },
   'apscheduler.job_defaults.coalesce': 'false',
   'apscheduler.job_defaults.max_instances': '3',
   'apscheduler.timezone': 'UTC',
})

方法3

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
   'mongo': {'type': 'mongodb'},
   'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
   'default': {'type': 'threadpool', 'max_workers': 20},
   'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
   'coalesce': False,
   'max_instances': 3
}
scheduler = BackgroundScheduler()
# ..这里可以添加任务
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

misfire_grace_time:如果一个job本来14:00有一次执行,但是由于某种原因没有被调度上,现在14:01了,这个14:00的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的30秒限制,那么这个运行实例不会被执行。合并:最常见的情形是scheduler被shutdown后重启,某个任务会积攒了好几次没执行如5次,下次这个job被submit给executor时,执行5次。将coalesce=True后,只会执行一次

replace_existing: 如果在程序初始化时,是从数据库读取任务的,那么必须为每个任务定义一个明确的ID,并且使用replace_existing=True,否则每次重启程序,你都会得到一份新的任务拷贝,也就意味着任务的状态不会保存。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/533578.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

织梦php网站修改教程,织梦DEDEcms织梦软件模型增加图集功能教程(含修改文件下载)...

这篇文章主要为大家详细介绍了织梦DEDEcms织梦软件模型增加图集功能教程(含修改文件下载),具有一定的参考价值,感兴趣的小伙伴们可以参考一下,有需要的朋友可以收藏方便以后借鉴。织梦DEDEcms织梦软件模型增加图集功能,这是今天361模板要给大家分享的。下…

nikita popov php,PHP中对performance的考虑点

Nikita Popov 在他的演讲中谈了几个PHP 程序中和performance相关的point。1.PHP使用shared memory, preload的方式事先分配,而只有在所有的处理结束之后,share memory 才会断开和所有进程或者thread之间的联系。光是opcode,FPM的设定还不足以…

优化matlab作业,现代设计优化算法MATLAB实现

开篇语前阵子做现代设计方法的时候,发现网上很是缺乏这种作业形式的简易算法实现,所以特地来简书写一篇。有两份,一份是我的(说来惭愧,我的大部分都是在网上找的代码,然后在自己的电脑上跑一次,跑出来了就行…

多因子选选股MATLAB代码,金工研报:利用卷积神经网络进行多因子选股

首先,我们先来看一下通过卷积神经网络选股模型的整体流程,然后再根据每一步流程进行介绍,具体如下图所示:1、数据获取用于历史回测数据来自所有A股股票,其中剔除了ST股以及上市3个月的股票,另外&#xff0c…

matlab bad apple,【bad apple】matlab制作矩阵苹果~

有屏幕的地方就有bad apple那么作为一名工科生,熟练的操♂作马桶萝卜(matlab)是一项基本技能下面开始讲解如何用matlab制作别具一格的“矩阵苹果”~实验环境matlab R2018a原版bad apple视频技术要求可以即时演算图形可以将处理后的每帧图形合并成新的视频先上代码%t…

服务器ip直接访问php怎么写,php - 如何实现用公网ip访问到服务器上的网页?

服务器系统是Windows Server 2012 R2,已经部署了IIS、PHP和MySQL,能够在云服务器上通过localhost打开php网页,(放在服务器wwwroot上的index.php)已在ISS管理器中添加网站,但编辑网站绑定时,在ip地址中填入了服务器的公…

vb6 打印选项对话框_图纸打印次数太多,不知道哪次才是最新的?用打印戳记区分效果好...

原创:就说我在开发区使用AutoCAD从事设计工作的朋友们不知道有没遇到过这种情况:图纸在反复修改打印的过程中,由于图纸内容高度相似,往往搞不清究竟哪个才是最新版本的图纸了。这种情况下,细致入微地去核对非常麻烦&am…

安卓文本编辑器php cpp,用安卓原生控件封装一个简易的富文本编辑器

最近接到项目需求:移动端原生写一个富文本编辑器。 ( ⊙ o ⊙ )从没遇到过富文本要用原生写的,然后就查阅各种资料。然后结合自己的思路:其实安卓的富文本编辑器就是一个 “容器”。那么接下来我就带给大家说一说我自定义这个富文本编…

vue tab切换_iviewUITabs选项卡切换组件

概述 选项卡切换组件,常用于平级区域大块内容的的收纳和展现。源码地址:https://github.com/iview/iview/tree/2.0/src/components/tabs使用: 标签一的内容标签二的内容标签三的内容标签二的内容在源码的tabs文件下有三个文件:ind…

paypal创建订单后怎么获得id_5步创建Facebook商店(最新版教程)学习如何在Facebook上卖货...

请按照以下步骤设置Facebook商店:步骤1:转到您的Facebook页面并配置Shop页面。步骤2:设置您的商店详细信息。步骤3:配置付款。步骤4:将产品添加到您的Facebook商店。步骤5:管理您的订单。步骤6:…

创建索引名称已由现有对象使用_Excel编程周末速成班第3课:Excel对象模型

学习Excel技术,关注微信公众号:excelperfect导语:为了帮助想要快速学会Excel VBA的朋友,特以《Excel Programming Weekend Crash Course》这本书为基础,开始整理一系列资料,在完美Excel社群上分享。一共有3…

socket用起始码分割_常用条码Code128码及EAN13码的介绍

在条码打印软件中,设计条码标签时经常用到的码制是Code128码和EAN13码。因为这两种码制比较贴近我们的生活。比如:我们去超市购物,商品上贴的条码标签都是EAN13码,因为EAN13码多用于零售产品包装。而code128码多用于工厂产线&…

matlab在c盘有缓存文件夹吗,win10如何清除C盘缓存文件-win10清除C盘缓存的方法 - 河东软件园...

在电脑的使用过程中我们会发现磁盘的容量会不断减小,更多时候其他磁盘还有很多剩余空间,C盘的存储空间就已经达到极限了。其实在系统的升级和软件的下载中很多文件就会默认储存在C盘,时间久了以后,C盘不仅有软件文件,还…

java方法体逻辑不会写怎么办,想自己写框架?不会写Java注解可不行

用注解一时爽,一直用一直爽Java后端开发进入spring全家桶时代后,开发一个微服务提供简单的增删改查接口跟玩泥巴似的非常简单,一顿操作猛如虎,回头一看代码加了一堆注解:Controller Autowired Value,面向注…

联想拯救者y7000加内存条_短测联想拯救者Y7000,到底值不值得买?

7000块左右的笔记本电脑有什么好选择,我相信这是很多人一直纠结的问题,虽然我用过的笔记本很多,但是这个价位的用的少之又少,但随着十代酷睿标压处理器的大量铺货,联想拯救者Y7000进入了我的视线。那到底值不值得买呢&…

求逆矩阵计算器_991CN的矩阵运算

昨天的推文中说了一道二端口的题目,虚线中的大二端口由两个小二端口级联而成,在计算过程中,我们需要使用到矩阵的运算,昨天讲题的时候说会说一下用计算器进行矩阵运算,今天我们就来说一下。其实在算电路的时候&#xf…

去快捷方式小箭头_电脑桌面如何快速去掉快捷方式小箭头

我们使用电脑时会发现在电脑桌面图标上经常有一个小箭头,这是什么原因呢?原来,这个小箭头只存在于快捷方式的软件图标上,就是说通过快捷方式创建的图标就有这个小箭头。那么如何去掉这些小箭头呢!不知道也没关系&#…

oracle 移动日志文件,Oracle数据库移动数据文件、日志文件和控制文件

1、关闭数据库SQL> shutdown immediate;2、移动数据文件和日志文件到新位置SQL> host mv /u01/olddata.dbf /u02/newdata.dbf;SQL> host mv /u01/oldredo.log /u02/newredo.log;3、以加载模式启动数据库SQL> startup mount;4、在数据库中更改数据文件和日志文件路径…

oracle查询sql时间ain,Oracle SQL 时间查询

一、在使用Oracle的to_date函数来做日期转换时,很多Java程序员也许会和我一样,直觉的采用ldquo;yyyy-MM-dd HH:mm:ssrdquo;的to_date()与24小时制表示法及mm分钟的显示:一、在使用Oracle的to_date函数来做日期转换时,很多Java程序…

室内定位算法_001:室内定位算法技术咨询服务工作室简介(更新)

点击蓝字关注我们团队成员:何博士(中国科学院大学,博士) 杨博士(加拿大多伦多大学,博士后)微信联系号:UWB_cwhe服务模式:1. 为企业提供定位算法技术支持与指导,以提高企业室内定位系统产品的稳定…