分布式任务队列系统 celery 原理及入门

基本

Celery 是一个简单、灵活且可靠的分布式任务队列系统,用于在后台执行异步任务处理大量消息。支持任务调度、任务分发和结果存储,并且可以与消息代理(如 RabbitMQ、Redis 等)一起工作,以实现任务的队列管理和执行。

关键特性和概念:

  1. 分布式任务队列:Celery 允许你将任务分发到多个工作节点上,这些节点可以并行处理任务,从而提高系统的吞吐量和性能。
  2. 异步执行:Celery 支持异步执行任务,即任务可以在后台运行,而不阻塞主程序的执行。
  3. 定时任务:Celery 提供了定时任务功能,可以按照预定的时间间隔或特定时间点执行任务。
  4. 持久化:Celery 支持将任务结果持久化到数据库中,以便后续查询和分析。
  5. 多种消息传递协议:Celery 支持多种消息传递协议,如 RabbitMQ、Redis 等。

核心模块:

  1. 任务(Task):任务是 Celery 的基本单位,代表需要异步执行的函数。任务通过 @app.task 装饰器定义。
  2. 消息代理(Broker):消息代理是一个中间件,用于在客户端和 Worker 之间传递任务消息。常用的消息代理包括 RabbitMQ、Redis、Amazon SQS 等。
  3. Worker:Worker 是实际执行任务的进程。它从消息代理中获取任务并执行,然后将结果返回给结果后端(如果配置了结果后端)。
  4. 客户端(Client):客户端是发送任务到消息代理的部分,通常是你的应用程序代码。它调用任务并将其发送到消息代理。
  5. 结果后端(Result Backend):结果后端用于存储任务的执行结果,便于后续查询。常用的结果后端包括 Redis、数据库(如 PostgreSQL、MySQL)、MongoDB 等。
  6. Beat Scheduler:这是一个定时调度器,用于定期发送周期性任务到消息代理。它可以按照预定的时间间隔或特定时间点调度任务。

组成架构

请添加图片描述

Celery 的架构可以简化为三大核心组件:消息中间件(Message Broker)、任务执行单元(Worker)和任务执行结果存储(Task Result Store)

1. 消息中间件(Message Broker)

功能

  • 作为客户端和 Worker 之间的通信桥梁。
  • 接收来自客户端的任务消息,并将其分发给可用的 Worker。

常用实现

  • RabbitMQ:高性能、可靠性强,支持复杂路由规则。
  • Redis:轻量级、速度快,适合小规模应用。
  • Amazon SQS:托管服务,无需自行维护服务器。

2. 任务执行单元(Worker)

功能

  • 从消息中间件获取待处理的任务。
  • 执行实际的业务逻辑,即运行被装饰为 Celery 任务的函数。
  • 将执行结果发送到结果存储系统。

启动方式:

使用命令启动 Worker,例如:

celery -A tasks worker --loglevel=info

3. 任务执行结果存储(Task Result Store)

功能

  • 存储每个已完成任务的结果,以便后续查询或处理。

常用实现:

  1. Redis
  2. 数据库系统 (如 PostgreSQL, MySQL)
  3. MongoDB

入门

安装celery

pip install celery redis
或
pip3 install celery redis

定义和装饰任务

在代码中定义 Celery 应用和需要异步执行的任务函数。例如创建celery_task.py

from celery import Celery# broker 为消息中间件配置,这里用的是redis
# backend 为任务执行结果存储,也用的是redis
app = Celery('celery_task', broker='redis://localhost:32769/0', backend='redis://localhost:32769/1')# 通过装饰器指定任务执行单元,即消息接受后的处理函数
@app.task
def add(x, y):return f'{x}{y} 的和为 {x + y}'# 启动celery,准备好接收消息,一旦接收到消息就执行任务,并存储结果
if __name__ == '__main__':app.worker_main(['worker', '--loglevel=info'])

启动 Worker

启动一个或多个 Worker 进程来处理任务。这些 Worker 会连接到指定的消息代理并等待新任务到达。

方式一:

# 启动celery,准备好接收消息,一旦接收到消息就执行任务,并存储结果
if __name__ == '__main__':app.worker_main(['worker', '--loglevel=info'])

方式二:

确保你已经安装了celery并且正确配置环境变量

celery -A tasks worker --loglevel=info

启动完成:

请添加图片描述

发送任务

客户端代码调用定义好的 Celery 任务,并将其发送到消息代理。例如创建celery_add_task

from celery_task import add# 发送任务
result = add.delay(4, 6)print(result)
# 获取并打印结果(这会阻塞直到返回结果)
print(result.get())

执行结果:

请添加图片描述

(可选)获取结果

当你调用一个 Celery 任务时,你可以立即获取一个AsyncResult 实例,该实例可以用来检索任务的结果。

from celery_task import add
from celery.result import AsyncResult
import celery_task"""
使用Task ID创建AsyncResult对象,用于检查状态与获取最终计算出的值。第一个参数是发送任务时返回的task id
result = add.delay(4, 6)
print(result) # 直接打印出来的就是task_id
"""
# 第一种方式创建
# result = add.AsyncResult("16a6a1e4-9500-43b5-8a88-1058663d44b7")# 第二种方式创建
result = AsyncResult("16a6a1e4-9500-43b5-8a88-1058663d44b7", app=celery_task.app)status = result.status
if status == "SUCCESS":print('执行成功')
elif status == "FAILURE":print('执行失败')
elif status == "PENDING":print('任务等待中被执行')
elif status == "RETRY":print('任务异常后正在重试')
elif status == "STARTED":print('任务已经开始被执行')
else:print("未匹配到状态值")if result.ready():print(result.result)  # 打印最终计算出的值,如果已完成。
else:print("Task is still running")

注意

celery_task.py文件名和

代码app = Celery('celery_task', broker='redis://localhost:32769/0', backend='redis://localhost:32769/1')中的第一个参数名要一致,否则会报错

app端报错日志:

[2024-05-31 16:13:26,538: ERROR/MainProcess] Received unregistered task of type 'celery_task.add'.
The message has been ignored and discarded.Did you remember to import the module containing this task?
Or maybe you're using relative imports?Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.The full contents of the message body was:
b'[[4, 6], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (81b)The full contents of the message headers:
{'lang': 'py', 'task': 'celery_task.add', 'id': 'c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6', 'parent_id': None, 'argsrepr': '(4, 6)', 'kwargsrepr': '{}', 'origin': 'gen13554@fangyirui.local', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}}The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/worker/consumer/consumer.py", line 659, in on_task_receivedstrategy = strategies[type_]
KeyError: 'celery_task.add'

客户端报错日志:

c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6
Traceback (most recent call last):File "/Users/fangyirui/PycharmProjects/pythonProject/celery/celery_add_task.py", line 8, in <module>print(result.get())File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 251, in getreturn self.backend.wait_for_pending(File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/backends/asynchronous.py", line 223, in wait_for_pendingreturn result.maybe_throw(callback=callback, propagate=propagate)File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 365, in maybe_throwself.throw(value, self._to_remote_traceback(tb))File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 358, in throwself.on_ready.throw(*args, **kwargs)File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/vine/promises.py", line 235, in throwreraise(type(exc), exc, tb)File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/vine/utils.py", line 27, in reraiseraise value
celery.exceptions.NotRegistered: 'celery_task.add'

工作流程

一个任务的生命周期

+-----------------+       +-----------------+       +-----------------+
|     Client      | ----> |   Message Broker| ----> |     Worker      |
| (Task Producer) |       |   (e.g., Redis) |       | (Task Consumer) |
+-----------------+       +-----------------+       +-----------------+^                                                ||                                                v
+-----------------+                               +-----------------+
|  Result Backend | <-----------------------------|  Task Execution |
| (e.g., Redis)   |                               +-----------------+
+-----------------+
  1. 定义阶段:使用 @app.task 装饰器定义了一个简单函数 add
  2. 创建与发送:调用 add.delay(4, 6) 将请求转换成一条包含操作数和操作类型的信息,并放入Redis队列中。
  3. 排队阶段:Redis接收到这条信息,将其存储起来等待worker拉取。
  4. 获取并锁定:运行中的worker从Redis中拉取这条信息,并锁定它以防止其他workers重复执行同一项工作。
  5. 执行阶段:Worker根据信息内容计算,在此期间,这个信息处于处理中状态。
  6. 存储阶段:计算完毕后,将结果存储回Redis,以便以后查询。如果没有配置result backend,则跳过这一步骤直接进入下一个步骤。
  7. 查询阶段:客户端通过调用 result.get() 来阻塞式地等待并获取计算结果。在实际应用场景中,也可能是非阻塞式地检查状态,例如使用 result.status 或者轮询机制查看是否完成。
  8. 清理和过期管理: 根据系统设置,如果不再需要保存这些历史记录,可以由系统自动或者手动清除这些数据。结果默认在redis中86400秒后过期(24H)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/19789.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Linux系统编程]文件IO

一.系统调用 什么是系统调用? 只有系统调用(系统函数)才能进入内核空间&#xff0c;库函数也是调用系统函数&#xff0c;才得以访问底层。 系统调用由操作系统实现并提供给外部应用程序的编程接口。是应用程序同系统之间数据交互的桥梁。 换句话说&#xff0c;系统调用就是操…

【计算Nei遗传距离】

报错 Warning message: In adegenet::df2genind(t(x), sep sep, ...) : Markers with no scored alleles have been removed 原因&#xff1a; 直接用plink转换为VCF&#xff0c;丢失了等位基因分型&#xff08;REF ALT&#xff09; &#xff08;plink编码的规则&…

优选算法一:双指针算法与练习(移动0)

目录 双指针算法讲解 移动零 双指针算法讲解 常见的双指针有两种形式&#xff0c;一种是对撞指针&#xff0c;一种是快慢指针。 对撞指针&#xff1a;一般用于顺序结构中&#xff0c;也称左右指针。 对撞指针从两端向中间移动。一个指针从最左端开始&#xff0c;另一个从最…

【Linux】进程(2):进程状态

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解Linux进程&#xff08;1&#xff09;&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 &#xff08;A&#xff09;R/S/D/T/t状态1. R&#xff1a;程序运…

Day-04python模块

一、模块 1-1 Python 自带模块 Json模块 处理json数据 {"key":"value"} json不是字典 本质是一个有引号的字符串数据 json注意点 {} 中的数据是字符串引号必须是双引号 使用json模块可以实现将json转为字典&#xff0c;使用字典的方法操作数据 。 或者将…

社交媒体数据恢复:最右

第一步&#xff1a;确认数据丢失原因 请确定您是因为误删、设备损坏还是其他原因导致“最右”中的数据丢失。这将有助于您更好地了解需要采取的恢复措施。 第二步&#xff1a;尝试从备份中恢复数据 如果您有定期备份的习惯&#xff0c;请尝试从备份中恢复丢失的数据。备份文…

Springboot JVM监控 通过Promethus

Springboot内置了对Prometheus得支持&#xff0c;可以监测得点有&#xff1a; JVM各指标参数&#xff08;GC&#xff0c;堆&#xff0c;非堆等&#xff09;接口调用次数&#xff0c;延时系统内存&#xff0c;IO&#xff0c;CPU使用率 部署Prometheus和Grafana 准备一台2核4G…

STM32自己从零开始实操03:输出部分原理图

一、继电器电路 1.1指路 延续使用 JZC-33F-012-ZS3 继电器&#xff0c;设计出以小电流撬动大电流的继电器电路。 &#xff08;提示&#xff09;电路需要包含&#xff1a;三极管开关电路、续流二极管、滤波电容、指示灯、输出部分。 1.2数据手册重要信息提炼 联系排列&…

手写HTML字符串解析成对应的 AST语法树

先看效果 展示如下&#xff1a; HTML模版 转成ast语法树后 在学习之前&#xff0c;我们需要了解这么一个问题&#xff0c;为什么要将HTML字符串解析成对应的 AST语法树。 为什么&#xff1f; 语法分析&#xff1a;HTML字符串是一种标记语言&#xff0c;其中包含了大量的标签…

使用PNP管控制MCU是否需要复位

这两台用到一款芯片带电池&#xff0c;希望电池还有电芯片在工作的时候插入电源不要给芯片复位&#xff0c;当电池没电&#xff0c;芯片不在工作的时候&#xff0c;插入电源给芯片复位所以使用一个PNP三极管&#xff0c;通过芯片IO控制是否打开复位&#xff0c;当芯片正常工作的…

python移动文件

测试1(直接把B文件夹移动到了A里&#xff0c;成为了A的子文件夹) import os import shutil# 移动文件夹,B文件夹在当前目录没有了&#xff0c;跑到了A的子文件里 ## shutil.move(./example1/B/, ./example1/A/)测试2(B文件不动&#xff0c;将B文件里的所有的子文件夹移动到A内…

响应式UI组件DevExtreme中文教程 - 工具栏的自适应模式

DevExtreme拥有高性能的HTML5 / JavaScript小部件集合&#xff0c;使您可以利用现代Web开发堆栈&#xff08;包括React&#xff0c;Angular&#xff0c;ASP.NET Core&#xff0c;jQuery&#xff0c;Knockout等&#xff09;构建交互式的Web应用程序。从Angular和Reac&#xff0c…

Android高通 12/13 录屏流程代码位置

需求如下图 实现系统录屏功能 frameworks/base/packages/SystemUI/src/com/android/systemui/screenrecord 涉及代码 ScreenRecordDialog # startBtn RecordingService # startRecording# stopRecording ScreenMediaRecorder # start # end #save 1、点击开始录屏framewo…

金融科技发展报告:移动支付的市场格局与趋势

引言 随着科技的飞速发展,金融科技已成为推动全球经济发展的重要力量。移动支付作为金融科技的重要分支,其市场格局与趋势日益受到业界的关注。本文将深入剖析移动支付的市场格局,探讨其发展趋势,并辅以相关案例作为辅助。 一、移动支付市场格局 1、全球市场概览 移动支…

vscode怎么点击路径直接跳转对应文件

在vue项目中经常要引入工具类、组件、模版等&#xff0c;想要直接去看对应文件&#xff0c;只能自己找到对应路径再去打开。 我们可用在js项目中创建一个 jsconfig.json文件&#xff0c;TS项目可以创建tsconfig.json 文件代码 {"compilerOptions": {"baseUrl&…

52-QSplitter类QDockWidget类

一 QSplitter类 Qt提供QSplitter(QSplitter)类来进行分裂布局&#xff0c;QSplitter派生于QFrame。 #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow>class MainWindow : public QMainWindow {Q_OBJECTpublic:MainWindow(QWidget *parent nullptr);~…

linux /www/server/cron内log文件占用空间过大,/www/server/cron是什么内容,/www/server/cron是否可以删除

linux服务器长期使用宝塔自带计划任务&#xff0c;计划任务执行记录占用服务器空间过大&#xff0c;导致服务器根目录爆满&#xff0c;需要长期排查并删除 /www/server/cron 占用空间过大问题处理 /www/server/cron是什么内容&#xff1f;/www/server/cron是否可以删除&#xf…

vue2 bug 小白求助!!!(未解决,大概是浏览器缓存的问题或者是路由的问题)

我的vue2项目出现了一个超级恶心的bug 具体流程&#xff1a; 页面a点击a标签->到页面b->页面b用户退出刷新页面->点击浏览器的返回按钮返回上一页 返回页面后页面没有刷新导致用户名还显示这 项目中没有用keep-alive缓存 也在设置了key 尝试了window.removeEventLi…

vue UI组件整理

Vue2Vue3Element - The worlds most popular Vue UI frameworkOverview 组件总览 | Element Plushttps://v2.iviewui.com/docs/guide/installhttps://www.iviewui.com/view-ui-plus/guide/introduce按钮 Button - Ant Design按钮 Button - Ant DesignVuetify — A Material Des…

考试题库:华为HCIA-Datacom易错题⑪(含答案解析)

华为认证HCIA-Datacom易错题举例和答案分析。 需要更多题库资料&#xff0c;可以在文末领取 1、运行STP协议的设备端口处于Forwarding状态&#xff0c;下列说法正确的有? A.该端口端口既转发用户流量也处理BPDU报文 B.该端口会根据收到的用户流量构建MAC地址表&#xf…