使用 `Celery` 配合 `RabbitMQ` 作为消息代理,实现异步任务的调度、重试、定时任务以及错误监控等功能

python基础代码、优化、扩展和监控的完整示例。此示例使用 Celery 配合 RabbitMQ 作为消息代理,实现异步任务的调度、重试、定时任务以及错误监控等功能。


项目结构

我们将项目结构组织如下,以便代码逻辑清晰且易于扩展:

project/
│
├── celery_app.py        # Celery应用的配置和初始化
├── tasks.py             # 异步任务的定义
├── monitor.py           # 异常监控和报警
└── main.py              # 测试异步任务调用

1. celery_app.py - 配置 Celery 应用

# celery_app.py
from celery import Celery
from celery.schedules import crontabapp = Celery('tasks', broker='amqp://localhost//', backend='redis://localhost')# 基础配置
app.conf.update(result_expires=3600,                     # 任务结果过期时间task_acks_late=True,                     # 确保任务执行后才确认完成worker_prefetch_multiplier=1,            # 单次预取任务数task_serializer='json',                  # 任务数据序列化格式result_serializer='json',                # 任务结果序列化格式accept_content=['json'],                 # 仅接收json格式task_soft_time_limit=300,                # 软超时时间task_time_limit=600,                     # 硬超时时间worker_hijack_root_logger=False,         # 不劫持主日志worker_log_format='[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
)# 定时任务配置
app.conf.beat_schedule = {'scheduled_add': {'task': 'tasks.add','schedule': crontab(hour=7, minute=30, day_of_week=1),'args': (16, 16),},
}# 任务路由配置:不同的任务可以走不同的队列
app.conf.task_routes = {'tasks.add': {'queue': 'high_priority'},
}

2. tasks.py - 定义任务

# tasks.py
from celery_app import app
from monitor import task_failure_handler
import time# 定义基础任务
@app.task(bind=True, max_retries=3)
def add(self, x, y):try:time.sleep(5)  # 模拟耗时任务return x + yexcept Exception as exc:raise self.retry(exc=exc, countdown=5)  # 5秒后重试

3. monitor.py - 监控与报警

# monitor.py
from celery.signals import task_failure@task_failure.connect
def task_failure_handler(sender=None, exception=None, **kwargs):# 发送报警通知或记录错误日志print(f"[ALERT] Task {sender.name} failed due to {exception}")

4. main.py - 测试任务调用

# main.py
from tasks import add
from celery_app import appif __name__ == "__main__":# 启动异步任务result = add.delay(4, 6)print("Task state:", result.state)  # 打印任务状态print("Result:", result.get())      # 获取任务结果(阻塞等待)# 组合任务示例:Groupfrom celery import groupgroup_tasks = group(add.s(i, i) for i in range(10))group_result = group_tasks.apply_async()print("Group Result:", group_result.get())# 链式任务示例:Chordfrom celery import chordcallback = add.s(10, 20)chord_tasks = chord((add.s(i, i) for i in range(10)), callback)chord_result = chord_tasks.apply_async()print("Chord Result:", chord_result.get())

运行和监控

  1. 启动 RabbitMQ 服务

    sudo service rabbitmq-server start
    
  2. 启动 Celery Worker
    运行以下命令,指定 high_priority 队列处理高优先级任务。

    celery -A celery_app worker -Q high_priority,default -l info
    
  3. 启动 Celery Beat(用于调度定时任务):

    celery -A celery_app beat -l info
    
  4. 启动 Flower 实时监控(可选):

    celery -A celery_app flower --port=5555
    

    访问 http://localhost:5555 进行任务和 worker 状态的实时监控。

  5. Prometheus 和 Grafana 监控(可选)
    配置 Celery 的自定义事件,并使用 Prometheus 采集数据,再通过 Grafana 可视化 Celery 的性能指标。


此示例项目具有以下特性:

  • 异步任务:通过 delay() 方法调用。
  • 重试机制:在任务异常时自动重试。
  • 任务调度:支持定时任务,利用 Celery Beat 实现周期性任务调度。
  • 报警机制:在任务失败时发送报警或日志记录。
  • 监控系统:使用 Flower 进行实时监控,支持 Prometheus 和 Grafana 扩展。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/58667.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity3D UI 拖拽

Unity3D 实现 UI 元素拖拽功能。 UI 拖拽 通常画布上的 UI 元素都是固定位置的,我们可以通过实现拖拽接口,让 UI 元素可以被拖拽到其他位置。 拖拽接口 创建一个脚本 UIDrag.cs,在默认继承的 MonoBehaviour 后面,再继承三个接…

【LeetCode】【算法】394. 字符串解码

LeetCode 394. 字符串解码 题目描述 给定一个经过编码的字符串,返回它解码后的字符串。 编码规则为: k[encoded_string],表示其中方括号内部的 encoded_string 正好重复 k 次。注意 k 保证为正整数。 你可以认为输入字符串总是有效的;输入字…

深度学习:预训练(Pre-training详解

预训练(Pre-training详解 预训练(Pre-training)是深度学习和自然语言处理领域中一个核心概念,特别是在面对需要大规模参数模型的应用场景下。预训练涉及在通常是大规模且多样化的数据集上训练模型,目的是捕获广泛且通…

【新手入门软件测试--该如何分辨前后端问题及如何定位日志--前后端问题分辨与日志定位查询问题】

前后端问题分辨与日志定位查询 一、前端问题1. 页面无法加载2. 样式错乱3. API请求失败4. 数据格式错误5. 跨域请求问题 二、后端问题6. 表单验证失败7. 数据库连接失败8. 请求超时9. 权限问题10. JavaScript运行错误 三、日志查询的方法1. 查看日志文件2. 过滤关键字3. 实时查…

基于 SSM(Spring + Spring MVC + MyBatis)框架构建电器网上订购系统

基于 SSM(Spring Spring MVC MyBatis)框架构建电器网上订购系统可以为用户提供一个方便快捷的购物平台。以下将详细介绍该系统的开发流程,包括需求分析、技术选型、数据库设计、项目结构搭建、主要功能实现以及前端页面设计。 需求分析 …

esp32学习:利用虫洞ESP32开发板,快速实现无线图传

我们的虫洞开发板,能够完美运行esp who AI代码,所以实现无线图传那是非常容易的,我们先看看examples目录: 里面有比较多的web例程,在这些例程下,稍作修改,就可以快速实现我的图传无线功能&#…

mac m1 docker本地部署canal 监听mysql的binglog日志

mac m1 docker本地部署canal监听mysql的binglog日志(虚拟机同理) 根据黑马视频部署 1.docker 部署mysql 1.docker拉取mysql 镜像 因为m1是arm架构.需要多加一条信息 正常拉取 docker pull mysql:tagm1拉取 5.7的版本. tag需要自己指定版本 docker pull --platform linux/x…

还在为慢速数据传输苦恼?Linux 零拷贝技术来帮你!

前言 程序员的终极追求是什么?当系统流量大增,用户体验却丝滑依旧?没错!然而,在大量文件传输、数据传递的场景中,传统的“数据搬运”却拖慢了性能。为了解决这一痛点,Linux 推出了 零拷贝 技术&…

基于java+SpringBoot+Vue的微服务在线教育系统设计与实现

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: Springboot mybatis Maven mysql5.7或8.0等等组成&#x…

【无标题】西安交通大学提出少锚点的端到端车道线检测算法Polar R-CNN

Abstract 车道线检测在自动驾驶中是一个关键且充满挑战的任务,特别是在实际场景中,由于车道线可能因其他车辆而被遮挡、形状纤细且长度较长,检测难度增大。现有基于锚点的检测方法通常依赖于预设的锚点来提取特征,并随后对车道线…

【手撕排序3】归并排序

🍃 本系列包括常见的各种排序算法,如果感兴趣,欢迎订阅🚩 🎊个人主页:小编的个人主页 🎀 🎉欢迎大家点赞👍收藏⭐文章 ✌️ 🤞 🤟 🤘 &#x1f91…

Vue中使用Antd中a-table实现表格数据列合并展示

原数据 根据需求实现当前两列数据中有相同数据时,合并列单元格 实现 源码 数据 const dataSource = ref([{id: 1,pl: "冰箱",zznd: "P1",sm: "说明说明说明1",dw: "台",gs: "1",dj: "100"},{id: 1,pl: "冰…

点击评论详情,跳到评论页面,携带对象参数写法:

// 跳转到回复页面去goReply() {第1种方法,跳转并携带参数写法如下:uni.navigateTo({//先把要传过去的item变成字符串形式。但是要注意的是,这样携带字符串形式,是有长度限制的。所以内容多了后,要注意这种方式遗漏内容…

简单的签到程序 python笔记

简单的人脸识别签到程序 在看完以下代码后,略微修改一番,你就能够组装出自己的“简单的人脸识别签到程序”了。 请注意库的安装,否则会不可用。 你可以通过在cmd中使用:pip install来安装。 以下代码运行python 3.8 UI界面 使…

如何利用指纹浏览器爬虫绕过Cloudflare的防护?

网络爬虫能够系统地浏览网页并提取所需的数据,通常被用于市场研究、数据分析或者竞争情报。然而,一些反爬虫机制给网络爬虫的工作带来了不少挑战和风险。 其中,Cloudflare提供了多层次的防护机制,包括IP封锁、速率限制、CAPTCHA验…

数据结构-数组(稀疏矩阵转置)和广义表

目录 1、数组定义 1)数组存储地址计算示例①行优先②列优先 2)稀疏矩阵的转置三元组顺序表结构定义 ①普通矩阵转置②三元组顺序表转置稀疏矩阵③稀疏矩阵的快速转置 3)十字链表结构定义 2、广义表定义 1)基本操作①GetHead②GetT…

mysql常见的一些配置项

MySQL 有许多配置选项,可以用来调整其行为以满足特定的需求。以下是一些常见的配置选项,除了大小写敏感之外,这些配置选项也经常被调整: 1. 字符集和排序规则 character_set_server: 设置服务器的默认字符集。collation_server:…

【Spring】Spring Web MVC基础入门~(含大量例子)

阿华代码,不是逆风,就是我疯 你们的点赞收藏是我前进最大的动力!! 希望本文内容能够帮助到你!! 目录 一:什么是Spring Web MVC 1:Servlet 2:总结 二:MVC …

有向图的完全可达性(有向图搜索全路径的问题) C#DFs

在考察输入输出方面我觉得是道难题了 第一次遇见邻接表的数据结构该怎么声明 卡码网105 在力扣没找见完全相同的题 感觉需要多练习多复习这种类型的题 105. 有向图的完全可达性 题目描述 给定一个有向图,包含 N 个节点,节点编号分别为 1&…

登陆页面渗透测试常见的20种思路与总结

【渗透测试】16个实用谷歌浏览器插件分享 飞雪网络安全人才培养计划,绝对零区,公益教学! 思路总结 1、之前是否已经留过后门,是,直接getshell,否,进行测试 2、SQL注入&万能密码&#xf…