构建高性能异步任务引擎:FastAPI + Celery + Redis

在现代应用开发中,异步任务处理是一个常见的需求。无论是数据处理、图像生成,还是复杂的计算任务,异步执行都能显著提升系统的响应速度和吞吐量。今天,我们将通过一个实际项目,探索如何使用 FastAPICeleryRedis 构建一个高性能的异步任务引擎。

项目背景

技术栈介绍

  • FastAPI:一个现代、高性能的 Web 框架,基于 Python 3.7+ 的异步编程特性构建。它支持自动生成 OpenAPI 文档和 Swagger UI,能够快速构建 RESTful API,并且具有极低的延迟和高并发处理能力。
  • Celery:一个分布式任务队列系统,主要用于处理异步任务和定时任务。它支持多种消息传输机制,能够将任务分发到多个工作节点上并行处理,从而提高系统的吞吐量和响应速度。
  • Redis:一个高性能的键值存储系统,常用于缓存、消息队列和分布式锁等场景。在 Celery 中,Redis 通常作为消息代理(Broker)和结果存储(Backend),负责任务的分发和结果的持久化。

项目目标

通过 FastAPI、Celery 和 Redis 的结合,构建一个能够高效处理用户提交的 Python 代码的异步任务引擎。用户可以通过 API 提交代码,系统会异步执行代码,并返回任务的执行结果。

项目目录结构

project/
├── main.py
├── utils.py
├── schemas.py
└── app/├── __init__.py├── config.py└── tasks/├── __init__.py└── tasks.py

代码功能深度解析

1. main.py:FastAPI 应用的核心

main.py 是项目的核心入口文件,负责定义 FastAPI 应用的接口和逻辑。

FastAPI 应用初始化
app = FastAPI(title="Async Task API", description="", version="1.0.0")

这里我们创建了一个 FastAPI 应用,命名为 Async Task API,版本为 1.0.0

自定义 Swagger UI
def swagger_monkey_patch(*args, **kwargs):return get_swagger_ui_html(*args,**kwargs,swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui-bundle.js",swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui.min.css",)
applications.get_swagger_ui_html = swagger_monkey_patch

通过 Monkey Patch 的方式,我们自定义了 Swagger UI 的资源加载路径,使用了国内的 CDN 加速资源,提升文档加载速度。

全局异常处理
@app.exception_handler(Exception)
def validation_exception_handler(request, err):base_error_message = f"Failed to execute: {request.method}: {request.url}"return JSONResponse(status_code=400, content={"message": f"{base_error_message}. Detail: {err}"})

我们定义了一个全局异常处理器,捕获所有未处理的异常,并返回一个包含错误信息的 JSON 响应。

HTTP 中间件:计算请求处理时间
@app.middleware("http")
async def add_process_time_header(request, call_next):start_time = time.time()response = await call_next(request)process_time = time.time() - start_timeresponse.headers["X-Process-Time"] = str(f'{process_time:0.4f} sec')return response

这个中间件用于计算每个请求的处理时间,并将处理时间添加到响应头 X-Process-Time 中,方便调试和性能优化。

创建任务的 API
@app.post('/tasks')
def create_pytask(task: schemas.PyTask):code = task.codetime_limit = task.time_limitexpires = task.expiresresult = execute_python_code.apply_async(args=(code,), time_limit=time_limit, expires=expires)return JSONResponse(content={"task_id": result.id})

用户可以通过 /tasks 接口提交 Python 代码,代码会被异步执行。任务的执行结果可以通过 /tasks/{task_id} 接口查询。

查询任务结果的 API
@app.get('/tasks/{task_id}', response_model=schemas.PyTaskResult)
def get_task_result(task_id: str):return get_task_info(task_id)

用户可以通过 /tasks/{task_id} 接口查询任务的执行结果和状态。

2. utils.py:任务信息获取工具

utils.py 文件定义了一个工具函数 get_task_info,用于获取 Celery 任务的状态和结果。

def get_task_info(task_id):task_result = AsyncResult(task_id, app=app)result = {"task_id": task_id,"task_status": task_result.status,"task_result": task_result.result}return result

通过 AsyncResult,我们可以获取任务的当前状态(如 PENDINGSUCCESSFAILURE 等)和执行结果。

3. schemas.py:数据模型定义

schemas.py 文件定义了 Pydantic 模型,用于验证和序列化请求和响应的数据。

任务请求模型
class PyTask(BaseModel):code: strexpires: Optional[int] = Nonetime_limit: Optional[int] = None

用户提交的任务请求包含以下字段:

  • code: 任务的 Python 代码。
  • expires: 任务的过期时间(可选)。
  • time_limit: 任务的时间限制(可选)。
任务结果模型
class PyProgramResult(BaseModel):status: stroutput: Optional[str] = Noneerror: Optional[str] = None

任务的执行结果包含以下字段:

  • status: 任务的执行状态(如 successfailure)。
  • output: 任务的标准输出(可选)。
  • error: 任务的错误输出(可选)。
任务结果响应模型
class PyTaskResult(BaseModel):task_id: strtask_status: strtask_result: Optional[PyProgramResult] = None

任务的查询结果包含以下字段:

  • task_id: 任务的 ID。
  • task_status: 任务的状态(如 PENDINGSUCCESS 等)。
  • task_result: 任务的执行结果(可选)。

4. app/__init__.py:Celery 应用初始化

app/__init__.py 文件是 Celery 应用的初始化文件,主要用于配置 Celery 应用和任务的自动发现。

创建 Celery 应用
app = Celery('my_celery_project')

我们创建了一个名为 my_celery_project 的 Celery 应用。

加载配置
app.config_from_object('app.config')

app.config 文件中加载 Celery 的配置。

自动发现任务
app.autodiscover_tasks(['app.tasks'])

自动发现 app.tasks 模块中的任务。

Worker 和 Beat 初始化
@worker_init.connect
def worker_initialization(**kwargs):print("Worker 初始化开始")@beat_init.connect
def beat_initialization(**kwargs):print("Beat 初始化开始")

定义了 Worker 和 Beat 的初始化函数,分别在 Worker 和 Beat 启动时执行。

5. app/config.py:Celery 配置

app/config.py 文件定义了 Celery 的配置。

消息代理和结果存储
broker_url = 'redis://:redisisthebest@redis:6379/0'
result_backend = 'redis://:redisisthebest@redis:6379/0'

使用 Redis 作为消息代理和结果存储。

任务结果过期时间
result_expires = 3600

任务结果在 Redis 中保存 1 小时后过期。

序列化配置
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

使用 JSON 作为任务和结果的序列化格式。

时区配置
timezone = 'Asia/Shanghai'
enable_utc = True

设置时区为 Asia/Shanghai,并启用 UTC 时间。

6. app/tasks/tasks.py:任务执行逻辑

app/tasks/tasks.py 文件定义了一个 Celery 任务 execute_python_code,用于执行用户提交的 Python 代码。

@app.task
def execute_python_code(code_string):temp_file = "temp_code.py"with open(temp_file, "w") as f:f.write(code_string)try:result = subprocess.run(["python3", temp_file],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)if result.stderr:return {"status": "failure", "error": result.stderr}else:return {"status": "success", "output": result.stdout}finally:if os.path.exists(temp_file):os.remove(temp_file)

该任务将用户提交的代码字符串保存为临时文件,然后使用 subprocess.run 执行该文件,捕获标准输出和错误输出。如果执行成功,返回 success 状态和标准输出;如果执行失败,返回 failure 状态和错误输出。最后,删除临时文件。

部署分析

version: '3.8'services:fastapi:image: lab:python-packagescontainer_name: fastapiports:- 8080:8080volumes:- D:\dockerMount\code\celery:/home/codeworking_dir: /home/codecommand: python3 main.pyrestart: unless-stoppednetworks:- mynetcelery-worker:image: lab:python-packagescontainer_name: celery-workervolumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app worker --concurrency=4 --loglevel=inforestart: unless-stoppednetworks:- mynetcelery-flower:image: lab:python-packagescontainer_name: celery-flowerports:- 5555:5555volumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app flower --port=5555restart: unless-stoppednetworks:- mynetredis:image: bitnami/redis:7.2.4-debian-12-r16container_name: redisenvironment:- REDIS_PASSWORD=redisisthebestnetworks:- mynetnetworks:mynet:external: false

在这个 Docker Compose 配置中,我们定义了三个服务:

  • fastapi:FastAPI 应用,负责接收用户请求并分发任务。
  • celery-worker:Celery 工作节点,负责执行异步任务。
  • celery-flower:Celery 的监控工具,提供任务执行的可视化界面。
  • redis:Redis 服务,作为 Celery 的消息代理和结果存储。

代码的功能和价值

功能

  1. 异步任务执行

    • 用户可以通过 /tasks 接口提交 Python 代码,代码会被异步执行。
    • 任务的执行结果可以通过 /tasks/{task_id} 接口查询。
  2. 任务状态管理

    • 任务的状态(如 PENDINGSUCCESSFAILURE 等)可以通过 /tasks/{task_id} 接口查询。
  3. 高性能和可扩展性

    • 使用 FastAPI 和 Celery 构建的异步任务引擎能够处理高并发的任务请求。
    • Celery 的分布式特性使得系统可以轻松扩展以应对更多的任务。
  4. 安全性

    • 通过设置 time_limitexpires,可以限制任务的执行时间和过期时间,防止恶意代码的长时间执行。
  5. 易用性

    • FastAPI 自动生成的 Swagger UI 使得 API 的使用和调试更加方便。
    • Pydantic 模型确保了请求和响应数据的类型安全。

价值

  1. 高效的任务处理

    • 该系统能够高效地处理大量异步任务,适用于需要异步执行代码的场景,如在线代码执行、数据处理、图像处理等。
  2. 可扩展性

    • 通过 Celery 的分布式任务队列,系统可以轻松扩展以处理更多的任务,适合高并发场景。
  3. 安全性

    • 通过限制任务的执行时间和过期时间,系统能够有效防止恶意代码的滥用。
  4. 易用性

    • FastAPI 和 Pydantic 的结合使得 API 的开发和维护更加简单,同时提供了自动生成的文档和类型检查。
  5. 灵活性

    • 系统支持自定义任务的执行逻辑,可以根据业务需求扩展任务类型和功能。

总结

通过 FastAPI、Celery 和 Redis 的结合,我们构建了一个高性能、可扩展的分布式异步任务引擎。它能够高效地处理用户提交的 Python 代码,并提供任务状态查询功能。该系统适用于需要异步执行代码的场景,具有高效、安全、易用和灵活的特点。

无论是构建一个在线代码执行平台,还是处理复杂的计算任务,这个项目都为你提供了一个强大的基础。希望这篇文章能为你带来启发,让你在异步任务处理的道路上走得更远!

附图

发送任务
在这里插入图片描述
查询结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/63682.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【win10+RAGFlow+Ollama】搭建本地大模型助手(教程+源码)

一、RAGFlow简介 RAGFlow是一个基于对文档深入理解的开源RAG(Retrieval-augmented Generation,检索增强生成)引擎。 主要作用: 让用户创建自有知识库,根据设定的参数对知识库中的文件进行切块处理,用户向大…

C/C++圣诞树

系列文章 序号直达链接1C/C爱心代码2C/C跳动的爱心3C/C李峋同款跳动的爱心代码4C/C满屏飘字表白代码5C/C大雪纷飞代码6C/C烟花代码7C/C黑客帝国同款字母雨8C/C樱花树代码9C/C奥特曼代码10C/C精美圣诞树11C/C俄罗斯方块12C/C贪吃蛇13C/C孤单又灿烂的神-鬼怪14C/C闪烁的爱心15C…

投标心态:如何在“标海战术”中保持清醒的头脑?

在竞争激烈的市场环境下,“标海战术”——即大规模参与投标——已经成为许多企业争取市场份额的重要策略。然而,盲目追求投标数量可能导致资源浪费、团队疲劳以及战略目标的模糊化。在这种高强度的竞争模式中,如何保持清醒的头脑,…

研发效能DevOps: Vite 使用 Element Plus

目录 一、实验 1.环境 2.初始化前端项目 3.安装 vue-route 4.安装 pinia 5.安装 axios 6.安装 Element Plus 7.gitee创建工程 8. 配置路由映射 9.Vite 使用 Element Plus 二、问题 1.README.md 文档推送到gitee未自动换行 2.访问login页面显示空白 3.表单输入账户…

NVIDIA DeepStream插件之Gst-nvtracker

NVIDIA DeepStream插件之Gst-nvtracker 1. 源由2. 基础知识3. Gst-nvtracker插件3.1 插件参数3.2 插件API接口 4. 分析问题5. 总结6. 参考资料 1. 源由 这篇的主要目的是稍微吐槽下NVIDIA的设计,当然其实他们做的还是不错的(从系统架构设计角度看&#…

进程内存转储工具|内存镜像提取-取证工具

1.内存转储,内存转储(Memory Dump)是将计算机的物理内存(RAM)内容复制到一个文件中的过程,这个文件通常被称为“内存转储文件”或“核心转储文件”(Core Dump),内存转储的主要目的是…

Lua语言入门 - Lua 面向对象

Lua 面向对象 面向对象编程(Object Oriented Programming,OOP)是一种非常流行的计算机编程架构,通过创建和操作对象来设计应用程序。 以下几种编程语言都支持面向对象编程: CJavaObjective-CSmalltalkC#Ruby Lua 是…

Pyqt6在lineEdit中输入文件名称并创建或删除JSON文件

1、创建JSON文件 代码 import osdef addModulekeyWordFile(self):if "" ! self.lineEdit_module.text():moduleFile self.lineEdit_module.text() .jsonelse:self.toolLogPrinting(请输入模块名称)returnfilePath modulekeyWordFileDir moduleFileif os.path.e…

数据结构--堆的向上调整和向下调整

文章目录 1.完全二叉树2.堆向上调整3.堆向下调整4.测试代码 1.完全二叉树 下面的这个就是对于我们的完全二叉树的这个逻辑结构和物理结构的说明: 逻辑结构就是我们自己认为的进行购想出来的; 但是这个物理结构却是我们的这个数据结构在内存里面的真是…

智能挂号系统设计典范:SSM 结合 Vue 在医院的应用实现

摘要 随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步成熟。本文介绍了医院预约挂号系统的开发全过程。通过分析医院预约挂号系统管理的不足,创建了一个计算机管理医院预约挂号系统的方案。文章介绍了医院预约挂号系统的系…

Transform组件的用法

文章目录 1. 概念介绍2. 使用方法3. 示例代码我们在上一章回中介绍了Checkbox Widget相关的内容,本章回中将介绍Transform Widget.闲话休提,让我们一起Talk Flutter吧。 1. 概念介绍 我们在这里说的Transform是一种容器类widget,它和Container组件类似。它可以包含其它的组件…

go面试问题

1 Go的内存逃逸如何分析 go build -gcflags-m main_pointer.go 2 http状态码 300 请求的资源可包括多个位置,相应可返回一个资源特征与地址的列表用于用户终端(例如:浏览器)选择 301 永久移动。请求的资源已被永久的移动到新U…

TouchGFX移植(5)增加触屏驱动

一)增加驱动代码gt9xxx.c和ctiic.c到工程中的BSP目录下: 二)更改触摸文件STM32TouchController.cpp 1)在STM32TouchController.cpp文件中增加: #include “gt9xxx.h” 2)增加gt9xxx_init(); void STM32TouchControlle…

资源型数字化平台该如何顺利运营?

一、引言 随着信息技术的迅猛发展,资源型数字化平台在各领域的重要性日益凸显。此类平台整合各类资源,以数字化手段提升资源利用效率与价值,但确保其顺利运营面临诸多挑战。 二、资源型数字化平台特点 资源型数字化平台具有资源整合性&…

GitLab的安装和使用

1.GitLab 环境说明 系统版本 CentOS 7.2 x86_64 软件版本 gitlab-ce-10.8.4 GitLab 是一个用于仓库管理系统的开源项目,使用Git作为代码管理工具,并在此基础上搭建起来的web服务。可通过Web界面进行访问公开的或者私人项目。它拥有与Github类似的功能…

Leetcode 串联所有单词的子串

算法思想(中文解释) 这道题目要求我们在字符串 s 中找到所有子串,这些子串是字符串数组 words 中所有单词的串联,并且每个单词只能使用一次,且顺序可以任意。下面是代码的算法思想: 1. 核心思路 分解问题…

解析在OceanBase创建分区的常见问题|OceanBase 用户问题精粹

在《分区策略和管理分区计划的实践方案》这篇文章中,我们介绍了在ODC中制定分区策略及有效管理分区计划的经验。有不少用户在该帖下提出了使用中的问题,其中一个关于创建分区的限制条件的问题,也是很多用户遭遇的老问题。因此本文以其为切入&…

有哪些免费的 ERP 软件可供选择?哪些 ERP 软件使用体验较好?

想找个 “免费” 的 ERP 软件? 咱得知道,ERP 那可是涉及财务、人力、供应链、采购、销售等好多方面的重要企业软件。功能这么全,能免费才怪呢!真要是有免费的,早就火遍大江南北,说不定把市场都垄断了&…

centos-stream9系统安装docker

如果之前安装过docker需要删除之前的。 sudo dnf -y remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-engine 安装yum-utils工具: dnf -y install yum-utils dnf-plugin…

了解cuda的统一内存

1. CUDA 6中的统一内存 在CUDA 6中,从Kepler GPU架构(计算能力3.0或更高)开始,在64位Windows 7、8和Linux操作系统(内核2.6.18)上开始支持统一内存. 从CUDA 6开始,NVIDIA推出了CUDA平台历史上…