容器中apscheduler不执行_APScheduler:定时任务框架

APScheduler:定时任务框架

安装

文档: https://apscheduler.readthedocs.io/en/stable/userguide.html

安装

$ pip install apscheduler
>>> import apscheduler
>>> apscheduler.version
'3.6.3'

组件

APScheduler由一下四部分组成

  • triggers:触发器,指定定时任务执行的时机,每个任务都有自己的触发器.
  • job stores:存储器,持久存储,默认存储在内存中.
  • executors:执行器,在定时任务执行时,以进程或线程方式执行
  • scheduler:调度器,包含BackgroundScheduler(后台运行)和BlockingScheduler(阻塞运行).他会合理安排作业存储器,执行器,触发器进行工作.并进行添加和删除任务等.调度器通常是只有一个的,开发人员很少直接操作触发器,存储器,执行器等.因为这些都由调度器自动来实现了.
5b1fb0c469726e3d0a8862ce1fd53d3a.png
10334

触发器(triggers)

1.date在特定时间执行

示例:

from datetime import date, datetime

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()

def my_job(text):
   print(text)

# run_date 接受 date, datetime 数据类型
sched.add_job(my_job, 'date', run_date=datetime(2020, 11, 27, 19, 33, 30), args=['text'])

sched.start()

更多:https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html

2.interval间隔执行

在固定的时间间隔后触发事件.参数如下:

weeks周,整形
days一个月中的第几天,整形
hours时,整形
minutes分,整形
seconds秒,整形
start_date起始时间
end_date结束时间
jitter触发的时间误差
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime


sched = BlockingScheduler()

def job_funciton():
   print('hello world')


sched.add_job(job_funciton, trigger='interval',seconds=5)
# 指定小时
# sched.add_job(job_funciton, trigger='interval', hours=2)
# 指定开始,结束时间
# sched.add_job(job_funciton, trigger='interval', start_date='2020-11-27 20:30:00', end_date='2010-11-27 21:30:00)
sched.start() 

3.crontab

在某个确切的时间周期性触发事件.

yearmonth1-12day1-31
week1-53day_of_week一周中的第几天(0/Monday)hour0-23
minute0-59second0-59start_datedatetime数据类型,或者字符串类型
end_date结束时间timezone时区jitter触发的误差时间

也可以用表达式类型,可以用以下方式:

表达式字段描述
*任何在每个值都触发
*/a任何每隔 a触发一次
a-b任何在 a-b区间内任何一个时间触发( a必须小于 b)
a-b/c任何在 a-b区间内每隔 c触发一次
xth yday第 x个星期 y触发
lastxday最后一个星期 x触发
lastday一个月中的最后一天触发
x,y,z任何可以把上面的表达式进行组合
from apscheduler.schedulers.blocking import BlockingScheduler


def job_function():
   print "Hello World"

sched = BlockingScheduler()

# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

sched.start()

调度器(schedulers)

  1. BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用 start函数会阻塞当前线程,不能立即返回。
  2. BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start后主线程不会阻塞。
  3. AsyncIOScheduler:适用于使用了 asyncio模块的应用程序。
  4. GeventScheduler:适用于使用 gevent模块的应用程序。
  5. TwistedScheduler:适用于构建 Twisted的应用程序。
  6. QtScheduler:适用于构建 Qt的应用程序。
  7. TornadoScheduler: tornado

任务存储器(job stores)

有2中方式,一种是加载在内存中(默认配置),一种是使用数据库.使用内存简单高效,但是程序出现问题,从新运行,会把以前的任务从新再执行一次.数据库则可以在中断的地方恢复正常使用.

  • MemoryJobStore:使用内存
  • MongoDBJobStore:使用mongodb
  • RedisJobStore:使用redis
  • SQLAlchemy:使用SQLAlchemy框架

1.RedisJobStore

RedisJobStore(db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times', pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args)

有2种创建的方法:

  • add_jobstore:需要指定redis的相关参数.
from datetime import datetime, timedelta
import sys
import os

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore



def alarm(time):
    print('Alarm! This alarm was scheduled at %s.' % time)


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times', host='192.168.0.101', port=6379, db=0)
    if len(sys.argv) > 1 and sys.argv[1] == '--clear':
        scheduler.remove_all_jobs()

    alarm_time = datetime.now() + timedelta(seconds=300)
    scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
    print('To clear the alarms, run this example with the --clear argument.')
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
  • RedisJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
from datetime import datetime, timedelta

jobstore = {
    'default' : RedisJobStore(db=0, jobs_key='myfunc', run_times_key='myfunc_time', host='192.168.0.101', port=6379)
}

def my_func(t):
    print('hello world, %s' %t)


if __name__ == '__main__':
    sched = BlockingScheduler(jobstores=jobstore)
    alarm_time = datetime.now() + timedelta(seconds=300)
    sched.add_job(my_func,run_date=alarm_time, args=['ning'])
    sched.start()

均可在redis中查询到数据.

79962117df8c769278bfb582f072d93d.png

2.SQLAlchemy

使用ORM框架,演示使用MySql

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta

def my_func(t):
   print('hello %s' %t)

if __name__ == "__main__":
   sched = BlockingScheduler()
   url = 'mysql+pymysql://root:2008.Cn123@192.168.0.101:3306/test'
   sched.add_jobstore('sqlalchemy', url=url,tablename='api_job')

   alarm_time = datetime.now() + timedelta(seconds=300)
   sched.add_job(my_func,run_date=alarm_time, args=['ning'])
   sched.start()

如果表不存在,会自动创建表.tablename用于指定表的名称.

在数据库中可以查看到表

select * from api_job;
5a7f5459de518a1986ff343de0909352.png
10336

执行器executors

执行器取决于应用场景,默认是ThreadPoolExecutor,它可以满足大部分需求.如果是CPU密集型计算,可以选择ProcessPoolExecutor

class apscheduler.executors.pool.ThreadPoolExecutor(max_workers=10)class apscheduler.executors.pool.ProcessPoolExecutor(max_workers=10)
# max_worker 指定最多使用线程/进程
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

executors = {
   'default': ThreadPoolExecutor(20),
}
conf = { # redis配置
   "host":127.0.0.1,
   "port":6379,
   "db":15, # 连接15号数据库
   "max_connections":10 # redis最大支持300个连接数
}
scheduler = BackgroundScheduler(executors=executors)
scheduler.add_jobstore(jobstore='redis', **conf) # 添加任务持久化存储方式,如果未安装redis可省略此步骤

任务操作

  • add_job(func, id='xxx', args=None, kwargs=None)添加任务
# 添加任务func, func参数可以使用 '可导入模块:可调用对象'的方式引入,即可用模块来引入
#❯ tree -L 2
#├── func
#│   ├── add_func.py
#│   ├── __init__.py
#└── jobs.py
# add_func.py
def add(x,y):
    print(x+y)
    
# jobs.py
from apscheduler.schedulers.blocking import BlockingScheduler
from func.add_func import add

shced = BlockingScheduler()


if __name__ == "__main__":
    shced.add_job('func.add_func:add', args=[1,2], id='job1')
    shced.start()   

除去使用add_job(),还可以使用装饰器函数scheduled_job来添加任务.

  • remove_job(job_id):删除任务,需要指定job_id
  • pause_job(job_id):暂停任务
  • resume_job(job_id):恢复任务
  • modify_job(job_id, **changes):修改任务属性
  • print_jobs():作业信息
# 方法1
job = scheduler.add_job(myfunc, 'interval', minutes=2)  # 添加任务
job.remove()  # 删除任务
job.pause() # 暂定任务
job.resume()  # 恢复任务

# 方法2
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  # 添加任务    
scheduler.remove_job('my_job_id')  # 删除任务
scheduler.pause_job('my_job_id')  # 暂定任务
scheduler.resume_job('my_job_id')  # 恢复任务

示例

方法1

from pytz import utc
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
def tick():
   print('Tick! The time is: %s' % datetime.now())
# 选择MongoDB作为任务存储数据库
jobstores = {
   'mongo': MongoDBJobStore(),
   'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 默认使用线程池
executors = {
   'default': ThreadPoolExecutor(20),
   'processpool': ProcessPoolExecutor(5)
}
# 默认参数配置
job_defaults = {
   'coalesce': False,  # 积攒的任务是否只跑一次,是否合并所有错过的Job
   'max_instances': 3,  # 默认同一时刻只能有一个实例运行,通过max_instances=3修改为3个。
   'misfire_grace_time': 30  # 30秒的任务超时容错
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()

方法2

from apscheduler.schedulers.background import BackgroundScheduler
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
   'apscheduler.jobstores.mongo': {
        'type': 'mongodb'
   },
   'apscheduler.jobstores.default': {
       'type': 'sqlalchemy',
       'url': 'sqlite:///jobs.sqlite'
   },
   'apscheduler.executors.default': {
       'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
       'max_workers': '20'
   },
   'apscheduler.executors.processpool': {
       'type': 'processpool',
       'max_workers': '5'
   },
   'apscheduler.job_defaults.coalesce': 'false',
   'apscheduler.job_defaults.max_instances': '3',
   'apscheduler.timezone': 'UTC',
})

方法3

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
   'mongo': {'type': 'mongodb'},
   'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
   'default': {'type': 'threadpool', 'max_workers': 20},
   'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
   'coalesce': False,
   'max_instances': 3
}
scheduler = BackgroundScheduler()
# ..这里可以添加任务
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

misfire_grace_time:如果一个job本来14:00有一次执行,但是由于某种原因没有被调度上,现在14:01了,这个14:00的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的30秒限制,那么这个运行实例不会被执行。合并:最常见的情形是scheduler被shutdown后重启,某个任务会积攒了好几次没执行如5次,下次这个job被submit给executor时,执行5次。将coalesce=True后,只会执行一次

replace_existing: 如果在程序初始化时,是从数据库读取任务的,那么必须为每个任务定义一个明确的ID,并且使用replace_existing=True,否则每次重启程序,你都会得到一份新的任务拷贝,也就意味着任务的状态不会保存。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/533578.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nginx php image,[Docker]应该把 nginx 和 PHP 放在一个 image 里还是分开?

因为老板想搞 K8S,但是我连 Docker 都不懂,就觉得还是要学一点点 Docker 的,之前还是看了一点点的,甚至折腾过一个开发环境的方案,但是,很长时间不弄了以后,就全都还回去了。这次我又想自己搭建…

python pep8模块_读懂PEP8,让你的Python代码更加优雅

PEP8《8 号 Python 增强规范》(Python Enhacement Proposal #8),简称PEP8通俗的来讲 PEP8 是针对 python 代码格式而编订的风格指南,令代码更加易读易懂。像谷歌这样的大公司是有自己内部的风格规范Google Style,目的就是为了提高开发效率。据…

python数值模拟教程_数值模拟必备random模块

该模块实现了各种分布的伪随机数生成器。可以在区间内抽取一个随机数,可以在列表中抽取一个元素,可以从分布中抽取样本 。random模块不能直接访问,需要导入 random 模块,然后通过 random 静态对象调用该方法。import random1 生成…

php版本哪个没有面向对象,php面向对象的方法重载两种版本比较

多个函数用同一个名字,但参数表,即参数的个数或(和)数据类型可以不同,调用的时候,虽然方法名字相同,但根据参数表可以自动调用对应的函数。PHP4 中仅仅实现了面向对象的部分的、简单的功能,而 PHP5 以后对对…

python实现录音小程序 界面_小程序如何实现录音 播放功能

第二步:编辑文件首先在src下创建一个test包并在test包下新建一个类MyRecord具体步骤代码如下所示:package test;import java.awt.*;import javax.swing.*;import java.awt.event.*;import java.io.*;import javax.sound.sampled.*;public class MyRecord…

织梦php网站修改教程,织梦DEDEcms织梦软件模型增加图集功能教程(含修改文件下载)...

这篇文章主要为大家详细介绍了织梦DEDEcms织梦软件模型增加图集功能教程(含修改文件下载),具有一定的参考价值,感兴趣的小伙伴们可以参考一下,有需要的朋友可以收藏方便以后借鉴。织梦DEDEcms织梦软件模型增加图集功能,这是今天361模板要给大家分享的。下…

python自动截图发送邮件_PhantomJS按尺寸截取页面,并用python发送邮件

前言:当前有个任务是要把几个网站的日志返回状态码进行汇总,用饼图展示,并每天发送邮件。一、分析问题画出饼图,这个我用kibana给画出来了,下面不做讲解;截取饼图,因为kibana是用js展示出来的&a…

nikita popov php,PHP中对performance的考虑点

Nikita Popov 在他的演讲中谈了几个PHP 程序中和performance相关的point。1.PHP使用shared memory, preload的方式事先分配,而只有在所有的处理结束之后,share memory 才会断开和所有进程或者thread之间的联系。光是opcode,FPM的设定还不足以…

python建模仿真 matlab_清华大学出版社-图书详情-《仿真建模与MATLAB实用教程》

MATLAB语言是目前世界上最为流行的科学计算语言之一,它的特点是能够快速地完成诸如矩阵运算、微分、寻优等计算任务。由于它配备了很多应用领域的专业工具箱,诸如金融、信号处理、图像处理、神经网络、嵌入式系统、仿真建模等,而且每个工具箱都包含了该应…

java web使用jquery,JAVA_Web_JQuery

简介:jquery 全称 javaScript Query.是js的一个框架。本质上仍然是js。特点:支持各种主流的浏览器、使用特别简单、拥有便捷的插件扩展机制和丰富的插件。一、JQuery内部封装原理介绍:匿名闭包。下面这两行代码是jquery包下的已经封装的代码&…

python语法学习_Python学习1——语法

Python语法包括了行、缩进、注释、标识符、保留关键字等方面。打印语句:>>> print(hello,world!)hello,world!输入语句:>>> input(请输入你的名字:)请输入你的名字:哈哈#”哈哈”是你自己输入的名字哈哈 #打印出…

java 两个页面传递数据,请问Cookie怎么在两个页面间传递数据?

参考代码如下://如果请求的Cookie对象为空if (Request.Cookies["userCookie"] null){//创建一个Cookie对象HttpCookie userCookie new HttpCookie("userCookie");//给对象赋值userCookie.Values["userName"] userInfo.UserName.ToS…

优化matlab作业,现代设计优化算法MATLAB实现

开篇语前阵子做现代设计方法的时候,发现网上很是缺乏这种作业形式的简易算法实现,所以特地来简书写一篇。有两份,一份是我的(说来惭愧,我的大部分都是在网上找的代码,然后在自己的电脑上跑一次,跑出来了就行…

怎样用python画玫瑰花的简笔画_玫瑰花简笔画素描作品图片

玫瑰原产是中国。在古时的汉语,“玫瑰”一词原意是指红色美玉。玫瑰花这么漂亮,素描怎么画得好看呢?你知道玫瑰花的简笔画素描是怎样的吗?今天先和学习啦小编一起欣赏这些玫瑰花简笔画素描图片,希望你会有所收获的。玫瑰花简笔画素描图片欣…

多因子选选股MATLAB代码,金工研报:利用卷积神经网络进行多因子选股

首先,我们先来看一下通过卷积神经网络选股模型的整体流程,然后再根据每一步流程进行介绍,具体如下图所示:1、数据获取用于历史回测数据来自所有A股股票,其中剔除了ST股以及上市3个月的股票,另外&#xff0c…

python list tuple 打包 解包_python的打包与解包

python的*与**,在函数的定义与调用过程中,有着不同的作用打包参数:一、函数定义时,形参前加*号(如:*args):收集实参中所有的位置参数,打包成新元组并将该元组赋值给args变量实参位置参数&#x…

python 成员函数 泛型函数_【一点资讯】白学这么多年 Python?连泛型函数都不会写? www.yidianzixun.com...

泛型,如果你尝过java,应该对他不陌生吧。但你可能不知道在 Python 中(3.4 ),也可以实现 简单的泛型函数。在Python中只能实现基于单个(第一个)参数的数据类型来选择具体的实现方式,官方名称 是single-dispatch。你或许听不懂&…

matlab bad apple,【bad apple】matlab制作矩阵苹果~

有屏幕的地方就有bad apple那么作为一名工科生,熟练的操♂作马桶萝卜(matlab)是一项基本技能下面开始讲解如何用matlab制作别具一格的“矩阵苹果”~实验环境matlab R2018a原版bad apple视频技术要求可以即时演算图形可以将处理后的每帧图形合并成新的视频先上代码%t…

服务器ip直接访问php怎么写,php - 如何实现用公网ip访问到服务器上的网页?

服务器系统是Windows Server 2012 R2,已经部署了IIS、PHP和MySQL,能够在云服务器上通过localhost打开php网页,(放在服务器wwwroot上的index.php)已在ISS管理器中添加网站,但编辑网站绑定时,在ip地址中填入了服务器的公…

vb6 打印选项对话框_图纸打印次数太多,不知道哪次才是最新的?用打印戳记区分效果好...

原创:就说我在开发区使用AutoCAD从事设计工作的朋友们不知道有没遇到过这种情况:图纸在反复修改打印的过程中,由于图纸内容高度相似,往往搞不清究竟哪个才是最新版本的图纸了。这种情况下,细致入微地去核对非常麻烦&am…