Python FastAPI + Celery + RabbitMQ 分布式图片水印处理系统

  1. FastAPI 服务器
  2. Celery 任务队列
  3. RabbitMQ 作为消息代理
  4. 定时任务处理

首先创建项目结构:

c:\Users\Administrator\Desktop\meitu\
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── celery_app.py
│   ├── tasks.py
│   └── config.py
├── requirements.txt
└── celery_worker.py
  1. 首先创建 requirements.txt:
fastapi==0.104.1
uvicorn==0.24.0
celery==5.3.4
python-dotenv==1.0.0
requests==2.31.0
  1. 创建配置文件:
from dotenv import load_dotenv
import osload_dotenv()# RabbitMQ配置
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672")
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")# Celery配置
CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}//"
CELERY_RESULT_BACKEND = "rpc://"# 定时任务配置
CELERY_BEAT_SCHEDULE = {'process-images-every-hour': {'task': 'app.tasks.process_images','schedule': 3600.0,  # 每小时执行一次},'daily-cleanup': {'task': 'app.tasks.cleanup_old_images','schedule': 86400.0,  # 每天执行一次}
}
  1. 创建 Celery 应用:
from celery import Celery
from app.config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULEcelery_app = Celery('image_processing',broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND,include=['app.tasks']
)# 配置定时任务
celery_app.conf.beat_schedule = CELERY_BEAT_SCHEDULE
celery_app.conf.timezone = 'Asia/Shanghai'
  1. 创建任务文件:
from app.celery_app import celery_app
from app.watermark import ImageWatermarker
import os
from datetime import datetime, timedelta@celery_app.task
def add_watermark_task(image_path, text, position='center'):"""异步添加水印任务"""watermarker = ImageWatermarker()try:result_path = watermarker.add_watermark(image_path=image_path,text=text,position=position)return {"status": "success", "output_path": result_path}except Exception as e:return {"status": "error", "message": str(e)}@celery_app.task
def process_images():"""定时处理图片任务"""image_dir = "images/pending"if not os.path.exists(image_dir):return {"status": "error", "message": "Pending directory not found"}processed = 0for image in os.listdir(image_dir):if image.lower().endswith(('.png', '.jpg', '.jpeg')):add_watermark_task.delay(os.path.join(image_dir, image),"自动处理水印",'center')processed += 1return {"status": "success", "processed": processed}@celery_app.task
def cleanup_old_images():"""清理旧图片任务"""output_dir = "images/processed"if not os.path.exists(output_dir):return {"status": "error", "message": "Output directory not found"}threshold_date = datetime.now() - timedelta(days=7)cleaned = 0for image in os.listdir(output_dir):image_path = os.path.join(output_dir, image)if os.path.getctime(image_path) < threshold_date.timestamp():os.remove(image_path)cleaned += 1return {"status": "success", "cleaned": cleaned}
  1. 创建 FastAPI 应用:
from fastapi import FastAPI, File, UploadFile, BackgroundTasks
from fastapi.responses import JSONResponse
import os
from app.tasks import add_watermark_task
from app.celery_app import celery_appapp = FastAPI(title="图片水印处理服务")@app.post("/upload/")
async def upload_image(file: UploadFile = File(...),text: str = "水印文本",position: str = "center"
):# 保存上传的文件file_path = f"images/uploads/{file.filename}"os.makedirs(os.path.dirname(file_path), exist_ok=True)with open(file_path, "wb") as buffer:content = await file.read()buffer.write(content)# 创建异步任务task = add_watermark_task.delay(file_path, text, position)return JSONResponse({"status": "success","message": "图片已上传并加入处理队列","task_id": task.id})@app.get("/task/{task_id}")
async def get_task_status(task_id: str):task = celery_app.AsyncResult(task_id)if task.ready():return {"status": "completed", "result": task.result}return {"status": "processing"}@app.get("/tasks/scheduled")
async def get_scheduled_tasks():return {"tasks": celery_app.conf.beat_schedule}
  1. 创建 Celery worker 启动文件:
from app.celery_app import celery_appif __name__ == '__main__':celery_app.start()

使用说明:

  1. 首先安装依赖:
pip install -r requirements.txt
  1. 确保 RabbitMQ 服务已启动

  2. 启动 FastAPI 服务器:

uvicorn app.main:app --reload
  1. 启动 Celery worker:
celery -A celery_worker.celery_app worker --loglevel=info
  1. 启动 Celery beat(定时任务):
celery -A celery_worker.celery_app beat --loglevel=info

这个系统提供以下功能:

  1. 通过 FastAPI 接口上传图片并异步处理水印
  2. 使用 Celery 处理异步任务队列
  3. 使用 RabbitMQ 作为消息代理
  4. 支持定时任务:
    • 每小时自动处理待处理图片
    • 每天清理一周前的旧图片
  5. 支持任务状态查询
  6. 支持查看计划任务列表

API 端点:

  • POST /upload/ - 上传图片并创建水印任务
  • GET /task/{task_id} - 查询任务状态
  • GET /tasks/scheduled - 查看计划任务列表

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/74176.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【蓝桥杯】每日练习 Day18

目录 前言 动态求连续区间和 分析 代码 数星星 分析 代码 星空之夜 分析 代码 前言 接下来是今天的题目&#xff08;本来是有四道题的但是有一道题是前面讲过&#xff08;逆序数的&#xff0c;感兴趣的小伙伴可以去看我归并排序的那一篇&#xff09;的我就不再过多赘…

基于银河麒麟桌面服务器操作系统的 DeepSeek本地化部署方法【详细自用版】

一、3种方式使用DeepSeek 1.本地部署 服务器操作系统环境进行,具体流程如下(桌面环境步骤相同): 本例所使用银河麒麟高级服务器操作系统版本信息: (1)安装ollama 方式一:按照ollama官网的下载指南,执行如下命令: curl -fsSL https://ollama.com/install.sh | sh方…

Python入门(7):Python序列结构-字典

字典Dictionary 字典(dictionary)和列表类似&#xff0c;也是可变序列&#xff0c;不过与列表不同&#xff0c;它是无序的可变序列&#xff0c;保存的为容是以“键-值对”的形式存放的。 Python 中的字典相当于 Java 或者 C中的 Map 对象。在C#中,就是Dictionary<TKey,TVa…

Flutter项目之构建打包分析

目录&#xff1a; 1、准备部分2、构建Android包2.1、配置修改部分2.2、编译打包 3、构建ios包3.1、配置修改部分3.2、编译打包 1、准备部分 2、构建Android包 2.1、配置修改部分 2.2、编译打包 执行flutter build apk命令进行打包。 3、构建ios包 3.1、配置修改部分 3.2、编译…

不用再付费~全网书源一键下载,实现阅读自由!!!

现在市面上有许多免费你看书的软件&#xff0c;但都软件内太多广告弹窗&#xff0c;这无疑是很烦&#xff0c;有事一不小心点进去就下载了软件&#xff0c;简直让人头大&#xff01; 如果你遇到这样的难题那么就应该看下本文~ 这是一款能一键将在线连载小说整合下载成标准格式&…

GCC RISCV 后端 -- GIMPLE IR 表示的一些理解

C/C源代码经过 GCC 解析&#xff08;Parse&#xff09;及转换后&#xff0c;通过 GIMPLE IR 予以表示&#xff08;Representation&#xff09;。其中&#xff0c;一个C/C源文件&#xff0c;通过 宏处理后&#xff0c;形成一个 转译单元&#xff08;Translation Unit&#xff09…

JAVA设计模式之适配器模式《太白金星有点烦》

太白金星握着月光凝成的鼠标&#xff0c;第108次检查南天门服务器的运行日志。这个刚从天枢院调来的三等仙官&#xff0c;此刻正盯着瑶池主机房里的青铜鼎发愁——鼎身上"天地同寿"的云纹间&#xff0c;漂浮着三界香火系统每分钟吞吐的十万条功德数据。看着居高不下的…

以太坊DApp开发脚手架:Scaffold-ETH 2 详细介绍与搭建教程

一、什么是Scaffold-ETH 2 Scaffold-ETH 2是一个开源的最新工具包&#xff0c;类似于脚手架。用于在以太坊区块链上构建去中心化应用程序 &#xff08;DApp&#xff09;。它旨在使开发人员更容易创建和部署智能合约&#xff0c;并构建与这些合约交互的用户界面。 Scaffold-ETH…

毕业设计:实现一个基于Python、Flask和OpenCV的人脸打卡Web系统(六)

毕业设计:实现一个基于Python、Flask和OpenCV的人脸打卡Web系统(六) Flask Flask是一个使用 Python 编写的轻量级 Web 应用框架。其 WSGI 工具箱采用 Werkzeug ,模板引擎则使用 Jinja2 。Flask使用 BSD 授权。 Flask也被称为 “microframework” ,因为它使用简单的核心,…

第十一章 VGA显示图片(还不会)

FPGA至简设计实例 前言 一、项目背景 1. IP核概述 IP 核(Intellectual Property core)指的是知识产权核或知识产权模块,其是具有特定电路功能的硬件描述语言程序,在EDA技术开发中具有十分重要的地位。美国著名的Dataquest咨询公司将 半导体产业的IP定义为“用于ASIC或FPGA…

浙江大学公开课|第二季|从大模型、智能体到复杂AI应用系统的构建——以产业大脑为例

大家好&#xff0c;我是吾鳴。 前沿回顾 吾鳴之前给大家分享过浙江大学DeepSeek系列公开课第一季&#xff0c;第一季一共八讲&#xff0c;内容介绍丰富&#xff0c;内容之广&#xff0c;看完粉丝朋友直呼浙大良心。这八讲公开课名称分别是&#xff1a; 第一期&#xff08;上&…

Spring AOP中为所有类型通知传递参数的完整示例,包含详细注释和参数传递方式

以下是Spring AOP中为所有类型通知传递参数的完整示例&#xff0c;包含详细注释和参数传递方式&#xff1a; // 1. 目标类&#xff08;被增强的类&#xff09; package com.example;public class TargetService {public void doTask(String param) {System.out.println("…

【Git教程】将dev分支合并到master后,那么dev分支该如何处理

将 dev 合并到 master 后的分支状态与操作指南 1. 合并后的分支状态 dev 分支不会消失&#xff1a; Git 的 git merge 命令仅将 dev 的内容合并到 master&#xff0c;不会删除 dev 分支。合并后&#xff0c;dev 分支仍然存在&#xff0c;其历史记录和代码保持不变。 分支的 H…

【go】异常处理panic和recover

panic 和 recover 当然能触发程序宕机退出的&#xff0c;也可以是我们自己&#xff0c;比如经过检查判断&#xff0c;当前环境无法达到我们程序进行的预期条件时&#xff08;比如一个服务指定监听端口被其他程序占用&#xff09;&#xff0c;可以手动触发 panic&#xff0c;让…

CSS层叠顺序

介绍 在 CSS 中&#xff0c;元素的层叠顺序决定了当多个元素重叠时&#xff08;跟布局没有完全的关系&#xff0c;也就是说层叠顺序只会在几个叠放元素上进行比较&#xff0c;而不会改变布局&#xff09;&#xff0c;哪个元素显示在最上面&#xff0c;哪个元素显示在最下面。 …

数制——FPGA

1、定点数 定点数的三种表示方式&#xff1a; 原码&#xff1a;符号位 绝对值 表示方法 反码&#xff1a;正数的反码表示 与原码表示一致&#xff0c;负数的反码表示 除符号位&#xff0c;其他位全都取反 补码&#xff1a;正数的补码表示 与原码表示一致&#xff0c;负数的补码…

在用redis当中可能遇到的问题解决方案以及redis中的一些名词解释

在用redis当中可能遇到的问题解决方案以及redis中的一些名词解释 Redis篇一、缓存穿透&#xff1a;解决方案&#xff1a;缓存空数据布隆过滤器 二、缓存击穿解决方案互斥锁&#xff0c;强一致性&#xff0c;性能差&#xff0c;速度慢逻辑过期&#xff0c;数据不同步&#xff0c…

一文详解QT环境搭建:Windows使用CLion配置QT开发环境

在当今的软件开发领域&#xff0c;跨平台应用的需求日益增长&#xff0c;Qt作为一款流行的C图形用户界面库&#xff0c;因其强大的功能和易用性而备受开发者青睐。与此同时&#xff0c;CLion作为一款专为C/C打造的强大IDE&#xff0c;提供了丰富的特性和高效的编码体验。本文将…

【区块链安全 | 第二十四篇】单位和全局可用变量(二)

文章目录 单位和全局可用变量&#xff08;Units and Globally Available Variables&#xff09;特殊变量和函数1. 区块和交易属性2. ABI 编码和解码函数3. bytes 成员函数4. string 成员函数5. 错误处理6. 数学和加密函数7. 地址类型成员函数8. 与合约相关9. 类型信息 单位和全…

一种监控录像视频恢复的高效解决方案,从每一帧中寻找可能性

该软件旨在恢复从监控设备中删除或丢失的视频。该程序经过调整以处理大多数流行供应商的闭路电视系统中使用的专有格式&#xff0c;并通过智能重建引擎进行了增强&#xff0c;能够为监控记录提供任何通用解决方案都无法实现的恢复结果。如果不需要持续使用该软件&#xff0c;则…