Python解决多个进程服务重复运行定时任务的问题

记录多实例服务定时任务出现运行多次的问题

问题:web项目运行多个实例时,定时任务会被执行多次的问题

举例来说
我使用库APScheduler排定了一个定时任务taskA在每天的晚上9点需要执行一次,我的web服务使用分布式运行了8个实例,于是在每天晚上的9点,我的8个服务都会接收到APScheduler发布的这个定时任务,然后每个服务都会执行一次这个定时任务,这与我预期的只执行1次是不符的,我想要解决这个问题,让定时任务只被执行一次

设想的解决方式:
使用redis分布式锁的方式(redis.setnx(key, value))让实例只能运行一次定时任务,从而解决定时任务会被执行多次的问题

setnx(key, value)方法很简单,就是当key不存在时,setnx会成功,否则会失败

    def setnx(self, name: KeyT, value: EncodableT) -> Awaitable:"""Set the value of key ``name`` to ``value`` if key doesn't exist"""return self.execute_command("SETNX", name, value)

实现方案

  1. setnx(key, value)方法会在key不存在时设置value,当多个线程同时接到排期准备运行同一个任务时,只有第一个线程setnx会成功(返回True),于是第一个setnx成功的线程运行了定时任务,其他线程在setnx时由于key已经存在会失败(返回False),从而让它们跳过定时任务的执行

  2. 仍然存在的问题:定时任务一般会执行多次,在其下一次执行时,setnx相同key的这条记录应该被删除掉,因为这是一次新的任务,否则之后的任务执行都会因setnx时key已存在而失败导致任务无法执行

    i. 第一种方案:在setnx成功的线程1任务执行完成后删除这个key在redis中存储的记录,从而让下一次任务第一次运行时又可以成功setnx(key, value)而执行
    但这种方案存在一定的风险:如果存在线程2因为一些原因阻塞了,在线程1执行完任务才开始接收到运行定时任务的指令,那么线程2会在key被删除后开始尝试setnx,那必然会成功,然后重复了运行任务

    ii. 基于第一种方案的考虑,确定了第二种方案,只需要给每次的定时任务添加唯一标识即可避免第一种方案的问题:设置此次任务运行的唯一key_x,在setnx成功的线程1任务执行完成之后不对这次定时任务的key_x执行删除
    此次定时任务唯一key_x的设置很容易想到的方案是在这次定时任务id上添加运行的排期时间,这样就可以让这一次的定时任务是唯一且可识别了,只要运行了一次其值就永久设置为True,不会在执行第二次(考虑到资源占用,实际应该设置一个较长的过期时间也完全可以避免方案1的风险)

设置有过期时间的方法应该使用redis.set(key, value, nx=True, ex=10)方法,这里nx=True表名使用命令SETNX,而ex=10则是过期的时间,单位为秒

第一种方案的可重复运行的小案例:

# -*- coding: utf-8 -*-
import asyncio
import timeimport aioredis
from aioredis import Redisloop = asyncio.get_event_loop()
redis_coro = aioredis.Redis()def redis_distributed_lock(cache_key, cache_value="locked"):def decorator(func):async def wrapper(*args, **kwargs):redis_instance = await redis_coro# 这里设置了10小时的过期时间,完全可以避免重复运行的风险了locked = await redis_instance.set(cache_key, cache_value, nx=True, ex=60 * 60 * 10)if locked:  # 第一个线程设置成功值会运行任务,否则不会运行任务print("success")return await func(*args, **kwargs)print(f"failed")return wrapperreturn decoratorasync def ntasks():t_time = ["9点", "10点", "11点"]   # 模拟任务在三个时间点被执行redis: Redis = await redis_corot_id = 1async def task_func(tid):print(f"{tid=}, executing...")return tid# 为了可重复运行这个示例,先执行删除之前设置的keyret = await redis.delete(str(t_id))for t_t in t_time:redis_key = f"{t_id}"    # 这里本来预期是直接放到函数头上装饰,但是不方便控制redis_key参数,所以使用了原始的方式装饰task_functask_f = redis_distributed_lock(redis_key)(task_func)# 假设启动分布式服务8个,会执行8次定时任务,这里创建了8个任务,按照先执行完先返回的顺序处理for f in asyncio.as_completed([task_f(redis_key) for _ in range(8)], loop=loop):res = await f# print(f"{res=}")print(f"=" * 80)time.sleep(5)  # 模拟一个定时任务在多个时间点执行,下一次执行时,时间参数(t_t)会发生变化# breakif __name__ == '__main__':try:loop.run_until_complete(ntasks())finally:loop.stop()loop.close()

运行结果

success
tid='1', executing...
failed
failed
failed
failed
failed
failed
failed
================================================================================
failed
failed
failed
failed
failed
failed
failed
failed
================================================================================
failed
failed
failed
failed
failed
failed

只有第一个时间点是按照预期执行,之后的时间点执行都总是失败,因为每个时间点的该任务设置的key都是一样的

第二种方案的可重复运行的小案例:

# -*- coding: utf-8 -*-
import asyncio
import timeimport aioredis
from aioredis import Redisloop = asyncio.get_event_loop()
redis_coro = aioredis.Redis()def redis_distributed_lock(cache_key, cache_value="locked"):def decorator(func):async def wrapper(*args, **kwargs):redis_instance = await redis_coro# 这里设置了10小时的过期时间,完全可以避免重复运行的风险了locked = await redis_instance.set(cache_key, cache_value, nx=True, ex=60 * 60 * 10)if locked:  # 第一个线程设置成功值会运行任务,否则不会运行任务print("success")return await func(*args, **kwargs)print(f"failed")return wrapperreturn decoratorasync def ntasks():t_time = ["9点", "10点", "11点"]   # 模拟任务在三个时间点被执行redis: Redis = await redis_corot_id = 1async def task_func(tid):print(f"{tid=}, executing...")return tidfor t_t in t_time:redis_key = f"{t_id}_{t_t}"     # set的key用定时任务的id+时间点来作为此次定时任务的唯一标识# 为了可重复运行这个示例,先执行删除之前设置的keyret = await redis.delete(redis_key)print(f"key deleted? {ret}")# 这里本来预期是直接放到函数头上装饰,但是不方便控制redis_key参数,所以使用了原始的方式装饰task_functask_f = redis_distributed_lock(redis_key)(task_func)# 假设启动分布式服务8个,会执行8次定时任务,这里创建了8个任务,按照先执行完先返回的顺序处理for f in asyncio.as_completed([task_f(redis_key) for _ in range(8)], loop=loop):res = await f# print(f"{res=}")print(f"=" * 80)time.sleep(5)  # 模拟一个定时任务在多个时间点执行,下一次执行时,时间参数(t_t)会发生变化# breakif __name__ == '__main__':try:loop.run_until_complete(ntasks())finally:loop.stop()loop.close()

输出

key deleted? 1
success
tid='1_9点', executing...
failed
failed
failed
failed
failed
failed
failed
================================================================================
key deleted? 1
success
tid='1_10点', executing...
failed
failed
failed
failed
failed
failed
failed
================================================================================
key deleted? 1
success
tid='1_11点', executing...
failed
failed
failed
failed
failed
failed
failed
================================================================================

可以看到task_func任务的每个时间点的执行都只有一次成功,而且不会出现只有第一个时间点执行成功而之后的时间点执行都全是失败的情况

有用的参考:
起初总是尝试在协程方法中使用多线程threading.Thread老是碰壁,看了这个回答后读了这篇文章,感觉豁然开朗

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/469386.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java----IO和NIO的区别

概念:NIO即New IO,这个库是在JDK1.4中才引入的。NIO和IO有相同的作用和目的,但实现方式不同,NIO主要用到的是块,所以NIO的效率要比IO高很多。在Java API中提供了两套NIO,一套是针对标准输入输出NIO&#xf…

【Pytorch神经网络理论篇】 11 卷积网络模型+Sobel算子原理

同学你好!本文章于2021年末编写,已与实际存在较大的偏差! 故在2022年末对本系列进行填充与更新,欢迎大家订阅最新的专栏,获取基于Pytorch1.10版本的理论代码(2023版)实现, Pytorch深度学习理论篇(2023版)…

ubuntu 14.04中文显示乱码问题

乱码显示如下&#xff1a; [<E9><97><AE><E9><A2><98><E6><8F><8F><E8><BF><B0>]:<E5><A2><9E><E5><8A><A0>tm100<E9><A1><B9><E7><…

DataFrame高效处理行列数据/倒三角型数据/处理阶梯型数据/根据列的值确定行的值

锲子 在使用pandas处理数据时&#xff0c;遇到了一种要按照留存天数来处理的数据&#xff0c;当列所对应的日期超过了最晚的“今天”那么数据就要置为0&#xff0c;举个例子&#xff1a; 在这个DataFrame中&#xff0c;需要将超过了今天2022-10-30的数据置为“-”&#xff0c…

转载 ---资深HR告诉你:我如何筛选简历与选择人员的

资深HR告诉你&#xff1a;我如何筛选简历与选择人员的 有个公司HR看简历 先直接丢掉一半 理由是不要运气不好的应聘者。 当然这可能只是某些HR面对太多的简历产生了偷懒的情绪&#xff0c;但是不论是Manager&#xff0c;亦或是Team Leader&#xff0c;都会遇到招聘的问题&#…

爬虫实战学习笔记_3 网络请求urllib模块:设置IP代理+处理请求异常+解析URL+解码+编码+组合URL+URL连接

1 设置IP代理 1.1 方法论述 使用urllib模块设置代理IP是比较简单的&#xff0c;首先需要创建ProxyHandler对象&#xff0c;其参数为字典类型的代理IP&#xff0c;键名为协议类型&#xff08;如HTTP或者HTTPS)&#xff0c;值为代理链接。然后利用ProxyHandler对象与buildopene…

vim 插件cscope 使用

&#xff11;&#xff0e;安装 sudo apt-get install cscope &#xff12;&#xff0e;初始化 cscope -Rbq 你想在哪个目录下面用这个功能&#xff0c;就在哪个目录下面运行这个命令 &#xff13;&#xff0e;vim kpd.c &#xff14;&#xff0e;输入 :cs add cscope.o…

Sanic服务启动失败,报错Cannot finalize with no routes defined

Sanic服务启动失败&#xff0c;记录解决方法 问题描述 Sanic服务启动失败&#xff0c;同样的代码和python版本在之前的win10系统上运行的好好的&#xff0c;换了台win11的机器就跑不起来了&#xff0c;不知道是系统原因还是因为换了执行pycharm等其他原因 在尝试启动时总是会…

打一波鸡血

在朋友圈看的一首诗&#xff0c;觉得很励志&#xff0c;如下&#xff1a;问题在于过度担心未来总是埋怨现在不断惋惜过往内心不知足想法逾现实行动很迟缓时间可贵青春难再审视脚下的路充实度日一日尚短庸碌混迹一日便长无妄的借口只会难上加难脚踏实地走才能遇难成祥守住时光用…

【Pytorch神经网络理论篇】 12 卷积神经网络实现+卷积计算的图解

同学你好&#xff01;本文章于2021年末编写&#xff0c;获得广泛的好评&#xff01; 故在2022年末对本系列进行填充与更新&#xff0c;欢迎大家订阅最新的专栏&#xff0c;获取基于Pytorch1.10版本的理论代码(2023版)实现&#xff0c; Pytorch深度学习理论篇(2023版)目录地址…

vim 函数列表插件

&#xff11;&#xff0e;apt-get install exuberant-ctags &#xff12;&#xff0e;unzip taglist_42.zip  把解压出来的文件放到 /home/weiqifa(自己的用户名)/.vim/  资源下载&#xff1a; http://download.csdn.net/detail/weiqifa0/9137283&#xff13;&#xff0e;s…

【Pytorch神经网络实战案例】09 使用卷积提取图片的轮廓信息(手动模拟Sobel算子)

1 载入图片并显示 import matplotlib.pyplot as plt import matplotlib.image as mpimg import torch import torchvision.transforms as transforms import os os.environ["KMP_DUPLICATE_LIB_OK"]"TRUE" ### 1 载入图片并显示 myimg mpimg.imread(img.…

jquery深入学习

的转载于:https://www.cnblogs.com/lizhiwei8/p/6417798.html

【Pytorch神经网络理论篇】 13 深层卷积神经网络介绍+池化操作+深层卷积神经网络实战

同学你好&#xff01;本文章于2021年末编写&#xff0c;获得广泛的好评&#xff01; 故在2022年末对本系列进行填充与更新&#xff0c;欢迎大家订阅最新的专栏&#xff0c;获取基于Pytorch1.10版本的理论代码(2023版)实现&#xff0c; Pytorch深度学习理论篇(2023版)目录地址…

gulp 系统教程

移步: http://www.cnblogs.com/2050/p/4198792.html gulp外挂 加md5 版本号 自动合并图片并修改css的坐标 一个系统列子 gulp可以做哪些事情转载于:https://www.cnblogs.com/dhsz/p/6419383.html

Android 广播接收

问题&#xff1a;写了一个应用来验证我按下音量键时发送出来的广播&#xff0c;可是老是提示出错&#xff0c; 原因&#xff1a;我的整个应用只继承吧BroadcastReceive&#xff0c;没有实际的继承Activity所以总是出现各种问题&#xff0c;现在修改好&#xff0c;上传代码 http…

【Pytorch神经网络实战案例】10 搭建深度卷积神经网络

识别黑白图中的服装图案(Fashion-MNIST)https://blog.csdn.net/qq_39237205/article/details/123379997基于上述代码修改模型的组成 1 修改myConNet模型 1.1.1 修改阐述 将模型中的两个全连接层&#xff0c;变为全局平均池化层。 1.1.2 修改结果 ### 1.5 定义模型类 class m…

Service Worker,Web Worker,WebSocket的对比

Service Worker 处理网络请求的后台服务。适用于离线和后台同步数据或推送信息。不能直接和dom交互。通过postMessage方法交互。 Web Worker 模拟多线程&#xff0c;允许复杂计算功能的脚本在后台运行而不会阻碍到其他脚本的运行。适用于处理器占用量大而又不阻碍的情形。不能直…

MTK 升级USB问题

问题&#xff1a;我们的开发环境是ubuntu里面安装xp ,经常是xp下没有正常识别preload模式下的usb.这样肯定不能升级不了。 设置&#xff1a;MTK preload下的USB vid:0e8d pid:2000 revion 0100 知道这几个值了&#xff0c;在usb配置里面增加这个筛选项就可以了。

JAVA 8 StreamAPI 和 lambda表达式 总结(一)--lambda表达式

这些天看见另一本好书《给大忙人看的Java SE 8》&#xff0c;其中的新特性 StreamAPI 和 lambda表达式 是之前jdk没有提供的新特性&#xff0c;也是jdk8 重要的更新内容&#xff0c;我会总结一下它们的用法&#xff0c;更详细的参见书本。 lambda表达式的概念 人对一个概念的理…