Flask与Celery实现Python调度服务

文章目录

  • Flask与Celery实现Python调度服务
  • 一、前言
    • 1.组件
    • 2.场景说明
    • 3.环境
  • 二、安装依赖
    • 1.安装Anaconda
    • 3.安装redis
    • 2.安装依赖包
  • 三、具体实现
    • 1.目录结构
    • 2.业务流程
    • 3.配置文件
    • 4.Celery程序
    • 5.Flask程序
    • 6.测试脚本
    • 7.程序启动
      • 1)Windows开发调试
      • 2)Linux服务器部署
      • 3)订阅通道
      • 4)测试

Flask与Celery实现Python调度服务

一、前言

1.组件

  • Flask:Flask 是一个轻量级的 Python Web 应用框架。Flask 提供了开发 Web 应用所需的基本功能,并且设计灵活,开发者可以根据需要扩展其功能。由于其核心简洁,它特别适合小型应用和微服务架构。同时,Flask 具有很高的可扩展性,通过各种扩展库可以轻松添加数据库集成、表单处理、身份验证等功能。

  • Celery:Celery 是一个异步任务队列/作业队列,基于分布式消息传递的系统。Celery 用于处理异步任务和调度定时任务,非常适合在后台处理耗时的操作,比如发送邮件、生成报告或与外部 API 交互。它通过任务队列和工作进程分离工作负载,可以提高应用程序的性能和响应速度。

  • uWSGI:uWSGI 是一个应用服务器,用于运行 Python Web 应用。uWSGI 旨在高效地服务 Python 应用,并提供了 WSGI 协议的实现。它通常用于在生产环境中部署 Python 应用。uWSGI 支持多种协议和功能,包括负载均衡、进程管理等。它与 Web 服务器(如 Nginx 或 Apache)配合使用,以提高应用的性能和可靠性。

  • Redis:Redis 是一个开源的内存数据结构存储系统,可以用作数据库、缓存和消息中间件。Redis 通过键值对的方式存储数据,支持丰富的数据结构如字符串、哈希、列表、集合和有序集合。它具有极高的读写性能,常用于缓存数据库查询结果、会话存储、队列管理等场景,以减少数据库的压力并提高系统的响应速度。Redis 还支持持久化,可以在重启后恢复数据。

2.场景说明

  • 解决在 java 项目中执行 python 脚本的问题;之前实现的方法是通过 Java 程序模拟命令行的交互环境执行 python 脚本,但是无法控制执行脚本的并发,导致服务器硬件资源占用过高的情况时常发生。

3.环境

  • Windows 版本(开发):Windows 10 专业版
  • Linux 发行版(部署):CentOS-7-x86_64-DVD-1804.iso
  • Postman for Windows Version:11.3.2
  • flask:2.2.5
  • celery:5.4.0
  • redis 客户端:5.0.4
  • redis 服务端:7.0.12
  • uwsgi:2.0.21

Postman 下载:https://www.postman.com/downloads/

flask框架入门和使用实践:https://blog.csdn.net/u011424614/article/details/112548442

Java执行Python脚本:https://blog.csdn.net/u011424614/article/details/114199102
[Windows] Anacoda安装和使用:https://blog.csdn.net/u011424614/article/details/105579502

CentOS7安装部署Anaconda:https://blog.csdn.net/u011424614/article/details/140253920

CentOS7安装部署Redis7:https://blog.csdn.net/u011424614/article/details/132418619

二、安装依赖

1.安装Anaconda

  • 安装参考:《CentOS7安装部署Anaconda》
  • 安装目录:/opt/anaconda/install

3.安装redis

  • 安装参考:《CentOS7安装部署Redis7 》
  • 安装目录:/opt/redis/redis-7.0.12/src

2.安装依赖包

# 创建环境
conda create -n dev01 python==3.10.14
# 激活环境
conda activate dev01# 安装依赖包(指定国内镜像源)
pip install flask==2.2.5 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install celery==5.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install redis==5.0.4 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install werkzeug==2.3.8 -i https://pypi.tuna.tsinghua.edu.cn/simple
# Windows 开发环境不需要安装,Linux 部署时使用
conda install uwsgi==2.0.21 -i https://pypi.tuna.tsinghua.edu.cn/simple

三、具体实现

1.目录结构

/opt/scheduler-service
├── logs
├── config
│   ├── redis_config.ini
│   └── uwsgi.ini
├── test
│   ├── my_script.py
├── celery_service.py
├── celery_worker_start.py
├── flask_service.py
└── flask_service_start.py

2.业务流程

执行方式:异步执行

  1. Postman 发送请求后,调用 flask 程序
  2. flask 将根据接口参数,通过 celery 异步执行指定脚本,并传递脚本所需参数;celery 异步执行后,获取 celery 任务的 task_id;最后通过 flask 将 task_id 返回给 Postman
  3. celery 执行脚本成功或失败,都会将执行结果(包含 task_id)发布到 Redis 的指定通道
  4. Redis 客户端提前订阅通道,监听通道的消息(通过 task_id 匹配执行任务和结果)

3.配置文件

  • redis_config.ini
[celery]
# 配置 Celery 的消息代理 URL
broker_url = redis://127.0.0.1:6379/2# 配置 Celery 的结果后端 URL
backend_url = redis://127.0.0.1:6379/3[redis]
# 配置 Redis 的主机地址
host = 127.0.0.1# 配置 Redis 的端口
port = 6379# 配置 Redis 的数据库索引
db = 4# 配置 Redis 的密码,如果没有密码留空
password =[channel]
# 配置消息发布的频道名称
name = schedulerChannel
  • uwsgi.ini【Windows 开发环境不需要,Linux部署时使用】
[uwsgi]
# 使用 HTTP 协议,监听 6000 端口
http = :6000# 设置 uWSGI 的工作目录为 Flask 应用所在目录
chdir = ./# 指定 Flask 应用所在的 Python 文件和变量名
module = flask_app:app# 启用主进程
master = true# 启动的工作进程数,推荐设置为 CPU 核心数的 2-4 倍
processes = 4# 线程数,可以根据需要设置
threads = 2# 指定静态文件的目录,如果有静态文件服务的需求
# static-map = /static=/path/to/static/files# 设置 Python 的自动加载,使得每次修改代码后无需重启服务
py-autoreload = 1# 设置日志文件路径
daemonize = /opt/scheduler-service/logs/uwsgi.log

4.Celery程序

  • celery_app.py
from celery import Celery
from celery.signals import task_success, task_failure
import importlib.machinery
import redis
from dataclasses import dataclass, asdict
import json
import configparser# 读取配置文件
config = configparser.ConfigParser()
config.read('config/redis_config.ini', encoding='utf-8')# 初始化 Celery 应用
# 设置消息代理(Broker)和结果后端(Backend)
app = Celery('tasks',broker=config['celery']['broker_url'],backend=config['celery']['backend_url']
)# 创建 Redis 连接
client = redis.Redis(host=config['redis']['host'],port=config.getint('redis', 'port'),db=config.getint('redis', 'db'),password=config['redis']['password'])# 频道名称
channel_name = config['channel']['name']# 定义消息数据模型
@dataclass
class Message:success: boolmsg: strtaskId: strresult: str# 任务成功完成时触发的事件
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):print(f"任务 {sender.request.id} 执行成功.")# 构建消息message = Message(success=True, msg='', taskId=sender.request.id, result=result)# 发布消息到频道msg_json = json.dumps(asdict(message), ensure_ascii=False)client.publish(channel_name, msg_json)# 任务失败时触发的事件
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, args=None, kwargs=None, traceback=None, einfo=None, **kw):print(f"任务 {task_id} 执行失败,异常: {exception}")# 构建消息message = Message(success=False, msg=str(exception), taskId=task_id, result=None)# 发布消息到频道msg_json = json.dumps(asdict(message), ensure_ascii=False)client.publish(channel_name, msg_json)# 定义 Celery 任务
@app.task
def execute_script(script_path, *args, **kwargs):try:# 使用 SourceFileLoader 加载模块loader = importlib.machinery.SourceFileLoader("module.name", script_path)module = loader.load_module()# 执行模块中的 main 函数result = module.main(*args, **kwargs)return resultexcept Exception as e:print(f"执行过程异常: {e}", exc_info=True)raise
  • celery_worker_start.py
from celery_app import appif __name__ == '__main__':# 在 Windows 环境下运行 Celery Worker# 使用 solo 模式以避免 Windows 下多进程问题app.worker_main(argv=['worker', '--loglevel=info', '--pool=solo'])# 在 Linux 环境下运行 Celery Worker# 使用 8 个并发工作进程# app.worker_main(argv=['worker', '--loglevel=info', '--concurrency=8'])

5.Flask程序

  • flask_app.py
from flask import Flask, request
from celery_app import execute_script
from dataclasses import dataclass, asdict
import json# 初始化 Flask 应用实例
app = Flask(__name__)# 定义数据模型
@dataclass
class Message:success: boolmsg: strtaskId: str# 定义执行脚本的接口
@app.route('/execute_script', methods=['POST'])
def run_script():# 获取请求数据data = request.get_json()# 异步执行脚本并获取任务 IDtask = execute_script.delay(data['script_path'], *data['args'])task_id = task.idprint(f"flask_app > task_id: {task_id}")try:# 创建成功消息message = Message(success=True, msg='', taskId=task_id)msg_json = json.dumps(asdict(message), ensure_ascii=False)return msg_json, 200except Exception as e:# 捕获并处理异常,返回错误消息message = Message(success=False, msg=str(e), taskId=task_id)msg_json = json.dumps(asdict(message), ensure_ascii=False)return msg_json, 500
  • flask_app_start.py
from flask_app import app# 运行 Flask 应用
if __name__ == '__main__':# 以调试模式启动 Flask 应用# 设置 host 为 '0.0.0.0' 以允许外部访问# 端口号为 6000# 使用 reloader 以便在代码更改时自动重启服务器app.run(debug=True, host='0.0.0.0', port=6000, use_reloader=True)

6.测试脚本

  • test_script.py
import time# main 方法作为入口
def main(x, y):# 阻塞 5 秒time.sleep(5)# 计算结果result = x * y# 打印乘法计算结果print(f"乘法计算: {x} X {y} = {result}")# 返回计算结果return result

7.程序启动

1)Windows开发调试

  • 运行 flask_app_start.pycelery_worker_start.py

2)Linux服务器部署

  • 未安装 uwsgi
nohup python celery_worker_start.py > logs/celery.$(date +%Y-%m-%d).log 2>&1 &
nohup python flask_app_start.py > logs/flask.$(date +%Y-%m-%d).log 2>&1 &
  • 已安装 uwsgi
nohup python celery_worker_start.py > logs/celery.$(date +%Y-%m-%d).log 2>&1 &
uwsgi --ini config/uwsgi.ini

3)订阅通道

  • Redis 客户端订阅通道
redis-cli subscribe schedulerChannel

4)测试

  1. 使用 Springboot 等可以发送请求的程序进行测试,不限编程语言
  2. 使用 Postman for Windows 发送请求进行测试(当前文章使用的方法

Postman配置:

  1. POST 请求:http://192.168.28.179:6000/execute_script
  2. Headers 配置(配置 Key 和 Value ):
  • Content-Type : application/json
  • Accept : application/json
  1. Body - raw 配置(测试脚本和参数):
{"script_path": "E:\\scheduler-service\\test\\test_script.py","args": [1, 5]
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/872007.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

公司周年庆活动应该怎么策划?

当我们谈论公司周年庆典,我们不仅仅是在讨论一个简单的派对。 这是一个展现公司文化、增强员工归属感、加深客户关系,甚至推动公司战略发展的重要时刻。 那么,如何策划一场既有趣又有意义的周年庆典呢?这里分享一点自己的私人笔…

【java】力扣 买卖股票的最佳时机II

文章目录 题目链接题目描述思路代码 题目链接 122.买卖股票的最佳时机II 题目描述 思路 这道题和121.买卖股票的最佳时机 有所不同,不同点在于,这道题的股票可以多次买卖(但是要在买之前先卖掉) 详细思路请看链接的文章【java】力扣 买卖股票的最佳时…

excel及panda的部分内容

文章目录 python操作EXCELPandas 操作 Excel 的 DataFrame总结 python操作EXCEL 在 Python 中,操作 Excel 文件通常使用以下几个库: Pandas:强大的数据分析库,支持读取和写入 Excel 文件。 Openpyxl:专门用于读写 Ex…

ERP基础知识

ERP 一、概述 ​ ERP是Event-related Potentials的简称。外加一种特定的刺激,作用于感觉系统或脑 的某一部位,在给予刺激或撤销刺激时,或和当某种心理因素出现时在脑区所产生的电位变化,成为事件相关电位,是一种特殊…

【busybox记录】【shell指令】chown

目录 内容来源: 【GUN】【chown】指令介绍 【busybox】【chown】指令介绍 【linux】【chown】指令介绍 使用示例: 常用组合指令: 指令不常用/组合用法还需继续挖掘: 内容来源: GUN : Coreutils - GN…

Sentinel-1 Level 1数据处理的详细算法定义(四)

《Sentinel-1 Level 1数据处理的详细算法定义》文档定义和描述了Sentinel-1实现的Level 1处理算法和方程,以便生成Level 1产品。这些算法适用于Sentinel-1的Stripmap、Interferometric Wide-swath (IW)、Extra-wide-swath (EW)和Wave模式。 今天介绍的内容如下: Sentinel-1 L…

【详解】Spring Cloud概述

🎥 个人主页:Dikz12🔥个人专栏:Spring学习之路📕格言:吾愚多不敏,而愿加学欢迎大家👍点赞✍评论⭐收藏 目录 1. 认识微服务 1.1 单体架构 1.2 集群和分布式架构 1.3 集群和分布式…

从零开始做题:什么奇奇怪怪的东西

题目 解题 mrf拓展名,macro recorder打开,鼠标键盘的记录 然后解压flag.zip即可,发现有一个挂载的文件,直接打开后 显示所有的隐藏文件 一个一个打开 然后进行拼接运行吧估计。 首先打开txt文件直接久就给出了代码&#xff1…

linux的学习(四):磁盘,进程,定时,软件包的相关命令

简介 关于磁盘管理,进程管理,定时任务,软件包管理的命令的使用 磁盘管理类命令 du du 目录名: 查看文件和目录占用的磁盘空间 参数: -h:可以看到大小的单位,g,mb-a:还可以看到文…

昇思25天学习打卡营第8天|模型权重保存与加载

打卡 目录 打卡 模型的两种保存形式 Checkpoint 中间表示IR 模型保存与加载 模型权重保存-例1 模型权重加载-例1 模型权重保存-例2 模型权重加载-例2 模型权重文件的空间占用计算-例 模型的两种保存形式 Checkpoint 权重参数文件 中间表示IR 中间表示(…

GitHub每日最火火火项目(7.15)

项目名称:public - apis / public - apis 项目介绍:这是一个集合了各种免费 API 的项目。在当今的软件开发中,API(应用程序编程接口)起着至关重要的作用,它允许不同的应用程序之间进行交互和数据共享。该项…

骨头的诱惑

在看之前,先把他关注了,谢谢iwowo 的个人中心 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 代码思路主要来自他(说实话,代码我也会)但是,思路和代码还是不一样的,所以 快去关注啊&#xff…

跟着操作,解决iPhone怎么清理内存难题

在如今智能手机功能日益强大的时代,我们使用手机拍照、录制视频、下载应用、存储文件等操作都会占用手机内存。当内存空间不足时,手机运行会变得缓慢,甚至出现卡顿、闪退等现象。因此,定期清理iPhone内存是非常必要的。那么&#…

信息检索(112):Token-level Adaptive Training for Neural Machine Translation

Token-level Adaptive Training for Neural Machine Translation 摘要1 引言2 背景3 方法3.1 现有的自适应目标调查3.2 token 权重的启发式标准3.3 两个具体的自适应目标 4 实验4.1 数据准备4.2 系统4.3 超参数4.4 实验结果 5 分析5.1 考虑标记频率对翻译质量的影响5.2 不同 BP…

详解注意力机制上篇【RNN,Seq2Seq(Encoder-Decoder,编码器-解码器)等基础知识】

NLP-大语言模型学习系列目录 一、注意力机制基础——RNN,Seq2Seq等基础知识 二、注意力机制【Self-Attention,自注意力模型】 🔥 在自然语言处理(NLP)领域,理解和生成自然语言的能力对于构建智能系统至关重要。从文本分类、机器翻…

电脑文件误删除如何恢复?Top12电脑数据恢复软件汇总合集!(图文详解)

电脑文件误删除如何恢复?在日常使用电脑过程中,我们经常会遇到意外删除文件的情况。可能是因为按错了按键、误操作了鼠标,或者意外格式化了存储设备。这些情况都可能导致重要的文件不小心被删除。但是不用担心,有许多专业的数据恢…

昇思25天学习打卡营第10天|NLP-RNN实现情感分类

打卡 目录 打卡 任务说明 流程 数据准备与加载 加载预训练词向量(分词) 数据集预处理 模型构建 Embedding RNN(循环神经网络) LSTM 全连接层 损失函数与优化器 训练逻辑 评估指标和逻辑 模型训练与保存 模型加载与测试 自定义输入测试 …

高频面试题基本总结回顾4(含笔试高频算法整理)

目录 一、基本面试流程回顾 二、基本高频算法题展示 三、基本面试题总结回顾 (一)Java高频面试题整理 (二)JVM相关面试问题整理 (三)MySQL相关面试问题整理 (四)Redis相关面试…

Python从0到100(三十九):数据提取之正则(文末免费送书)

前言: 零基础学Python:Python从0到100最新最全教程。 想做这件事情很久了,这次我更新了自己所写过的所有博客,汇集成了Python从0到100,共一百节课,帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…

Qt|QTreewidget类下函数qt助手详解说明示例(二)

上篇:Qt|QTreewidget类下函数qt助手详解说明示例(一) 该系列持续更新,喜欢请一键三连,一起学习进步,升职加薪,感谢各位大佬。 QT5.14.2 参考官方QT助手 Kimi辅助说明 文章目录 insertTopLevelI…