Luigi任务调度框架学习2:运行每一个Task,避免因判定完成导致跳过执行Task主程序

在上一篇Luigi的线性调度文章中(Luigi任务调度框架学习1:线性调用流程),我们知道Task运行的时候:

每个任务是否完成有两次判定,即:进行判定(未完成) =》运行def run(self)函数 =》进行判定(完成) =》运行后续的Task;如果第一次判定就完成,则不会执行当前Task的def run(self)函数

但是Luigi支持的判定条件只有文件与SQL,在有些情况下(尤其是定时任务),我们希望它梳理并运行整个拓扑,而不是根据判定去决定是否运行,因此本文来解决这个问题,让我们能够直接运行Task

文章目录

  • 解决方法
  • 问题解析
    • 1. Luigi多Task多次运行效果
    • 2. 无限调用Task示例程序

解决方法

博主写了一个工具类:

import luigi
import osclass NeverStopLuigi(luigi.Task):"""无限执行Task的工具类"""never_stop = luigi.BoolParameter(True)def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self._out_put_file = self.__class__.__name__  # 用当前类名,作为无限重启确认的文件名if self.never_stop:outputs = luigi.task.flatten(self.output())for out in outputs:if isinstance(out, luigi.LocalTarget) and out.exists():os.remove(self.output().path)def run(self):with open(self._out_put_file, "w") as f:f.write(".")def output(self):return luigi.LocalTarget(self._out_put_file)

之后程序只需要继承NeverStopLuigi类后覆写两个函数(def requires(self)def run(self))即可运行:

class MyTask(NeverStopLuigi):def requires(self):  # 1. 用于定义前置程序print(f"运行MyTask的 requires程序")return []def run(self):  # 2. 这个Task运行的主程序super(MyTask, self).run() # 调用NeverStopLuigi的run函数print(f"运行MyTask的 ======= 主程序 ======")

在这个工具类中,主要内容有:

  1. 依靠初始化Task时删除掉检测的文件,来实现首次检测output不通过
  2. 调用run函数先写文件,实现二次检测output通过

至于用于检测的文件名,用最不容易重名的类名self.__class__.__name__来作为中间的文件名,这里是为了文件能够循环执行,而不用uuid这种随意生成的文件,最后无法删除(其实是可以在最后一个Task通过获取前面的所有output删除的,但没有直接继承的写法简洁明了)

问题解析

1. Luigi多Task多次运行效果

import luigiclass PreClass(luigi.Task):out_file = "pre_class.text"def requires(self):  # 所需的前置函数,也可以不写,默认为空print(f"运行PreClass的 requires程序")return []def output(self):  # 完成的依据print(f"运行PreClass的 output程序")return luigi.LocalTarget(self.out_file)  # 判断这一部分是否完成会去查看是否有out_file文件(pre_class.text)def run(self):  # 这一部分执行的主函数print(f"运行PreClass的 ======= 主程序 ======")with open(self.out_file, "a") as file:file.write("啊哈哈哈")class Run1Class(luigi.Task):out_file = "run_1_class.text"# 这里不覆写 output是默认空def requires(self):  # 所需的前置函数print(f"运行Run1Class的 requires程序")return [PreClass()]def output(self):  # 完成的依据print(f"运行Run1Class的 output程序")return luigi.LocalTarget(self.out_file)  # 判断这一部分是否完成会去查看是否有out_file文件(pre_class.text)def run(self):  # 这一部分执行的主函数print(f"运行Run1Class ======= 主程序 ======")with open(self.out_file, "a") as file:file.write("啊哈哈哈")class Run2Class(luigi.Task):out_file = "run_2_class.text"# 这里不覆写 output是默认空def requires(self):  # 所需的前置函数print(f"运行Run2Class的 requires程序")return [Run1Class()]def run(self):  # 这一部分执行的主函数print(f"运行Run2Class ======= 主程序 ======")with open(self.out_file, "a") as file:file.write("啊哈哈哈")if __name__ == '__main__':luigi.build([Run2Class()], local_scheduler=True)  # 只写入最后输入的数据即可

最后的输出效果如下:

===== Luigi Execution Summary =====Scheduled 3 tasks of which:
* 3 ran successfully:- 1 PreClass()- 1 Run1Class()- 1 Run2Class()This progress looks :) because there were no failed tasks or missing dependencies===== Luigi Execution Summary =====运行Run2Class的 requires程序
运行Run1Class的 output程序
运行Run1Class的 requires程序
运行PreClass的 output程序
运行PreClass的 requires程序
运行PreClass的 requires程序
运行PreClass的 ======= 主程序 ======
运行Run1Class的 requires程序
运行PreClass的 output程序
运行Run1Class ======= 主程序 ======
运行Run2Class的 requires程序
运行Run1Class的 output程序
运行Run2Class ======= 主程序 ======

调度顺序如上所示

2. 无限调用Task示例程序

import luigi
import osclass NeverStopLuigi(luigi.Task):"""工具类"""never_stop = luigi.BoolParameter(True)def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self._out_put_file = self.__class__.__name__  # 用当前类名,作为无限重启确认的文件名if self.never_stop:outputs = luigi.task.flatten(self.output())for out in outputs:if isinstance(out, luigi.LocalTarget) and out.exists():os.remove(self.output().path)def run(self):with open(self._out_put_file, "w") as f:f.write(".")def output(self):return luigi.LocalTarget(self._out_put_file)class PreClass(NeverStopLuigi):def requires(self):  # 所需的前置函数,也可以不写,默认为空print(f"运行PreClass的 requires程序")return []def run(self):  # 这一部分执行的主函数super(PreClass, self).run()print(f"运行PreClass的 ======= 主程序 ======")class Run1Class(NeverStopLuigi):def requires(self):  # 所需的前置函数print(f"运行Run1Class的 requires程序")return [PreClass()]def run(self):  # 这一部分执行的主函数super(Run1Class, self).run()print(f"运行Run1Class ======= 主程序 ======")class Run2Class(NeverStopLuigi):def requires(self):  # 所需的前置函数print(f"运行Run2Class的 requires程序")return [Run1Class()]def run(self):  # 这一部分执行的主函数super(Run2Class, self).run()print(f"运行Run2Class ======= 主程序 ======")if __name__ == '__main__':luigi.build([Run2Class()], local_scheduler=True)  # 只写入最后输入的数据即可

如上程序,不论运行多少次都会得到:

运行Run2Class的 requires程序
运行Run1Class的 requires程序
运行PreClass的 requires程序
运行PreClass的 requires程序
运行PreClass的 ======= 主程序 ======
运行Run1Class的 requires程序
运行Run1Class ======= 主程序 ======
运行Run2Class的 requires程序
运行Run2Class ======= 主程序 ======

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/780135.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

国产暴雨AI服务器X3418开启多元自主可控新篇章

在当前数字化转型的大潮中,算力作为新质生产力的重要动力引擎,对推动经济社会发展起着关键作用。尤其在人工智能领域,随着高性能、安全可控的AI算力需求持续攀升,国产化服务器的研发与应用显得尤为迫切。 作为国内专业的算力基础…

mysql5.7 源码分析--初始化

集中在sql\mysqld.cc文件的mysqld_main函数中(): 主程序入口 在sql\main.cc文件中: int main(int argc, char **argv) {return mysqld_main(arg, argv); } 一、mysql为了跨平台,对win32系统做了单独的初始化&#x…

【ENSP】交换机和路由器之间实现静态路由

1.概念 三层交换机只能在Vlanif逻辑口配置iP地址 路由器的每一个端口都是另外一个网段 2.实现方法 交换机允许对应vlan通行,配置vlanif的ip地址,做静态路由 路由器,进接口配置对应的ip,路由器和交换机相连的口,进入他的…

文心一言 VS 讯飞星火 VS chatgpt (225)-- 算法导论16.3 7题

七、推广赫夫曼算法,使之能生成三进制的码字(即码字由符号0、1、2组成),并证明你的算法能生成最优三进制码。如果要写代码,请用go语言。 文心一言: 推广赫夫曼算法以生成三进制码字需要对算法进行一定的修改,确保在每…

JavaScript是如何实现页面渲染的

JavaScript实现页面渲染主要涉及到对DOM的操作、样式的修改以及与后端数据的交互。以下是JavaScript实现页面渲染的主要步骤和方式: 一、DOM操作 创建和修改元素:JavaScript可以使用document.createElement()来创建新的DOM元素,使用appendC…

力软框架打开新的对话框,点击对话框确认按钮的事件AcceptClick的方法

// 原来在力软框架下,点击哪个确认按钮的时候 top.frames[iframeId].AcceptClick直接用这个方法就可以了 ,那个方法是直接返回方法的但是不知道是什么情况。如图二所示。死活就返回了ifram标签不知道是什么原因,就获取不到对话框里边自己定义…

求交错数列前n项和:输⼊⼀个正整数n,计算交错序列1-2/3+3/5-4/7+5/9-6/11+... 的前n项之和。试编写相应程序。

#include <stdio.h> #include <string.h> int main() { // 求交错数列前n项和&#xff1a;输个正整数n&#xff0c;计算交错序列1-2/33/5-4/75/9-6/11... 的前n // 项之和。试编写相应程序。 int sign 1; float sum 0; int n; scanf("…

nginx界面管理工具之nginxWebUI 搭建与使用

nginx界面管理工具之nginxWebUI 搭建与使用 一、nginxWebUI 1.nginx网页配置工具 官网地址: http://www.nginxwebui.cn 源码地址&#xff1a;https://git.chihiro.org.cn/chihiro/nginxWebUI 2.功能说明 本项目可以使用WebUI配置nginx的各项功能, 包括http协议转发, tcp协议…

springboot+vue+elementui保存时间类型数据报错JSON parse error

1.目前环境条件&#xff1a; ①mysql数据库中存储的时间字段类型为&#xff1a;datetime ②&#xff1a;springboot中存储的类型为&#xff1a;LocalDateTime ③前端代码&#xff1a; <el-col :span"24"><el-form-item><div slot"label"…

帆软报表踩坑日记

最近公司项目要是使用报表&#xff0c;公司使用的是帆软这个国产软件&#xff0c;自己也是学习使用&#xff0c;在使用的过程中记一下问题以及解决方式 公司使用的是帆软8这个版本&#xff0c;比较老了。 首先是表格中的扩展&#xff0c;就是当我们根据数据库查询数据然后放到表…

PageHelper分页错乱

PageHelper.startPage(qryOrderDetailParam.getPageNum(), qryOrderDetailParam.getPageSize()); 在使用分页插件时&#xff0c;如果sql的最后面跟着类似这个的sql OFFSET 0 ROWS FETCH NEXT #{taskNum} ROWS ONLY, 就是自己本身也有限制条数的逻辑&#xff1b; 双重限制逻辑…

【Vue3源码学习】— CH2.5 reactiveEffect.ts:Vue 3响应式系统的核心

reactiveEffect.ts&#xff1a;Vue 3响应式系统的核心 1. 什么是 reactiveEffect&#xff1f;2. 核心机制2.1 依赖收集&#xff08;Track&#xff09;2.2 触发更新&#xff08;Trigger&#xff09;2.3 效果范围&#xff08;effectScope&#xff09; 3. 源码解析 —— track3.1 …

云服务器16核64G租用优惠价格500元1个月、5168元一年,35M带宽

京东云16核64G服务器租用价格500元1个月、1500元3个月、2585元6个月、5168元一年&#xff0c;配置为16C64G-450G SSD系统盘-35M带宽-8000G月流量 华北-北京。京东云16核64G服务器优惠活动 atengyun.com/go/jd 链接打开如下图&#xff1a; 京东云16核64G服务器租用价格 京东云&a…

[超细] npm 版本号规范升级流程

版本号组成 node package版本号由四部分组成&#xff1a; major.minor.patch[-prerelease] 比如&#xff1a;1.0.2-beta.1&#xff0c;其中prerelease可选。 ● major&#xff1a;代表主版本号&#xff0c;通常在需要提交不能向下兼容的情况下对该版本号进行升级 ● minor&…

SD 修复 Midjourney 有瑕疵照片

Midjourney V6 生成的照片在质感上有了一个巨大的提升。下面4张图就是 Midjourney V6 生成的。 如果仔细观察人物和老虎的面部&#xff0c;细节真的很丰富。 但仔细观察上面四张图的手部细节&#xff0c;就会发现至少有两只手是有问题的。这也是目前所有 AI 绘图工具面临的问题…

【Roadmap to learn LLM】Large Language Models in Five Formulas

by Alexander Rush Our hope: reasoning about LLMs Our Issue 文章目录 Perpexity(Generation)Attention(Memory)GEMM(Efficiency)用矩阵乘法说明GPU的工作原理 Chinchilla(Scaling)RASP(Reasoning)结论参考资料 the five formulas perpexity —— generationattention —— m…

stm32定时器中断函数回调函数

方式一&#xff1a;stm32定时器中断可以直接在硬件中断函数TIM3_IRQHandler执行。 在HAL库中可以注册回调函数&#xff0c;在定时器中断发生时调用注册的函数&#xff0c;这样可以统一接口&#xff0c;大大提高函数可读性&#xff0c;和硬件解耦提高程序可移植性。 使用过程如…

Linux 开发环境以及编译链接

再谈编译链接 C函数重载与编译链接-CSDN博客 之前我已经写过文章简单介绍了编译链接要做的一些操作。现在为了能更好的理解我们平时的开发环境&#xff0c;我会在Linux系统上完整地走一遍流程。 环境描述 我们使用普通用户在Linux上进行操作&#xff0c;先写一段测试代码。 …

vue中使用图片url直接下载图片

vue中使用图片url直接下载图片 // 下载图片downloadByBlob(url, name) {let image new Image()image.setAttribute(crossOrigin, anonymous)image.src urlimage.onload () > {let canvas document.createElement(canvas)canvas.width image.widthcanvas.height image…

Django屏蔽Server响应头信息

一、背景 最近我们被安全部门的漏洞扫描工具扫出了一个服务端口的漏洞。这个服务本身是一个Django启动的web服务&#xff0c;并且除了登录页面&#xff0c;其它页面或者接口都需要进行登录授权才能进行访问。 漏洞扫描信息和提示修复信息如下: 自然这些漏洞如何修复&#xff0c…