分布式任务队列系统 celery 原理及入门

基本

Celery 是一个简单、灵活且可靠的分布式任务队列系统,用于在后台执行异步任务处理大量消息。支持任务调度、任务分发和结果存储,并且可以与消息代理(如 RabbitMQ、Redis 等)一起工作,以实现任务的队列管理和执行。

关键特性和概念:

  1. 分布式任务队列:Celery 允许你将任务分发到多个工作节点上,这些节点可以并行处理任务,从而提高系统的吞吐量和性能。
  2. 异步执行:Celery 支持异步执行任务,即任务可以在后台运行,而不阻塞主程序的执行。
  3. 定时任务:Celery 提供了定时任务功能,可以按照预定的时间间隔或特定时间点执行任务。
  4. 持久化:Celery 支持将任务结果持久化到数据库中,以便后续查询和分析。
  5. 多种消息传递协议:Celery 支持多种消息传递协议,如 RabbitMQ、Redis 等。

核心模块:

  1. 任务(Task):任务是 Celery 的基本单位,代表需要异步执行的函数。任务通过 @app.task 装饰器定义。
  2. 消息代理(Broker):消息代理是一个中间件,用于在客户端和 Worker 之间传递任务消息。常用的消息代理包括 RabbitMQ、Redis、Amazon SQS 等。
  3. Worker:Worker 是实际执行任务的进程。它从消息代理中获取任务并执行,然后将结果返回给结果后端(如果配置了结果后端)。
  4. 客户端(Client):客户端是发送任务到消息代理的部分,通常是你的应用程序代码。它调用任务并将其发送到消息代理。
  5. 结果后端(Result Backend):结果后端用于存储任务的执行结果,便于后续查询。常用的结果后端包括 Redis、数据库(如 PostgreSQL、MySQL)、MongoDB 等。
  6. Beat Scheduler:这是一个定时调度器,用于定期发送周期性任务到消息代理。它可以按照预定的时间间隔或特定时间点调度任务。

组成架构

请添加图片描述

Celery 的架构可以简化为三大核心组件:消息中间件(Message Broker)、任务执行单元(Worker)和任务执行结果存储(Task Result Store)

1. 消息中间件(Message Broker)

功能

  • 作为客户端和 Worker 之间的通信桥梁。
  • 接收来自客户端的任务消息,并将其分发给可用的 Worker。

常用实现

  • RabbitMQ:高性能、可靠性强,支持复杂路由规则。
  • Redis:轻量级、速度快,适合小规模应用。
  • Amazon SQS:托管服务,无需自行维护服务器。

2. 任务执行单元(Worker)

功能

  • 从消息中间件获取待处理的任务。
  • 执行实际的业务逻辑,即运行被装饰为 Celery 任务的函数。
  • 将执行结果发送到结果存储系统。

启动方式:

使用命令启动 Worker,例如:

celery -A tasks worker --loglevel=info

3. 任务执行结果存储(Task Result Store)

功能

  • 存储每个已完成任务的结果,以便后续查询或处理。

常用实现:

  1. Redis
  2. 数据库系统 (如 PostgreSQL, MySQL)
  3. MongoDB

入门

安装celery

pip install celery redis
或
pip3 install celery redis

定义和装饰任务

在代码中定义 Celery 应用和需要异步执行的任务函数。例如创建celery_task.py

from celery import Celery# broker 为消息中间件配置,这里用的是redis
# backend 为任务执行结果存储,也用的是redis
app = Celery('celery_task', broker='redis://localhost:32769/0', backend='redis://localhost:32769/1')# 通过装饰器指定任务执行单元,即消息接受后的处理函数
@app.task
def add(x, y):return f'{x}{y} 的和为 {x + y}'# 启动celery,准备好接收消息,一旦接收到消息就执行任务,并存储结果
if __name__ == '__main__':app.worker_main(['worker', '--loglevel=info'])

启动 Worker

启动一个或多个 Worker 进程来处理任务。这些 Worker 会连接到指定的消息代理并等待新任务到达。

方式一:

# 启动celery,准备好接收消息,一旦接收到消息就执行任务,并存储结果
if __name__ == '__main__':app.worker_main(['worker', '--loglevel=info'])

方式二:

确保你已经安装了celery并且正确配置环境变量

celery -A tasks worker --loglevel=info

启动完成:

请添加图片描述

发送任务

客户端代码调用定义好的 Celery 任务,并将其发送到消息代理。例如创建celery_add_task

from celery_task import add# 发送任务
result = add.delay(4, 6)print(result)
# 获取并打印结果(这会阻塞直到返回结果)
print(result.get())

执行结果:

请添加图片描述

(可选)获取结果

当你调用一个 Celery 任务时,你可以立即获取一个AsyncResult 实例,该实例可以用来检索任务的结果。

from celery_task import add
from celery.result import AsyncResult
import celery_task"""
使用Task ID创建AsyncResult对象,用于检查状态与获取最终计算出的值。第一个参数是发送任务时返回的task id
result = add.delay(4, 6)
print(result) # 直接打印出来的就是task_id
"""
# 第一种方式创建
# result = add.AsyncResult("16a6a1e4-9500-43b5-8a88-1058663d44b7")# 第二种方式创建
result = AsyncResult("16a6a1e4-9500-43b5-8a88-1058663d44b7", app=celery_task.app)status = result.status
if status == "SUCCESS":print('执行成功')
elif status == "FAILURE":print('执行失败')
elif status == "PENDING":print('任务等待中被执行')
elif status == "RETRY":print('任务异常后正在重试')
elif status == "STARTED":print('任务已经开始被执行')
else:print("未匹配到状态值")if result.ready():print(result.result)  # 打印最终计算出的值,如果已完成。
else:print("Task is still running")

注意

celery_task.py文件名和

代码app = Celery('celery_task', broker='redis://localhost:32769/0', backend='redis://localhost:32769/1')中的第一个参数名要一致,否则会报错

app端报错日志:

[2024-05-31 16:13:26,538: ERROR/MainProcess] Received unregistered task of type 'celery_task.add'.
The message has been ignored and discarded.Did you remember to import the module containing this task?
Or maybe you're using relative imports?Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.The full contents of the message body was:
b'[[4, 6], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (81b)The full contents of the message headers:
{'lang': 'py', 'task': 'celery_task.add', 'id': 'c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6', 'parent_id': None, 'argsrepr': '(4, 6)', 'kwargsrepr': '{}', 'origin': 'gen13554@fangyirui.local', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}}The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/worker/consumer/consumer.py", line 659, in on_task_receivedstrategy = strategies[type_]
KeyError: 'celery_task.add'

客户端报错日志:

c89d27d5-fa1f-4ae2-98c3-7eecc6cc01f6
Traceback (most recent call last):File "/Users/fangyirui/PycharmProjects/pythonProject/celery/celery_add_task.py", line 8, in <module>print(result.get())File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 251, in getreturn self.backend.wait_for_pending(File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/backends/asynchronous.py", line 223, in wait_for_pendingreturn result.maybe_throw(callback=callback, propagate=propagate)File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 365, in maybe_throwself.throw(value, self._to_remote_traceback(tb))File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/celery/result.py", line 358, in throwself.on_ready.throw(*args, **kwargs)File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/vine/promises.py", line 235, in throwreraise(type(exc), exc, tb)File "/Users/fangyirui/Library/Python/3.9/lib/python/site-packages/vine/utils.py", line 27, in reraiseraise value
celery.exceptions.NotRegistered: 'celery_task.add'

工作流程

一个任务的生命周期

+-----------------+       +-----------------+       +-----------------+
|     Client      | ----> |   Message Broker| ----> |     Worker      |
| (Task Producer) |       |   (e.g., Redis) |       | (Task Consumer) |
+-----------------+       +-----------------+       +-----------------+^                                                ||                                                v
+-----------------+                               +-----------------+
|  Result Backend | <-----------------------------|  Task Execution |
| (e.g., Redis)   |                               +-----------------+
+-----------------+
  1. 定义阶段:使用 @app.task 装饰器定义了一个简单函数 add
  2. 创建与发送:调用 add.delay(4, 6) 将请求转换成一条包含操作数和操作类型的信息,并放入Redis队列中。
  3. 排队阶段:Redis接收到这条信息,将其存储起来等待worker拉取。
  4. 获取并锁定:运行中的worker从Redis中拉取这条信息,并锁定它以防止其他workers重复执行同一项工作。
  5. 执行阶段:Worker根据信息内容计算,在此期间,这个信息处于处理中状态。
  6. 存储阶段:计算完毕后,将结果存储回Redis,以便以后查询。如果没有配置result backend,则跳过这一步骤直接进入下一个步骤。
  7. 查询阶段:客户端通过调用 result.get() 来阻塞式地等待并获取计算结果。在实际应用场景中,也可能是非阻塞式地检查状态,例如使用 result.status 或者轮询机制查看是否完成。
  8. 清理和过期管理: 根据系统设置,如果不再需要保存这些历史记录,可以由系统自动或者手动清除这些数据。结果默认在redis中86400秒后过期(24H)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/19789.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Linux系统编程]文件IO

一.系统调用 什么是系统调用? 只有系统调用(系统函数)才能进入内核空间&#xff0c;库函数也是调用系统函数&#xff0c;才得以访问底层。 系统调用由操作系统实现并提供给外部应用程序的编程接口。是应用程序同系统之间数据交互的桥梁。 换句话说&#xff0c;系统调用就是操…

解决迁移到AWS的关键挑战

迁移到AWS云平台是许多出海企业的重要战略之一&#xff0c;但迁移过程中常常面临各种挑战。作为AWS官方合作伙伴&#xff0c;九河云深知客户在迁移过程中所面临的困难&#xff0c;并通过提供全面的支持和解决方案&#xff0c;帮助客户克服各种挑战&#xff0c;实现顺利迁移到AW…

graph Conv介绍

2. Graph Conv 的作用 The multiplication of the adjacency matrix A \textbf{A} A with the feature matrix X \textbf{X} X in the GraphConv layer is a crucial operation in Graph Convolutional Networks (GCNs). This operation performs a localized, weighted agg…

node依赖安装的bug汇总

1.npm仓库 首先要获取npm仓库的地址&#xff1a; registryhttp://11.11.111.1:1111/abcdefg/adsfadsf 类似这种的地址 然后设置npm仓库&#xff1a; npm config set registryhttp://11.11.111.1:1111/abcdefg/adsfadsf (地址要带等号) 接着安装依赖&#xff1a; npm i…

Golang中的 defer 关键字和Python中的上下文管理with关键字

defer&#xff0c;中文意思是&#xff1a;推迟 常用用于关闭文件操作&#xff0c;简而言之&#xff0c;就是try/finally的一种替代方案 使用示例 package mainimport "fmt"func main() {defer fmt.Println("执行延迟的函数")fmt.Println("执行外层…

【计算Nei遗传距离】

报错 Warning message: In adegenet::df2genind(t(x), sep sep, ...) : Markers with no scored alleles have been removed 原因&#xff1a; 直接用plink转换为VCF&#xff0c;丢失了等位基因分型&#xff08;REF ALT&#xff09; &#xff08;plink编码的规则&…

Centos7对比Ubuntu一些常用操作差异点

Centos7对比Ubuntu一些常用操作差异点 CentOS 7将于2024年6月30日停止维护&#xff0c;CentOS8已经转为Rhel的上游项目。同时Centos7的软件仓库中&#xff0c;部分软件版本较老。后续使用过程中可以考虑切换到Ubuntu。 下面总结了一些两个系统的常见差异点&#xff0c;包括软…

优选算法一:双指针算法与练习(移动0)

目录 双指针算法讲解 移动零 双指针算法讲解 常见的双指针有两种形式&#xff0c;一种是对撞指针&#xff0c;一种是快慢指针。 对撞指针&#xff1a;一般用于顺序结构中&#xff0c;也称左右指针。 对撞指针从两端向中间移动。一个指针从最左端开始&#xff0c;另一个从最…

【Linux】进程(2):进程状态

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解Linux进程&#xff08;1&#xff09;&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 &#xff08;A&#xff09;R/S/D/T/t状态1. R&#xff1a;程序运…

在Spring Boot中集成H2数据库:完整指南

引言 Spring Boot是一个简化企业级Java应用程序开发的强大框架。H2数据库是一个轻量级的、开源的SQL数据库&#xff0c;非常适合用于开发和测试。本文将指导您如何在Spring Boot应用程序中集成H2数据库&#xff0c;并探索一些高级配置选项。 依赖关系 首先&#xff0c;我们需…

windows打开工程文件是顺序读写吗

在 Windows 操作系统中&#xff0c;打开和读写工程文件的过程可以是顺序读写&#xff0c;也可以是随机读写&#xff0c;具体取决于使用的软件和文件的性质。以下是一些详细解释&#xff1a; 顺序读写 顺序读写&#xff08;sequential access&#xff09;是指按文件中数据的顺…

C/C++覆盖率收集

linux下C/C++代码覆盖度检查工具:BullseyeCoverage 主要作用: a.识别在测试过程中没有完全执行的代码; b.获取测试完整性相关的一些度量,来帮助判断是否已经充分测试。 BullseyeCoverage 使用步骤一般是: 1)安装BullseyeCoverage

ThreadLocal详解,与 HashMap 对比

ThreadLocal原理&#xff0c;使用注意事项&#xff0c;解决哈希冲突方式->和HashMap对比 ThreadLocal 原理&#xff1a; ThreadLocal 是 Java 中的一个线程级别的变量&#xff0c;它允许您在不同线程之间存储和访问相同变量的不同副本&#xff0c;每个线程都拥有自己的副本&…

单片机的自动化编程语言:深度探索与未来展望

单片机的自动化编程语言&#xff1a;深度探索与未来展望 单片机作为现代电子设备的核心控制单元&#xff0c;其自动化编程语言的发展与应用&#xff0c;对提升设备性能、简化编程流程具有重大意义。本文将从四个方面、五个方面、六个方面和七个方面&#xff0c;对单片机的自动…

Day-04python模块

一、模块 1-1 Python 自带模块 Json模块 处理json数据 {"key":"value"} json不是字典 本质是一个有引号的字符串数据 json注意点 {} 中的数据是字符串引号必须是双引号 使用json模块可以实现将json转为字典&#xff0c;使用字典的方法操作数据 。 或者将…

社交媒体数据恢复:最右

第一步&#xff1a;确认数据丢失原因 请确定您是因为误删、设备损坏还是其他原因导致“最右”中的数据丢失。这将有助于您更好地了解需要采取的恢复措施。 第二步&#xff1a;尝试从备份中恢复数据 如果您有定期备份的习惯&#xff0c;请尝试从备份中恢复丢失的数据。备份文…

Springboot JVM监控 通过Promethus

Springboot内置了对Prometheus得支持&#xff0c;可以监测得点有&#xff1a; JVM各指标参数&#xff08;GC&#xff0c;堆&#xff0c;非堆等&#xff09;接口调用次数&#xff0c;延时系统内存&#xff0c;IO&#xff0c;CPU使用率 部署Prometheus和Grafana 准备一台2核4G…

局域网测速工具详解

使用好局域网测速工具可以帮助我们&#xff1a; 1. 网络性能评估 - 确定带宽&#xff1a;测量网络的实际上传和下载速度&#xff0c;以确定是否满足业务需求。 - 检测延迟和抖动&#xff1a;评估网络传输中的延迟和抖动&#xff0c;确保实时应用&#xff08;如VoIP、视频会议&a…

问题排查|记录一次基于mymuduo库开发的服务器错误排查(段错误--Segmentation fault (core dumped))

问题记录&#xff1a; 在刚完成mymuduo库之后&#xff0c;写了一个简单的测试服务器&#xff0c; 但是在服务器运行后直接报错&#xff1a; cherryhcss-ecs-4995:~/mymuduo/example$ ./testserver Segmentation fault (core dumped)出现多错误这通常意味着程序试图访问其内存空…

解决Mybatisplus中没有Db类静态工具的方案--提高版本

方案&#xff1a;将两个的版本都提高即可解决 Mybatis—plus的依赖文件 <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3</version></dependency><!-- ge…