使用 `Celery` 配合 `RabbitMQ` 作为消息代理,实现异步任务的调度、重试、定时任务以及错误监控等功能

python基础代码、优化、扩展和监控的完整示例。此示例使用 Celery 配合 RabbitMQ 作为消息代理,实现异步任务的调度、重试、定时任务以及错误监控等功能。


项目结构

我们将项目结构组织如下,以便代码逻辑清晰且易于扩展:

project/
│
├── celery_app.py        # Celery应用的配置和初始化
├── tasks.py             # 异步任务的定义
├── monitor.py           # 异常监控和报警
└── main.py              # 测试异步任务调用

1. celery_app.py - 配置 Celery 应用

# celery_app.py
from celery import Celery
from celery.schedules import crontabapp = Celery('tasks', broker='amqp://localhost//', backend='redis://localhost')# 基础配置
app.conf.update(result_expires=3600,                     # 任务结果过期时间task_acks_late=True,                     # 确保任务执行后才确认完成worker_prefetch_multiplier=1,            # 单次预取任务数task_serializer='json',                  # 任务数据序列化格式result_serializer='json',                # 任务结果序列化格式accept_content=['json'],                 # 仅接收json格式task_soft_time_limit=300,                # 软超时时间task_time_limit=600,                     # 硬超时时间worker_hijack_root_logger=False,         # 不劫持主日志worker_log_format='[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
)# 定时任务配置
app.conf.beat_schedule = {'scheduled_add': {'task': 'tasks.add','schedule': crontab(hour=7, minute=30, day_of_week=1),'args': (16, 16),},
}# 任务路由配置:不同的任务可以走不同的队列
app.conf.task_routes = {'tasks.add': {'queue': 'high_priority'},
}

2. tasks.py - 定义任务

# tasks.py
from celery_app import app
from monitor import task_failure_handler
import time# 定义基础任务
@app.task(bind=True, max_retries=3)
def add(self, x, y):try:time.sleep(5)  # 模拟耗时任务return x + yexcept Exception as exc:raise self.retry(exc=exc, countdown=5)  # 5秒后重试

3. monitor.py - 监控与报警

# monitor.py
from celery.signals import task_failure@task_failure.connect
def task_failure_handler(sender=None, exception=None, **kwargs):# 发送报警通知或记录错误日志print(f"[ALERT] Task {sender.name} failed due to {exception}")

4. main.py - 测试任务调用

# main.py
from tasks import add
from celery_app import appif __name__ == "__main__":# 启动异步任务result = add.delay(4, 6)print("Task state:", result.state)  # 打印任务状态print("Result:", result.get())      # 获取任务结果(阻塞等待)# 组合任务示例:Groupfrom celery import groupgroup_tasks = group(add.s(i, i) for i in range(10))group_result = group_tasks.apply_async()print("Group Result:", group_result.get())# 链式任务示例:Chordfrom celery import chordcallback = add.s(10, 20)chord_tasks = chord((add.s(i, i) for i in range(10)), callback)chord_result = chord_tasks.apply_async()print("Chord Result:", chord_result.get())

运行和监控

  1. 启动 RabbitMQ 服务

    sudo service rabbitmq-server start
    
  2. 启动 Celery Worker
    运行以下命令,指定 high_priority 队列处理高优先级任务。

    celery -A celery_app worker -Q high_priority,default -l info
    
  3. 启动 Celery Beat(用于调度定时任务):

    celery -A celery_app beat -l info
    
  4. 启动 Flower 实时监控(可选):

    celery -A celery_app flower --port=5555
    

    访问 http://localhost:5555 进行任务和 worker 状态的实时监控。

  5. Prometheus 和 Grafana 监控(可选)
    配置 Celery 的自定义事件,并使用 Prometheus 采集数据,再通过 Grafana 可视化 Celery 的性能指标。


此示例项目具有以下特性:

  • 异步任务:通过 delay() 方法调用。
  • 重试机制:在任务异常时自动重试。
  • 任务调度:支持定时任务,利用 Celery Beat 实现周期性任务调度。
  • 报警机制:在任务失败时发送报警或日志记录。
  • 监控系统:使用 Flower 进行实时监控,支持 Prometheus 和 Grafana 扩展。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/58667.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity3D UI 拖拽

Unity3D 实现 UI 元素拖拽功能。 UI 拖拽 通常画布上的 UI 元素都是固定位置的,我们可以通过实现拖拽接口,让 UI 元素可以被拖拽到其他位置。 拖拽接口 创建一个脚本 UIDrag.cs,在默认继承的 MonoBehaviour 后面,再继承三个接…

基于 SSM(Spring + Spring MVC + MyBatis)框架构建电器网上订购系统

基于 SSM(Spring Spring MVC MyBatis)框架构建电器网上订购系统可以为用户提供一个方便快捷的购物平台。以下将详细介绍该系统的开发流程,包括需求分析、技术选型、数据库设计、项目结构搭建、主要功能实现以及前端页面设计。 需求分析 …

esp32学习:利用虫洞ESP32开发板,快速实现无线图传

我们的虫洞开发板,能够完美运行esp who AI代码,所以实现无线图传那是非常容易的,我们先看看examples目录: 里面有比较多的web例程,在这些例程下,稍作修改,就可以快速实现我的图传无线功能&#…

mac m1 docker本地部署canal 监听mysql的binglog日志

mac m1 docker本地部署canal监听mysql的binglog日志(虚拟机同理) 根据黑马视频部署 1.docker 部署mysql 1.docker拉取mysql 镜像 因为m1是arm架构.需要多加一条信息 正常拉取 docker pull mysql:tagm1拉取 5.7的版本. tag需要自己指定版本 docker pull --platform linux/x…

还在为慢速数据传输苦恼?Linux 零拷贝技术来帮你!

前言 程序员的终极追求是什么?当系统流量大增,用户体验却丝滑依旧?没错!然而,在大量文件传输、数据传递的场景中,传统的“数据搬运”却拖慢了性能。为了解决这一痛点,Linux 推出了 零拷贝 技术&…

基于java+SpringBoot+Vue的微服务在线教育系统设计与实现

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: Springboot mybatis Maven mysql5.7或8.0等等组成&#x…

【无标题】西安交通大学提出少锚点的端到端车道线检测算法Polar R-CNN

Abstract 车道线检测在自动驾驶中是一个关键且充满挑战的任务,特别是在实际场景中,由于车道线可能因其他车辆而被遮挡、形状纤细且长度较长,检测难度增大。现有基于锚点的检测方法通常依赖于预设的锚点来提取特征,并随后对车道线…

【手撕排序3】归并排序

🍃 本系列包括常见的各种排序算法,如果感兴趣,欢迎订阅🚩 🎊个人主页:小编的个人主页 🎀 🎉欢迎大家点赞👍收藏⭐文章 ✌️ 🤞 🤟 🤘 &#x1f91…

Vue中使用Antd中a-table实现表格数据列合并展示

原数据 根据需求实现当前两列数据中有相同数据时,合并列单元格 实现 源码 数据 const dataSource = ref([{id: 1,pl: "冰箱",zznd: "P1",sm: "说明说明说明1",dw: "台",gs: "1",dj: "100"},{id: 1,pl: "冰…

数据结构-数组(稀疏矩阵转置)和广义表

目录 1、数组定义 1)数组存储地址计算示例①行优先②列优先 2)稀疏矩阵的转置三元组顺序表结构定义 ①普通矩阵转置②三元组顺序表转置稀疏矩阵③稀疏矩阵的快速转置 3)十字链表结构定义 2、广义表定义 1)基本操作①GetHead②GetT…

【Spring】Spring Web MVC基础入门~(含大量例子)

阿华代码,不是逆风,就是我疯 你们的点赞收藏是我前进最大的动力!! 希望本文内容能够帮助到你!! 目录 一:什么是Spring Web MVC 1:Servlet 2:总结 二:MVC …

有向图的完全可达性(有向图搜索全路径的问题) C#DFs

在考察输入输出方面我觉得是道难题了 第一次遇见邻接表的数据结构该怎么声明 卡码网105 在力扣没找见完全相同的题 感觉需要多练习多复习这种类型的题 105. 有向图的完全可达性 题目描述 给定一个有向图,包含 N 个节点,节点编号分别为 1&…

登陆页面渗透测试常见的20种思路与总结

【渗透测试】16个实用谷歌浏览器插件分享 飞雪网络安全人才培养计划,绝对零区,公益教学! 思路总结 1、之前是否已经留过后门,是,直接getshell,否,进行测试 2、SQL注入&万能密码&#xf…

qt QWebSocketServer详解

1、概述 QWebSocketServer 是 Qt 框架中用于处理 WebSocket 服务器端的类。它允许开发者创建 WebSocket 服务器,接受客户端的连接,并与之进行双向通信。WebSocket 是一种在单个 TCP 连接上进行全双工通讯的协议,它使得客户端和服务器之间的数…

掌握分布式系统的38个核心概念

天天说分布式分布式,那么我们是否知道什么是分布式,分布式会遇到什么问题,有哪些理论支撑,有哪些经典的应对方案,业界是如何设计并保证分布式系统的高可用呢? 1. 架构设计 这一节将从一些经典的开源系统架…

中小跨境卖家如何选择物流?

跨境物流作为电商交易的核心环节,其复杂性和多变性对卖家来说不言而喻。本文将为您详细解析跨境物流的七大流程、常见物流测评以及推荐的工具,帮助您在激烈的市场竞争中把握物流优势,提升业务效率和客户满意度。 跨境物流七大流程 1. 启运国出…

6大国有银行软开的薪资待遇清单

牛客上刷到一条关于计算机专业值得去的银行软开清单,其中对 6 大国有银行软开的薪资待遇分析我觉得很有必要同步给大家看一看。 截图信息来自牛客的漫长白日梦 其中邮储软开是最值得推荐的(offer 投票没输过),二线城市转正后第一个完整年的收入在 30 万左右,一线城市更高…

我们来学mysql -- EXPLAIN之ID(原理篇)

EXPLAIN之ID 题记ID 题记 2024美国大选已定,川普剑登上铁王座,在此过程中出谋划策的幕僚很重要,是他们决定了最终的执行计划在《查询成本之索引选择》中提到,explain的输出,就是优化器(幕僚)选…

蓝桥杯-网络安全比赛题目-遗漏的压缩包

小蓝同学给你发来了他自己开发的网站链接, 他说他故意留下了一个压缩包文件,里面有网站的源代码, 他想考验一下你的网络安全技能。 (点击“下发赛题”后,你将得到一个http链接。如果该链接自动跳转到https,…

新疆高校大数据实验室案例分享

高校大数据实验室建设,企业可以提供技术支持、实训平台和项目案例,高校则提供科研和教学资源,实现产学研一体化。不仅有利于大数据技术的应用和人才培养也有利于区域发展。 泰迪与新疆合作的院校包括新疆大学、昌吉学院等 新疆大…