基于celery的任务管理,本文主要是处理自己的算法,暴露API,管理任务并发,多线程

基于celery的任务管理,本文主要是处理自己的算法,暴露API,管理任务并发,多线程

  • 基本需求描述
  • 潜在问题
  • 主函数
  • 配置文件

基本需求描述

  • 暴露API,供其他人调用算法。
  • 方便查看任务状态。
  • 因为服务器资源有限,控制并发数量。
  • 多任务并发加快处理速度。这里需要说明的是python本身是可以做多线程的,但是(1)直接使用threading,GIL的存在导致并不是多线程处理,实际上并发还是一个CPU核在处理;(2)可以使用multiprocessing来实现多线程,但是这里有个问题就是自己如果按照我之前的博客来通过数组来实现一个任务队列的话,必然会涉及到全局变量问题,这里全局变量是可以实现的,但是不够优雅,因为有很多现成的库具备更加强大的功能,所以如果不是离线环境等受多种客观环境限制,建议还是使用现成的即可,本文这里所以使用了celery+redis的方式来管理任务队列。
  • 管理任务队列,某个任务失败后自动再次尝试,如果超过尝试次数阈值则该任务将认为失败,不会再尝试处理。

潜在问题

  • 因为是针对我自己的需求,CPU每个核处理一个任务。这意味着如果你想几个任务多个CPU Core来处理的话,可以考虑算法中multiprocessing或者其他更优雅的方式,只是我在本项目中不需要考虑这个问题。

主函数

import time
import json
import redis
import logging
from flask import Flask, request, jsonify
from flask_cors import CORS
from celery import Celery
from config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, REDIS_HOST, REDIS_PORT, REDIS_DB, OUTPUT_DIR, CELERY_WORKER_CONCURRENCY, CELERY_TASK_MAX_RETRIES, CELERY_TASK_RETRY_DELAY# your algorithms
from simulator import rainfall_simulation_gpu, rainfall_simulation_cpu# Flask
app = Flask(__name__)
CORS(app)app.config.update(CELERY_BROKER_URL=CELERY_BROKER_URL,CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND,JSON_AS_ASCII=False,
)celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(result_backend=app.config['CELERY_RESULT_BACKEND'], worker_concurrency=CELERY_WORKER_CONCURRENCY)redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)@celery.task(bind=True, max_retries=CELERY_TASK_MAX_RETRIES, default_retry_delay=CELERY_TASK_RETRY_DELAY)
def long_running_task(self, task_info):unique_key = f"task:{hash(str(task_info))}"try:if not redis_client.set(unique_key, self.request.id, nx=True, ex=3600):return {'status': 'Duplicate task, skipping execution'}logger.info(f"Processing task: {task_info}")if task_info.get('GPU') == 1:simulation_status, save_path = rainfall_simulation_gpu(task_info, ngpus=1, output_dir=OUTPUT_DIR)else:simulation_status, save_path = rainfall_simulation_cpu(task_info, output_dir=OUTPUT_DIR)if simulation_status:return {'status': 'Task completed!', 'save_path': save_path}raise Exception('Simulation failed')except Exception as exc:logger.error(f"Error in task: {exc}")if self.request.retries >= self.max_retries:redis_client.delete(unique_key)raise self.update_state(state='FAILURE', meta={'exc': str(exc)})raise self.retry(exc=exc)@app.route('/new_task_rainfall_simulation/', methods=['POST'])
def new_task_rainfall_simulation():task_info = json.loads(request.data.decode('utf-8'))task = long_running_task.apply_async(args=[task_info])return jsonify({'task_id': task.id}), 202@app.route('/task-status/<task_id>', methods=['GET'])
def task_status(task_id):task = long_running_task.AsyncResult(task_id)response = {'state': task.state, 'result': task.info if task.state in ['SUCCESS', 'FAILURE'] else None}return jsonify(response)if __name__ == '__main__':app.run(host='your ip', port=5000) // set your server IP and port

配置文件

config.py

# Redis configuration
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0# Celery configuration
CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'# General application settings
JSON_AS_ASCII = False# Simulation configuration
OUTPUT_DIR = 'xxx' //Save simulation results# Celery worker configuration
CELERY_WORKER_CONCURRENCY = 6 // concurrent users, depends on your hardware and algorithms
CELERY_TASK_MAX_RETRIES = 2 // define retry times for each task
CELERY_TASK_RETRY_DELAY = 10 // retry the task after n seconds

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/66027.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Win11右键菜单实现

主要参考Win11 Context Menu Demo 此工程是vs2022编译&#xff0c;vs2019先修改下 base.h 方可编译过 编译好dll以后 拷贝至SparsePackage目录下 生成稀疏包msix 就拿他工程里面的改&#xff0c;编辑AppxManifest.xml&#xff0c;配置都要对&#xff0c;一个不对可能都失败&a…

像JSONDecodeError: Extra data: line 2 column 1 (char 134)这样的问题怎么解决

问题介绍 今天处理返回的 JSON 的时候&#xff0c;出现了下面这样的问题&#xff1a; 处理这种问题的时候&#xff0c;首先你要看一下当前的字符串格式是啥样的&#xff0c;比如我查看后发现是下面这样的&#xff1a; 会发现这个字符串中间没有逗号&#xff0c;也就是此时的J…

what?ngify 比 axios 更好用,更强大?

文章目录 前言一、什么是ngify&#xff1f;二、npm安装三、发起请求3.1 获取 JSON 数据3.2 获取其他类型的数据3.3 改变服务器状态3.4 设置 URL 参数3.5 设置请求标头3.6 与服务器响应事件交互3.7 接收原始进度事件3.8 处理请求失败3.9 Http Observables 四、更换 HTTP 请求实现…

Linux Kernel 之十 详解 PREEMPT_RT、Xenomai 的架构、源码、构建及使用

概述 现在的 RTOS 基本可以分为 Linux 阵营和非 Linux 阵营这两大阵营。非 Linux 阵营的各大 RTOS 都是独立发展,使用上也相对独立;而 Linux 阵营则有多种不同的实现方法来改造 Linux 以实现实时性要求。本文我们重点关注 Linux 阵营的实时内核实现方法! 本文我们重点关注 …

【拒绝算法PUA】3065. 超过阈值的最少操作数 I

系列文章目录 【拒绝算法PUA】0x00-位运算 【拒绝算法PUA】0x01- 区间比较技巧 【拒绝算法PUA】0x02- 区间合并技巧 【拒绝算法PUA】0x03 - LeetCode 排序类型刷题 【拒绝算法PUA】LeetCode每日一题系列刷题汇总-2025年持续刷新中 C刷题技巧总结&#xff1a; [温习C/C]0x04 刷…

ClickHouse-CPU、内存参数设置

常见配置 1. CPU资源 1、clickhouse服务端的配置在config.xml文件中 config.xml文件是服务端的配置&#xff0c;在config.xml文件中指向users.xml文件&#xff0c;相关的配置信息实际是在users.xml文件中的。大部分的配置信息在users.xml文件中&#xff0c;如果在users.xml文…

《自动驾驶与机器人中的SLAM技术》ch9:自动驾驶车辆的离线地图构建

目录 1 点云建图的流程 2 前端实现 2.1 前端流程 2.2 前端结果 3 后端位姿图优化与异常值剔除 3.1 两阶段优化流程 3.2 优化结果 ① 第一阶段优化结果 ② 第二阶段优化结果 4 回环检测 4.1 回环检测流程 ① 遍历第一阶段优化轨迹中的关键帧。 ② 并发计算候选回环对…

GPT 系列论文精读:从 GPT-1 到 GPT-4

学习 & 参考资料 前置文章 Transformer 论文精读 机器学习 —— 李宏毅老师的 B 站搬运视频 自监督式学习(四) - GPT的野望[DLHLP 2020] 來自猎人暗黑大陆的模型 GPT-3 论文逐段精读 —— 沐神的论文精读合集 GPT&#xff0c;GPT-2&#xff0c;GPT-3 论文精读【论文精读】…

大数据技术Kafka详解 ⑤ | Kafka中的CAP机制

目录 1、分布式系统当中的CAP理论 1.1、CAP理论 1.2、Partitiontolerance 1.3、Consistency 1.4、Availability 2、Kafka中的CAP机制 C软件异常排查从入门到精通系列教程&#xff08;核心精品专栏&#xff0c;订阅量已达600多个&#xff0c;欢迎订阅&#xff0c;持续更新…

riscv架构下linux4.15实现early打印

在高版本linux6.12.7源码中&#xff0c;early console介绍&#xff0c;可参考《riscv架构下linux6.12.7实现early打印》文章。 1 什么是early打印 适配内核到新的平台&#xff0c;基本环境搭建好之后&#xff0c;首要的就是要调通串口&#xff0c;方便后面的信息打印。 正常流…

improve-gantt-elastic(vue2中甘特图实现与引入)

1.前言 项目开发中需要使用甘特图展示项目实施进度&#xff0c;左侧为表格计划&#xff0c;右侧为图表进度展示。wl-gantt-mater&#xff0c;dhtmlx尝试使用过可拓展性受到限制。gantt-elastic相对简单&#xff0c;可操作性强&#xff0c;基础版本免费。 甘特图&#xff08;Gan…

力扣 全排列

回溯经典例题。 题目 通过回溯生成所有可能的排列。每次递归时&#xff0c;选择一个数字&#xff0c;直到选满所有数字&#xff0c;然后记录当前排列&#xff0c;回到上层时移除最后选的数字并继续选择其他未选的数字。每次递归时&#xff0c;在 path 中添加一个新的数字&…

1/13+2

运算符重载 myString.h #ifndef MYSTRING_H #define MYSTRING_H #include <cstring> #include <iostream> using namespace std; class myString {private:char *str; //记录c风格的字符串int size; //记录字符串的实际长度int capacity; …

【HM-React】08. Layout模块

基本结构和样式reset 结构创建 实现步骤 打开 antd/Layout 布局组件文档&#xff0c;找到示例&#xff1a;顶部-侧边布局-通栏拷贝示例代码到我们的 Layout 页面中分析并调整页面布局 代码实现 pages/Layout/index.js import { Layout, Menu, Popconfirm } from antd impor…

计算机视觉算法实战——实时车辆检测和分类(主页有相关源码)

✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ ​ ​​​​​​​​​​​​​​​​​​ 1. 领域介绍✨✨ 实时车辆检测和分类是计算机视觉中的一个重要应用领域&#xff0c;旨在从视频流或…

使用 selenium-webdriver 开发 Web 自动 UI 测试程序

优缺点 优点 有时候有可能一个改动导致其他的地方的功能失去效果&#xff0c;这样使用 Web 自动 UI 测试程序可以快速的检查并定位问题&#xff0c;节省大量的人工验证时间 缺点 增加了维护成本&#xff0c;如果功能更新过快或者技术更新过快&#xff0c;维护成本也会随之提高…

性能测试工具Jmeter分布式运行

性能测试工具JMeter的分布式执行是一种用于增强压力测试能力的技术方案&#xff0c;它允许用户通过多台机器来共同完成同一个测试计划的执行。这种方式特别适用于需要模拟成百上千甚至上万用户并发访问的情况&#xff0c;当单台机器由于硬件资源&#xff08;如CPU、内存、网络I…

弥散张量分析开源软件 DSI Studio 简体中文汉化版可以下载了

网址&#xff1a; (63条消息) DSIStudio简体中文汉化版(2022年7月)-算法与数据结构文档类资源-CSDN文库

移动云自研云原生数据库入围国采!

近日&#xff0c;中央国家机关2024年度事务型数据库软件框架协议联合征集采购项目产品名单正式公布&#xff0c;移动云自主研发的云原生数据库产品顺利入围。这一成就不仅彰显了移动云在数据库领域深耕多年造就的领先技术优势&#xff0c;更标志着国家权威评审机构对移动云在数…

在vscode中使用R-1

参考我的上一篇博客&#xff1a; https://blog.csdn.net/weixin_62528784/article/details/145092632?spm1001.2014.3001.5501 这篇内容实际上就是上一篇博客的后续承接&#xff0c;既然都在vscode的jupyter中使用R了&#xff0c;实际上其实也能够直接在vscode中原生使用R的编…