Python APSchedule安装使用与源码分析

我们的项目中用apschedule作为核心定时调度模块。所以对apschedule进行了一些调查和源码级的分析。

 

1、为什么选择apschedule?

听信了一句话,apschedule之于python就像是quartz之于java。实际用起来还是不错的。

 

2、安装

# pip安装方式
$ pip install apscheduler
# 源码编译方式
$ wget https://pypi.python.org/pypi/APScheduler/#downloads
$ python setup.py install

 

3、apschedule有四个主要的组件

1)trigger - 触发器

2)job stores - 任务存储(内存memory和持久化persistence)

3)executor - 执行器(实现是基于concurrent.futures的线程池或者进程池)

4)schedulers - 调度器(控制着其他的组件,最常用的是background方式和blocking方式)

先上一个例子

复制代码
# -*- coding:utf-8 -*-
import redis
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED
class ScheduleFactory(object):def __init__(self):if not hasattr(ScheduleFactory, '__scheduler'):__scheduler = ScheduleFactory.get_instance()self.scheduler = __scheduler@staticmethoddef get_instance():pool = redis.ConnectionPool(host='10.94.99.56',port=6379,)r = redis.StrictRedis(connection_pool=pool)jobstores = {'redis': RedisJobStore(2, r),'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}executors = {'default': ThreadPoolExecutor(max_workers=30),'processpool': ProcessPoolExecutor(max_workers=30)}job_defaults = {'coalesce': False,'max_instances': 3}scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)
return scheduler
复制代码

说明:上例中,scheduleFactory被实现为一个单例模式,保证new出的对象全局唯一

 

4、对scheduler的选择

这里只给出两个场景:

1)BackgroundScheduler:这种方式在创建scheduler的父进程退出后,任务同时停止调度。适用范围:集成在服务中,例如django。

2)BlockingScheduler:这种方式会阻塞住创建shceduler的进程,适用范围:该程序只干调度这一件事情。

选择完调度器之后

1)scheduler.start() 启动调度器

2)scheduler.shutdown() 停止调度器,调用该方法,调度器等到所有执行中的任务执行完成再退出,可以使用wait=False禁用

程序变为如下样子

复制代码
class ScheduleFactory(object):def __init__(self):if not hasattr(ScheduleFactory, '__scheduler'):__scheduler = ScheduleFactory.get_instance()self.scheduler = __scheduler@staticmethoddef get_instance():pool = redis.ConnectionPool(host='10.94.99.56',port=6379,)r = redis.StrictRedis(connection_pool=pool)jobstores = {'redis': RedisJobStore(2, r),'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}executors = {'default': ThreadPoolExecutor(max_workers=30),'processpool': ProcessPoolExecutor(max_workers=30)}job_defaults = {'coalesce': False,'max_instances': 3}scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)# scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)return schedulerdef start(self):self.scheduler.start()def shutdown(self):self.scheduler.shutdown()
复制代码

 

5、对jobstores的选择

大的方向有两个:

1)非持久化

可选的stores:MemoryJobStrore

适用于你不会频繁启动和关闭调度器,而且对定时任务丢失批次不敏感。

2)持久化

可选的stores:SQLAlchemyJobStore, RedisJobStore,MongoDBJobStore,ZooKeeperJobStore

适用于你对定时任务丢失批次敏感的情况

jobStores初始化配置的方式是使用一个字典,例如

jobstores = {'redis': RedisJobStore(2, r),'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}

key是你配置store的名字,后面在添加任务的使用,可以指定对应的任务使用对应的store,例如这里选用的都是key=default的store。

def add_job(self, job_func, interval, id, job_func_params=None)self.scheduler.add_job(job_func, jobstore='default', trigger='interval', seconds=interval, id=id, kwargs=job_func_params, executor='default', next_run_time=next_run_time, misfire_grace_time=30)

 

6、executor的选择

只说两个,线程池和进程池。默认default是线程池方式。这个数是执行任务的实际并发数,如果你设置的小了而job添加的比较多,可能出现丢失调度的情况。

同时对于python多线程场景,如果是计算密集型任务,实际的并发度达不到配置的数量。所以这个数字要根据具体的要求设置。

一般来说我们设置并发为30,对一般的场景是没有问题的。

executors = {'default': ThreadPoolExecutor(max_workers=30),'processpool': ProcessPoolExecutor(max_workers=30)}

同样在add_job的时候,我们可以选择对应的执行器

def add_job(self, job_func, interval, id, job_func_params=None)self.scheduler.add_job(job_func, jobstore='default', trigger='interval', seconds=interval, id=id, kwargs=job_func_params, executor='default', next_run_time=next_run_time, misfire_grace_time=30)

 

7、trigger的选择

这是最简单的一个了,有三种,不用配置

1、date - 每天的固定时间

2、interval - 间隔多长时间执行

3、cron - 正则

 

8、job的增删改查接口api可以参看手册

http://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s

 

9、问题fix

1)2017-07-24 14:06:28,480 [apscheduler.executors.default:120] [WARNING]- Run time of job "etl_func (trigger: interval[0:01:00], next run at: 2017-07-24 14:07:27 CST)" was missed by 0:00:01.245424

这个问题对应的源码片段是

复制代码
def run_job(job, jobstore_alias, run_times, logger_name):"""Called by executors to run the job. Returns a list of scheduler events to be dispatched by thescheduler."""events = []logger = logging.getLogger(logger_name)for run_time in run_times:# See if the job missed its run time window, and handle# possible misfires accordinglyif job.misfire_grace_time is not None:difference = datetime.now(utc) - run_timegrace_time = timedelta(seconds=job.misfire_grace_time)if difference > grace_time:events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias,run_time))logger.warning('Run time of job "%s" was missed by %s', job, difference)continuelogger.info('Running job "%s" (scheduled at %s)', job, run_time)try:retval = job.func(*job.args, **job.kwargs)except:exc, tb = sys.exc_info()[1:]formatted_tb = ''.join(format_tb(tb))events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,exception=exc, traceback=formatted_tb))logger.exception('Job "%s" raised an exception', job)else:events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,retval=retval))logger.info('Job "%s" executed successfully', job)return events
复制代码

这里面有个参数是misfire_grace_time,默认是1s,如果任务的实际执行时间与任务调度时间的时间差>misfire_grace_time,就会warning并且跳过这次任务的调度!!!

为什么会发生这个问题?

1)executor并发度不够,你添加的任务太多

2) misfire_grace_time,还是太小了

 

2)如果你使用的trigger=interval,并且设置了misfire_grace_time=30这种的话,如果你首次启动的时间是10:50那么调度间隔和实际执行可能有1分钟的误差

怎么解决这个问题呢,你可以通过next_run_time设置首次调度的时间,让这个时间取整分钟。例如

def add_job(self, job_func, interval, id, job_func_params=None):next_minute = (datetime.now() + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")next_run_time = datetime.strptime(next_minute, "%Y-%m-%d %H:%M")self.scheduler.add_job(job_func, jobstore='default', trigger='interval', seconds=interval, id=id, kwargs=job_func_params, executor='default', next_run_time=next_run_time, misfire_grace_time=30)

 

3)2017-07-25 11:02:00,003 [apscheduler.scheduler:962] [WARNING]- Execution of job "rule_func (trigger: interval[0:01:00], next run at: 2017-07-25 11:02:00 CST)" skipped: maximum number of running instances reached (1)

对应的源码为

复制代码
         for job in due_jobs:# Look up the job's executortry:executor = self._lookup_executor(job.executor)except:self._logger.error('Executor lookup ("%s") failed for job "%s" -- removing it from the ''job store', job.executor, job)self.remove_job(job.id, jobstore_alias)continuerun_times = job._get_run_times(now)run_times = run_times[-1:] if run_times and job.coalesce else run_timesif run_times:try:executor.submit_job(job, run_times)except MaxInstancesReachedError:self._logger.warning('Execution of job "%s" skipped: maximum number of running ''instances reached (%d)', job, job.max_instances)event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,jobstore_alias, run_times)events.append(event)
复制代码

submit_job的源码

    with self._lock:if self._instances[job.id] >= job.max_instances:raise MaxInstancesReachedError(job)self._do_submit_job(job, run_times)self._instances[job.id] += 1
 

这是什么意思呢,当对一个job的一次调度的任务数>max_instances,会触发这个异常,并终止调度。例如对一个批次的调度,比如job1,在10:00这次的调度,执行的时候发现有两个任务被添加了。这怎么会发生呢?会。可能09:59分的调度没有成功执行,但是持久化了下来,那么在10:00会尝试再次执行。

max_instances默认是1,如果想让这种异常放过的话,你可以设置max_instances大一些,比如max_instances=3

 

10、如果你想监控你的调度,那么apschedule提供了listener机制,可以监听一些异常。只需要注册监听者就好

复制代码
  def add_err_listener(self):self.scheduler.add_listener(err_listener, EVENT_JOB_MAX_INSTANCES|EVENT_JOB_MISSED|EVENT_JOB_ERROR)def err_listener(ev):msg = ''if ev.code == EVENT_JOB_ERROR:msg = ev.tracebackelif ev.code == EVENT_JOB_MISSED:msg = 'missed job, job_id:%s, schedule_run_time:%s' % (ev.job_id, ev.scheduled_run_time)elif ev.code == EVENT_JOB_MAX_INSTANCES:msg = 'reached maximum of running instances, job_id:%s' %(ev.job_id)rs = RobotSender()rs.send("https://oapi.dingtalk.com/robot/send?access_token=499ca69a2b45402c00503acea611a6ae6a2f1bacb0ca4d33365595d768bb2a58",u"[apscheduler调度异常] 异常信息:%s" % (msg),'15210885002',False)
复制代码

 

最后的代码

复制代码
# -*- coding:utf-8 -*-
import redis
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler, BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED
from alarmkits.send_robot import RobotSenderclass ScheduleFactory(object):def __init__(self):if not hasattr(ScheduleFactory, '__scheduler'):__scheduler = ScheduleFactory.get_instance()self.scheduler = __scheduler@staticmethoddef get_instance():pool = redis.ConnectionPool(host='10.94.99.56',port=6379,)r = redis.StrictRedis(connection_pool=pool)jobstores = {'redis': RedisJobStore(2, r),'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}executors = {'default': ThreadPoolExecutor(max_workers=30),'processpool': ProcessPoolExecutor(max_workers=30)}job_defaults = {'coalesce': False,'max_instances': 3}scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)# scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)return schedulerdef start(self):self.scheduler.start()def shutdown(self):self.scheduler.shutdown()def add_job(self, job_func, interval, id, job_func_params=None):next_minute = (datetime.now() + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")next_run_time = datetime.strptime(next_minute, "%Y-%m-%d %H:%M")self.scheduler.add_job(job_func,jobstore='default',trigger='interval',seconds=interval,id=id,kwargs=job_func_params,executor='default',next_run_time=next_run_time,misfire_grace_time=30,max_instances=3)def remove_job(self, id):self.scheduler.remove_job(id)def modify_job(self, id, interval):self.scheduler.modify_job(job_id=id, seconds=interval)def add_err_listener(self):self.scheduler.add_listener(err_listener, EVENT_JOB_MAX_INSTANCES|EVENT_JOB_MISSED|EVENT_JOB_ERROR)def err_listener(ev):msg = ''if ev.code == EVENT_JOB_ERROR:msg = ev.tracebackelif ev.code == EVENT_JOB_MISSED:msg = 'missed job, job_id:%s, schedule_run_time:%s' % (ev.job_id, ev.scheduled_run_time)elif ev.code == EVENT_JOB_MAX_INSTANCES:msg = 'reached maximum of running instances, job_id:%s' %(ev.job_id)rs = RobotSender()rs.send("https://oapi.dingtalk.com/robot/send?access_token=499ca69a2b45402c00503acea611a6ae6a2f1bacb0ca4d33365595d768bb2a58",u"[apscheduler调度异常] 异常信息:%s" % (msg),'15210885002',False)
复制代码

 

 

 

        

转载于:https://www.cnblogs.com/zhuminghui/p/9145319.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/366867.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NetBeans 7.4的本机Java打包

成为“ NetBeans 74 NewAndNoteworthy ”页面的NetBeans 7.4的新功能之一是“本机打包 ”,在该页面上被描述为“ JavaSE项目现在支持使用JavaFX提供的本机打包技术来创建本机包。 ” 我将使用一个非常简单的示例来演示NetBeans 7.4中的本机打包功能。 下一个代码清…

基于Vue开发一个日历组件

最近在做一个类似课程表的需求,需要自制一个日历来支持功能及展现,就顺便研究一下应该怎么开发日历组件。 更新 2.23修复了2026年2月份会渲染多一行的bug,谢谢深蓝一人童鞋提出的bug,解决方案是给二月份的日历做特殊处理&#xf…

php 打开word乱码怎么办,如何解决php word 乱码问题

php word乱码的解决办法:首先打开“/Writer/Word2007/Base.php”文件;然后添加“$objWriter->writeAttribute(‘w:eastAsia’, $font)”内容;最后保存修改即可。PHPword解决中文乱码一、增加东亚字体支持打开并编辑路径/Writer/Word2007/B…

Java开发人员访谈的MindMap

多年来,我曾在许多Java开发人员访谈中担任小组成员。 之前,我曾写过一篇标题为“成功进行软件工程师技术面试的7大技巧”的文章,其中涵盖了很少的一般准则。 在本文中,我将分享一个思维导图,其中包含Java开发人员访谈中…

送给大家一个好看的简历神器

很多人看到里边有好看的东西就习惯性的点进来看看,还一边点一边想 —— 好看的简历我见多了,你这个又能好看到哪里去。我想差不多可以: 哪里吧因为最近有在准备简历,就习惯性的找一找有没有现成的简历模板。结果全是付费的&#x…

PHP简单实现单点登录功能示例

1.准备两个虚拟域名 127.0.0.1 www.openpoor.com127.0.0.1 www.myspace.com 2.在openpoor的根目录下创建以下文件 index.PHP 123456789101112131415161718<?phpsession_start();?><!DOCTYPE html><html><head><meta charset"UTF-8"/&…

JUNG 计算图属性,中心度,偏心率,直径,半径

本文介绍利用Java的第三方API JUNG 计算图中&#xff1a; closeness centrality&#xff1b;// 图中某节点的 接近中心性/亲密中心性 betweenness centrality&#xff1b;// 图中某节点的 中介中心性/介数中心性 distance; // 图中两节点的最短距离 eccentricity; // 图中某节…

Java VM –提防YoungGen空间

您可能从我们以前的面向性能的文章中看到&#xff0c;健康的JVM是实现最佳应用程序性能和稳定性的最重要目标之一。 这样的健康评估通常仅关注主要收集的频率&#xff08;避免&#xff09;或检测内存泄漏的存在。 年轻一代空间或短寿命物体的大小和足迹如何&#xff1f; 本文…

小程序绘图工具painter-json文件绘制保存分享图-可点击任意元素触发函数

Painter是由酷家乐移动前端团队打造的一款小程序绘图组件。 原项目地址&#xff1a;https://github.com/Kujiale-Mobile/Painter 新版地址&#xff1a;https://github.com/shesw/Painter 这款交互版原来是为了针对业务中的新需求而由我自己开发的&#xff0c;后来需求改动&a…

4 张动图解释为什么(什么时候)使用 Redux

dev-reading/fe 是一个阅读、导读、速读的 repo&#xff0c;不要依赖于 dev-reading/fe 学习知识。本 repo 只是一个快速了解文章内容的工具&#xff0c;并不提供全文解读和翻译。你可以通过本平台快速了解文章里面的内容&#xff0c;找到感兴趣的文章&#xff0c;然后去阅读全…

您正在使用什么垃圾收集器?

我们的研究实验室正全速前进。 随着最近的资金注入 &#xff0c;我们只能保证我们不断创新的步伐只会加快。 我们进行的部分研究与GC优化有关。 在处理这个有趣领域中的问题时&#xff0c;我们认为可以分享一些有关GC算法使用的见解。 为此&#xff0c;我们对使用特定GC算法的…

前端布局推进剂 - 间距规范化

我是一个爱折腾设计的前端&#xff0c;一直都在标榜自己的页面还原是多么的牛 X 。怎么做到页面还原&#xff1f;我有一个最笨但是有效的方法&#xff0c;就是把设计稿直接存成图片&#xff0c;作为背景图然后临摹着设计稿进行开发。我觉得自己太有才了。像素级还原有没有&…

echarts折线图相关

optionJKDLine {  title: {text: 告警数量趋势图,textStyle:{  //标题样式fontStyle:normal,fontFamily:sans-serif,fontSize:12    }},tooltip: {trigger: axis},legend: {  //图例,默认显示},grid: {  //图表距离left: -3%,right: 5%,bottom: 3%,top:20%,contai…

一个关于fixed抖动的小bug

前言 大家都知道position: fixed用于生成绝对定位的元素&#xff0c;相对于浏览器窗口进行定位。 元素的位置通过 "left", "top", "right" 以及 "bottom" 属性进行规定。 突然发现自己之前写的网页有个小bug&#xff1a;在购买页面的…

腾讯Node.js基础设施TSW正式开源

经过六年的迭代与沉淀&#xff0c;腾讯Tencent Server Web (以下简称TSW)这一公司级运维组件于今日正式开源。TSW是面向WEB前端开发者&#xff0c;以提升问题定位效率为初衷&#xff0c;提供云抓包、全息日志和异常发现的Node.js基础设施。TSW每天为百亿次请求提供稳定服务&…

ORM框架greenDao 2 (用于了解旧版本的使用方法,目前最新版本为3.2.2,使用注释的方式来生成)...

摘要&#xff1a; Android中对SQLite数据库使用&#xff0c;是一件非常频繁的事情。现今&#xff0c;也有非常多的SQLite处理的开源框架&#xff0c;其中最著名的greenDao&#xff0c;它以占用资源少&#xff0c;处理效率高等特点&#xff0c;成为优秀的ORM框架之一。那么对于g…

配置MySQL以进行ADF开发

大家好。 今天&#xff0c;我将向您展示如何为Oracle ADF开发配置MySQL数据库。 恕我直言&#xff0c;当您将ADF与其他数据库而不是Oracle DB一起使用时&#xff0c;您将无法使用Oracle ADF的全部功能&#xff0c;有时您会发现自己正在寻找解决方法&#xff0c;以实现某些行为…

React Native面试知识点

本文原创首发于公众号&#xff1a;ReactNative开发圈&#xff0c;转载需注明出处。 本文会不定期不断更新&#xff0c;想查看最新版本请移步至https://github.com/forrest23/react-native-interview 1.React Native相对于原生的ios和Android有哪些优势&#xff1f; 1.性能媲美…

KIE-WB / JBPM控制台Ng –配置

大家好&#xff0c;这是我上一篇文章中有关如何使用jBPM Console的后续文章 。 这篇文章的主要思想是描述为了在您自己的公司中使用它&#xff0c;您需要对jBPM Console NG进行一些最常见的配置。 但是在讨论技术细节之前&#xff0c;我们将介绍KIE Workbench&#xff08;KIE-W…

自己写一个H5项目CI系统

持续集成&#xff08;Continuous integration&#xff0c;简称CI)系统在软件自动化构建&#xff08;包括编译、发布、自动化测试&#xff09;方面有着重要的作用&#xff0c;在之前&#xff0c;前端项目简单&#xff0c;很多时候发布都只是一些简单的拷贝&#xff0c;而随着web…