Python 全栈系列246 任务调度对象WFlaskAPS

说明

之前已经完全跑通了任务调度,实现了S2S的流转Python 全栈系列243 S2S flask_celery。由于request请求用起来比较别扭,所以创建一个对象来进行便捷操作。

内容

1 功能

WFlaskAPS包含管理定时任务的必要功能

from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel
import requests as req class WFlaskAPS(BaseModel):flask_aps_agent:str = 'IP:Port'# 获取当前的任务列表def get_jobs(self):url = 'http://%s/get_jobs' % self.flask_aps_agentreturn req.get(url).json()# 删除某个任务def remove_a_task(self, task_name = None):url = 'http://%s/remove_a_task/' % self.flask_aps_agentdata_dict ={}data_dict['task_id'] = task_namereturn req.post(url, json=data_dict).json()# 获取任务状态def get_jobs_status(self):url = 'http://%s/get_jobs_status/' % self.flask_aps_agentreturn req.get(url).json()# 发布一个任务(task)# task_name 其实是job type# task_id 任务的唯一编号# task_type 原来的服务只实现了cron方式,因为cron方式可以实现其他两种方式。'''date:在指定的日期和时间运行一次interval:在指定时间间隔内运行cron:使用Cron表达式运行'''def publish_a_task(self,task_id= None ,task_name = None, task_type ='cron', task_kwargs = {},year = None, month = None, day = None, week = None, day_of_week = None, hour = None,minute = None, second = None, start_date = None, end_date = None,):url = 'http://%s/publish_a_task/' % self.flask_aps_agentdata_dict = {'task_id':task_id,'task_name':task_name,'task_type':task_type,'task_kwargs':task_kwargs,'year':year,'month':month,'day':day,'week':week,'day_of_week':day_of_week,'hour':hour,'minute':minute,'second':second,'start_date':start_date,'end_date':end_date}return req.post(url, json=data_dict).json()def pause_a_task(self, task_id = None):data_dict = {}url =  'http://%s/pause_a_task/' % self.flask_aps_agentdata_dict['task_id'] = task_idreturn req.post(url, json=data_dict).json()def resume_a_task(self, task_id = None):data_dict = {}data_dict['task_id'] = task_idurl =  'http://%s/resume_a_task/' % self.flask_aps_agentreturn req.post(url, json=data_dict).json()# task_namedef add_a_job(self, fpath = None, func_name = None):url = 'http://%s/add_task_type/' % self.flask_aps_agentdata = {}data['func_name'] = func_namewith open(fpath, 'r') as f: data['func_body'] = f.read()return req.post(url, json=data).json()

从逻辑上,首先需要创建任务类型(add_a_job),之后就可以依据这个job发布n个task,发布任务时的参数是最复杂的。原始的flask apscheduler其实提供了三种触发类型:

  • 1 date 一次性触发
  • 2 interval 周期触发
  • 3 cron 触发

由于cron触发可以涵盖前两种的变化,所以在之前的服务中,只创建了cron格式的定时任务。与linux系统里的cron不同,系统里最小的周期是分钟,而这里是秒。

默认情况下,周期任务是每秒执行的。如果需要改为n秒执行,可以用 '*/5’的方式,指定重复执行的周期。通过start_date和end_date可以钳制任务的周期跨度(所以当然也包含了一次性的任务)。

使用方法比直接请求接口简洁多了

from Basefuncs import * 
# flask_aps_agent = 'IP:PORT'
wf = WFlaskAPS()# 1 获取任务列表
wf.get_jobs()
# 2 删除一个任务
wf.remove_a_task('my_test1')
# 3 获取任务状态
wf.get_jobs_status()
# 4 发布一个任务
cur_dt_str = get_time_str1()
wf.publish_a_task(task_id ='my_test1', task_name= 'hello',start_date=cur_dt_str,second ='*/5')
# 5 暂停一个任务
wf.pause_a_task(task_id = 'my_test1')
# 6 恢复任务
wf.resume_a_task(task_id = 'my_test1')# 7 增加一个job(task_name)
# wf.add_a_job()

2 Next

2.1 flask_aps_job_table

创建一张表,里面存储了job的元信息,表可以存在mymeta.flask_aps下面。

字段解释
job_namejob 名称
description描述
para_dict参数样例

2.2 flask_aps_task_table

字段解释
machine机器名, m1,m2
task_id任务id
job_name任务类型
set_to_status被设定的状态
running_status当前状态
start_dt开始时间
end_dt结束时间
interval_params周期时间

机器名是重要的,因为同样的任务可能在不同的机器上执行。

2.3 assure_tasks.py

这个脚本伴随服务的启动会执行一次:

  • 1 根据本机名称,去flask_aps_task_table寻找哪些被设定为运行的任务
  • 2 获取当前的任务列表,根据任务名,运行状态的差集执行:
    • 1 发布任务。这种情况一般是服务重启之后,运行任务丢失了。

其他类型的操作,可以后续通过前端来交互(这里又会用一下MongoEngine).

2.4 S2S Work Mode

创建一个S2S,然后让Worker根据这个来执行一些任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/11834.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式CAN通信协议详解分析

CAN协议简介 CAN是控制器局域网络(Controller Area Network)的简称,它是由研发和生产汽车电子产品著称的德国BOSCH公司开发的,并最终成为国际标准(ISO11519),是国际上应用最广泛的现场总线之一。 CAN总线协议已经成为汽车计算机控制系统和嵌入式工业控制局域网的标准总线…

算法课程笔记——自下而上树形DP

算法课程笔记——自下而上树形DP #include<bits/stdc.h>usingnamespacestd; constintN100005; intn,a[N]; longlongdp[N][2]; vector<int> e[N]; voiddfs(intu){for(autov:e[u]){dfs(v);dp[u][1]dp[v][0];dp[u][0]max(dp[v][0],dp[v][1]);}dp[u][1]a[u]; } intmain…

图像分类——猫狗图片

引言 亲爱的读者们&#xff0c;您是否在寻找某个特定的数据集&#xff0c;用于研究或项目实践&#xff1f;欢迎您在评论区留言&#xff0c;或者通过公众号私信告诉我&#xff0c;您想要的数据集的类型主题。小编会竭尽全力为您寻找&#xff0c;并在找到后第一时间与您分享。 …

如何用Rust获取CPU、内存、硬盘的信息?

目录 一、用Rust获取CPU、内存、硬盘的信息 二、知识点 systemstat 一、用Rust获取CPU、内存、硬盘的信息 首先&#xff0c;需要添加systemstat库到Cargo.toml文件&#xff1a; [dependencies] systemstat "0.8.2" 在Rust代码中使用它&#xff1a; extern crat…

新增柱线组合图、象限图,新增钉钉、飞书、企业微信客户端免密登录,DataEase开源数据可视化分析工具v2.6.0发布

2024年5月13日&#xff0c;人人可用的开源数据可视化分析工具DataEase正式发布v2.6.0版本。 这一版本的功能升级包括&#xff1a;图表方面&#xff0c;新增了柱线组合图、象限图&#xff1b;仪表板方面&#xff0c;支持批量拖拽字段&#xff0c;外部参数新增支持配置过滤组件&…

日元崩了:日本上市公司转战比特币作对冲

5月13日&#xff0c;日本上市公司Metaplanet发布公告宣布因为政府债务水平高企、实际利率长期处于负值以及日元贬值&#xff0c;Metaplanet特将比特币作为其公司战略储备资产&#xff0c;以回应日本严峻的经济形势。同时也为当地资产管理领域的企业创新开创了先例。 Metaplanet…

AtCoder Beginner Contest 308 B题 Default Price

B题&#xff1a;Default Price 标签&#xff1a; S T L 、 m a p STL、map STL、map、模拟 题意&#xff1a;给定 n n n个有颜色的盘子和 m m m种颜色分别对应的价值 p i p_i pi​&#xff0c; p 0 p_0 p0​表示盘子颜色如果不属于这 m m m种颜色&#xff0c;那对应的价值&…

Flutter 中的 Visibility 小部件:全面指南

Flutter 中的 Visibility 小部件&#xff1a;全面指南 在 Flutter 中&#xff0c;Visibility 是一个用于根据布尔值条件显示或隐藏小部件的控件。使用 Visibility 可以避免不必要的布局占用&#xff0c;因为它实际上会从布局树中移除符合条件的小部件。 基础用法 Visibility…

Spring AOP切面实现为mapper层指定方法入参字段赋值

需求&#xff1a; 有时候我们在进行某些操作时&#xff0c;可能需要额外进行复制操作&#xff0c;而这些字段往往不是由前端/客户端填写输入的&#xff0c;而是由后端给与&#xff0c;类似于 登陆者、创建时间、更新时间等字段&#xff0c;这时&#xff0c;可以借助AOP指定mapp…

强化学习——马尔可夫奖励过程的理解

目录 一、马尔可夫奖励过程1.回报2.价值函数 参考文献 一、马尔可夫奖励过程 在马尔可夫过程的基础上加入奖励函数 r r r 和折扣因子 γ \gamma γ&#xff0c;就可以得到马尔可夫奖励过程&#xff08;Markov reward process&#xff09;。一个马尔可夫奖励过程由 < S , …

Vue从入门到实战Day05

一、自定义指令 自定义指令&#xff1a;自己定义的指令&#xff0c;可以封装一些dom操作&#xff0c;扩展额外功能 需求&#xff1a;当页面加载时&#xff0c;让元素将获得焦点 (autofocus在safari浏览器有兼容性) 操作dom&#xff1a;dom元素.focus() mounted() {this.$ref…

安装和配置OceanBase

安装和配置OceanBase 下面是安装和配置OceanBase&#xff08;jieyiobs集群&#xff09;的步骤&#xff1a; 1. 安装YUM工具&#xff08;如果尚未安装&#xff09; yum install -y yum-utils2. 添加OceanBase仓库 为了能从OceanBase的官方源安装软件&#xff0c;需要添加它的…

(delphi11最新学习资料) Object Pascal 学习笔记---第12章操作类(类方法和类数据)

第12章 操作类 ​ 在过去的几章中&#xff0c;你已经了解了 Object Pascal 语言面向对象的基础&#xff1a;类、对象、方法、构造函数、继承、后期绑定、接口等等。现在&#xff0c;我们需要进一步了解与类管理相关的一些更高级、更具体的语言特性。从类引用到类助手(class he…

Elasticsearch解决字段膨胀问题

文章目录 背景Flattened类型的产生Flattened类型的定义基于Flattened类型插入数据更新Flattened字段并添加数据Flattened类型检索 Flattened类型的不足 背景 Elasticsearch映射如果不进行特殊设置&#xff0c;则默认为dynamic:true。dynamic:true实际上支持不加约束地动态添加…

关于milvus go sdk运行时报9223372036854775807 (untvped int constant)overflows int问题

背景 在使用milvus go sdk去查询milvus服务中并列出所有集合的时候遇到一个int溢出问题&#xff0c;依据官方文档&#xff0c;具体代码如下 package localimport ("context""fmt""github.com/milvus-io/milvus-sdk-go/v2/client""log&quo…

vue3使用依赖注入实现跨组件传值

父组件Index.vue: <script setup> import { onMounted, provide, ref } from vue import Child from ./Child.vue import ./index.cssconst count ref(0)provide(count, count)const handleClick () > {count.value }onMounted(() > {}) </script><tem…

spring boot常用的filter

OrderedCharacterEncodingFilter 由 HttpEncodingAutoConfiguration 注入 BeanConditionalOnMissingBeanpublic CharacterEncodingFilter characterEncodingFilter() {CharacterEncodingFilter filter new OrderedCharacterEncodingFilter();filter.setEncoding(this.propert…

ES6之字符串的扩展

字符串的扩展 关键的扩展点及其示例&#xff1a; Unicode 表示与处理 JavaScript 共有6种方法可以表示一个字符。codePointAtpos&#xff1a;String.fromCodePoint…codePoints&#xff1a; **字符串的遍历 for … of **字符串方法的增强 includessearchString[, position]&…

记录:robot_localization传感器数据融合学习

一、参考资料 官方&#xff1a; http://wiki.ros.org/robot_localizationhttp://docs.ros.org/en/noetic/api/robot_localization/html/index.html2015 ROSCon 演讲官方网址&#xff08;youyube上也有这个视频&#xff09; 实践教程 https://kapernikov.com/the-ros-robot_…

纯福利|手把手教你如何白嫖免费的GPU资源(二)

大家好&#xff0c;我是无界生长。 前段时间写过一篇文章《纯福利&#xff5c;手把手教你如何白嫖免费的GPU资源&#xff08;一&#xff09;》&#xff0c;使用Google Colab提供的免费的GPU资源&#xff0c;今天接着写白嫖GPU资源攻略&#xff0c;可获得“长期免费的CPU实例资源…