基于celery的任务管理,本文主要是处理自己的算法,暴露API,管理任务并发,多线程

基于celery的任务管理,本文主要是处理自己的算法,暴露API,管理任务并发,多线程

  • 基本需求描述
  • 潜在问题
  • 主函数
  • 配置文件

基本需求描述

  • 暴露API,供其他人调用算法。
  • 方便查看任务状态。
  • 因为服务器资源有限,控制并发数量。
  • 多任务并发加快处理速度。这里需要说明的是python本身是可以做多线程的,但是(1)直接使用threading,GIL的存在导致并不是多线程处理,实际上并发还是一个CPU核在处理;(2)可以使用multiprocessing来实现多线程,但是这里有个问题就是自己如果按照我之前的博客来通过数组来实现一个任务队列的话,必然会涉及到全局变量问题,这里全局变量是可以实现的,但是不够优雅,因为有很多现成的库具备更加强大的功能,所以如果不是离线环境等受多种客观环境限制,建议还是使用现成的即可,本文这里所以使用了celery+redis的方式来管理任务队列。
  • 管理任务队列,某个任务失败后自动再次尝试,如果超过尝试次数阈值则该任务将认为失败,不会再尝试处理。

潜在问题

  • 因为是针对我自己的需求,CPU每个核处理一个任务。这意味着如果你想几个任务多个CPU Core来处理的话,可以考虑算法中multiprocessing或者其他更优雅的方式,只是我在本项目中不需要考虑这个问题。

主函数

import time
import json
import redis
import logging
from flask import Flask, request, jsonify
from flask_cors import CORS
from celery import Celery
from config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, REDIS_HOST, REDIS_PORT, REDIS_DB, OUTPUT_DIR, CELERY_WORKER_CONCURRENCY, CELERY_TASK_MAX_RETRIES, CELERY_TASK_RETRY_DELAY# your algorithms
from simulator import rainfall_simulation_gpu, rainfall_simulation_cpu# Flask
app = Flask(__name__)
CORS(app)app.config.update(CELERY_BROKER_URL=CELERY_BROKER_URL,CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND,JSON_AS_ASCII=False,
)celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(result_backend=app.config['CELERY_RESULT_BACKEND'], worker_concurrency=CELERY_WORKER_CONCURRENCY)redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)@celery.task(bind=True, max_retries=CELERY_TASK_MAX_RETRIES, default_retry_delay=CELERY_TASK_RETRY_DELAY)
def long_running_task(self, task_info):unique_key = f"task:{hash(str(task_info))}"try:if not redis_client.set(unique_key, self.request.id, nx=True, ex=3600):return {'status': 'Duplicate task, skipping execution'}logger.info(f"Processing task: {task_info}")if task_info.get('GPU') == 1:simulation_status, save_path = rainfall_simulation_gpu(task_info, ngpus=1, output_dir=OUTPUT_DIR)else:simulation_status, save_path = rainfall_simulation_cpu(task_info, output_dir=OUTPUT_DIR)if simulation_status:return {'status': 'Task completed!', 'save_path': save_path}raise Exception('Simulation failed')except Exception as exc:logger.error(f"Error in task: {exc}")if self.request.retries >= self.max_retries:redis_client.delete(unique_key)raise self.update_state(state='FAILURE', meta={'exc': str(exc)})raise self.retry(exc=exc)@app.route('/new_task_rainfall_simulation/', methods=['POST'])
def new_task_rainfall_simulation():task_info = json.loads(request.data.decode('utf-8'))task = long_running_task.apply_async(args=[task_info])return jsonify({'task_id': task.id}), 202@app.route('/task-status/<task_id>', methods=['GET'])
def task_status(task_id):task = long_running_task.AsyncResult(task_id)response = {'state': task.state, 'result': task.info if task.state in ['SUCCESS', 'FAILURE'] else None}return jsonify(response)if __name__ == '__main__':app.run(host='your ip', port=5000) // set your server IP and port

配置文件

config.py

# Redis configuration
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0# Celery configuration
CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'# General application settings
JSON_AS_ASCII = False# Simulation configuration
OUTPUT_DIR = 'xxx' //Save simulation results# Celery worker configuration
CELERY_WORKER_CONCURRENCY = 6 // concurrent users, depends on your hardware and algorithms
CELERY_TASK_MAX_RETRIES = 2 // define retry times for each task
CELERY_TASK_RETRY_DELAY = 10 // retry the task after n seconds

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/66027.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java(7)常用的工具类

1.Collections集合工具类 内置了大量对集合操作的静态方法&#xff0c;可以通过类名直接调用方法。 方法的种类&#xff1a;最大值max、最小值min、sort排序...详见API帮助文档 import java.util.ArrayList; import java.util.Collections; import java.util.List;public cl…

【Varnish】:解决 Varnish 7.6 CDN 静态资源缓存失效问题

项目场景&#xff1a; 在一个使用Varnish作为反向代理的Web应用中&#xff0c;我们依赖CDN&#xff08;内容分发网络&#xff09;来缓存静态资源&#xff08;如图片、CSS、JavaScript文件等&#xff09;&#xff0c;以提高全球用户的访问速度并减轻源站服务器的负载。然而&…

理解机器学习中的参数和超参数

在机器学习中&#xff0c;参数和超参数是两个重要但不同的概念&#xff0c;它们共同影响模型的性能和表现。以下是它们的定义和区别&#xff0c;以及如何通俗地理解它们&#xff1a; 1. 参数 定义 参数是模型在训练过程中自动学习到的变量&#xff0c;它们直接决定了模型如何…

Win11右键菜单实现

主要参考Win11 Context Menu Demo 此工程是vs2022编译&#xff0c;vs2019先修改下 base.h 方可编译过 编译好dll以后 拷贝至SparsePackage目录下 生成稀疏包msix 就拿他工程里面的改&#xff0c;编辑AppxManifest.xml&#xff0c;配置都要对&#xff0c;一个不对可能都失败&a…

R.swift库的详细用法

R.swift 是一个 Swift 工具库,它提供了一个自动生成的类 R,使得你可以通过类型安全的方式访问项目中的资源,例如图片、字体、颜色、XIB 文件等。通过 R.swift,你可以避免字符串类型的错误,提升代码的可维护性。 以下是 R.swift 库的详细用法: 1. 安装 R.swift 使用 Sw…

像JSONDecodeError: Extra data: line 2 column 1 (char 134)这样的问题怎么解决

问题介绍 今天处理返回的 JSON 的时候&#xff0c;出现了下面这样的问题&#xff1a; 处理这种问题的时候&#xff0c;首先你要看一下当前的字符串格式是啥样的&#xff0c;比如我查看后发现是下面这样的&#xff1a; 会发现这个字符串中间没有逗号&#xff0c;也就是此时的J…

what?ngify 比 axios 更好用,更强大?

文章目录 前言一、什么是ngify&#xff1f;二、npm安装三、发起请求3.1 获取 JSON 数据3.2 获取其他类型的数据3.3 改变服务器状态3.4 设置 URL 参数3.5 设置请求标头3.6 与服务器响应事件交互3.7 接收原始进度事件3.8 处理请求失败3.9 Http Observables 四、更换 HTTP 请求实现…

Linux Kernel 之十 详解 PREEMPT_RT、Xenomai 的架构、源码、构建及使用

概述 现在的 RTOS 基本可以分为 Linux 阵营和非 Linux 阵营这两大阵营。非 Linux 阵营的各大 RTOS 都是独立发展,使用上也相对独立;而 Linux 阵营则有多种不同的实现方法来改造 Linux 以实现实时性要求。本文我们重点关注 Linux 阵营的实时内核实现方法! 本文我们重点关注 …

【拒绝算法PUA】3065. 超过阈值的最少操作数 I

系列文章目录 【拒绝算法PUA】0x00-位运算 【拒绝算法PUA】0x01- 区间比较技巧 【拒绝算法PUA】0x02- 区间合并技巧 【拒绝算法PUA】0x03 - LeetCode 排序类型刷题 【拒绝算法PUA】LeetCode每日一题系列刷题汇总-2025年持续刷新中 C刷题技巧总结&#xff1a; [温习C/C]0x04 刷…

ClickHouse-CPU、内存参数设置

常见配置 1. CPU资源 1、clickhouse服务端的配置在config.xml文件中 config.xml文件是服务端的配置&#xff0c;在config.xml文件中指向users.xml文件&#xff0c;相关的配置信息实际是在users.xml文件中的。大部分的配置信息在users.xml文件中&#xff0c;如果在users.xml文…

《自动驾驶与机器人中的SLAM技术》ch9:自动驾驶车辆的离线地图构建

目录 1 点云建图的流程 2 前端实现 2.1 前端流程 2.2 前端结果 3 后端位姿图优化与异常值剔除 3.1 两阶段优化流程 3.2 优化结果 ① 第一阶段优化结果 ② 第二阶段优化结果 4 回环检测 4.1 回环检测流程 ① 遍历第一阶段优化轨迹中的关键帧。 ② 并发计算候选回环对…

type 属性的用途和实现方式(图标,表单,数据可视化,自定义组件)

1.图标类型 <uni-icon>组件中&#xff0c;type可以用来指定图标的不同样式。 <uni-icons type"circle" size"30" color"#007aff"></uni-icons> //表示圆形 <uni-icons type"square" size"30" co…

网络基础知识指南|1-20个

1. IP地址: 即互联网协议地址&#xff0c;是用于标识互联网上的每一个设备或节点的唯一地址。IP地址的作用主要是进行网络设备的定位和路由&#xff0c;确保数据包可以从源设备准确地传送到目标设备。2. 子网掩码: 是用于将一个IP地址划分为网络地址和主机地址的工具。它通常与…

GPT 系列论文精读:从 GPT-1 到 GPT-4

学习 & 参考资料 前置文章 Transformer 论文精读 机器学习 —— 李宏毅老师的 B 站搬运视频 自监督式学习(四) - GPT的野望[DLHLP 2020] 來自猎人暗黑大陆的模型 GPT-3 论文逐段精读 —— 沐神的论文精读合集 GPT&#xff0c;GPT-2&#xff0c;GPT-3 论文精读【论文精读】…

lombok在高版本idea中注解不生效的解决

环境&#xff1a; IntelliJ IDEA 2024.3.1.1 Spring Boot Maven 问题描述 使用AllArgsConstructor注解一个用户类&#xff0c;然后调用全参构造方法创建对象&#xff0c;出现错误&#xff1a; java: 无法将类 com.itheima.pojo.User中的构造器 User应用到给定类型; 需要:…

145.《redis原生超详细使用》

文章目录 什么是redisredis 安装启动redis数据类型redis key操作key 的增key 的查key 的改key 的删key 是否存在key 查看所有key 「设置」过期时间key 「查看」过期时间key 「移除」过期时间key 「查看」数据类型key 「匹配」符合条件的keykey 「移动」到其他数据库 redis数据类…

大数据技术Kafka详解 ⑤ | Kafka中的CAP机制

目录 1、分布式系统当中的CAP理论 1.1、CAP理论 1.2、Partitiontolerance 1.3、Consistency 1.4、Availability 2、Kafka中的CAP机制 C软件异常排查从入门到精通系列教程&#xff08;核心精品专栏&#xff0c;订阅量已达600多个&#xff0c;欢迎订阅&#xff0c;持续更新…

riscv架构下linux4.15实现early打印

在高版本linux6.12.7源码中&#xff0c;early console介绍&#xff0c;可参考《riscv架构下linux6.12.7实现early打印》文章。 1 什么是early打印 适配内核到新的平台&#xff0c;基本环境搭建好之后&#xff0c;首要的就是要调通串口&#xff0c;方便后面的信息打印。 正常流…

improve-gantt-elastic(vue2中甘特图实现与引入)

1.前言 项目开发中需要使用甘特图展示项目实施进度&#xff0c;左侧为表格计划&#xff0c;右侧为图表进度展示。wl-gantt-mater&#xff0c;dhtmlx尝试使用过可拓展性受到限制。gantt-elastic相对简单&#xff0c;可操作性强&#xff0c;基础版本免费。 甘特图&#xff08;Gan…

力扣 全排列

回溯经典例题。 题目 通过回溯生成所有可能的排列。每次递归时&#xff0c;选择一个数字&#xff0c;直到选满所有数字&#xff0c;然后记录当前排列&#xff0c;回到上层时移除最后选的数字并继续选择其他未选的数字。每次递归时&#xff0c;在 path 中添加一个新的数字&…