Celery(分布式任务队列)入门学习笔记

Celery 的简单介绍

用 Celery 官方的介绍:它是一个分布式任务队列; 简单,灵活,可靠的处理大量消息的分布式系统; 它专注于实时处理,并支持任务调度。

Celery 如果使用 RabbitMQ 作为消息系统的话,整个应用体系就是下面这张图

Celery 官方给出的 Hello World, 对于未接触它的人来说根本就不知道是什么

1

2

3

4

5

6

7

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task

def hello():

    return 'hello world'

还是有必要按住上面那张图看 Celery 的组成部分

  1. Celery 自身实现的部分其实是 Producer 和 Consumer. Producer 创建任务,并发送消息到消息队列,我们称这个队列为 Broker。Consumer 从 Broker 中接收消息,完成计算任务,把结果存到 Backend
  2. Broker 就是那个消息队列,可选择的实现有 RabbitMQ, Redis, Amazon SQS
  3. 结果存储(Backend), 可选择 AMQP(像 RabbitMQ 就是它的一个实现), Redis, Memcached, Cassandra, Elasticsearch, MongoDB, CouchDB, DynamoDB, Amazon S3, File system 等等,看来它的定制性很强
  4. 消息和结果的存储还涉及到一个序列化的问题,可选择 pickle(Python 专用), json, yaml, msgpack. 消息可用 zlib, bzip2 进行压缩, 或加密存储
  5. Worker 的并发可采用 prefork(多进程), thread(多线程), Eventlet, gevent, solo(单线程)]

Celery 应用的基础选型

Celery 的 Broker 和 Backend 有非常多的选择组合,RabbitMQ 和 Redis 都是即可作为 Broker 又能用作 Backend。但 Celery 的推荐是用 RabbitMQ 作为 Broker, 小的结果这里选择用 Redis 作为 Backend, 所以这里的选型是

  1. Broker: RabbitMQ
  2. Backend: Redis
  3. 序列化:JSON  -- 方便在学习中查到消息中的数据
准备 Redis

安装 Python 包

在需要运行 Producer 和 Consumer(worker) 的机器上创建一个 Python 虚拟环境,然后安装下面的包

$ pip install celery redis

实践中只需要安装 celery redis 就能运行后面的例子,没有安装 librabbitmq, "celery[librabbitmq]" 也行,安装了这两个库能使用更高效的 librabbitmq C 库。如果安装了 librabbitmq 库,broker='amqp://...'  默认使用 librabbitmq, 找不到 librabbitmq 的话就用 broker='pyamqp://...'

$ pip install librabbitmq
$ pip install "celery[librabbitmq]"

注:中括号中的是安装 Celery 提供的 bundle, 它定义在 setup.py 的 setup 函数中的 extras_require。

Celery 应用实战

我们不用 Celery 的 Hello World 实例,那不能帮助我们理解背后发生了什么。创建一个 tasks.py 文件

1

2

3

4

5

6

7

8

9

10

11

from celery import Celery

app = Celery('celery-demo',

                broker='amqp://celery:your-password@192.168.86.181:5672/',

                backend='redis://192.168.86.181:6379')

@app.task

def add(x, y):

    return x + y

这里配置连接到 brocker 的 / vhost, 如果连接到别的 vhost, 如 celery 的话, url 写成 amqp://celery:your-passoword@192.168.86.181:5672/celery. backend 的 redis 如果要配置密码, 和 db 的话,写成 redis://:password@192.168.86.181:6379/2

暂且不在该脚本中直接执行 add.delay(15, 30), 而是放到 Python 控制台下方便测试

现在进到 Python 控制台

1

2

3

4

5

6

>>> from tasks import add

>>> task = add.delay(15, 30)

>>> task.id

'c3552fa2-502a-450b-933b-19a1da65ba33'

>>> task.status

'PENDING'

由于 Worker 还没有启动,所以得到一个 task_id, 状态是 PENDING。趁这时候看看 Celery 目前做了什么,来查看到 RabbitMQ

7

celery direct

Celery 在 RabbitMQ 中创建了的资源有

  1. 一个 Exchange: celery direct
  2. 两个 binding: 送到默认(空字符串)或 celery exchange 的, routing-key 为 celery 的消息会转发到队列 celery 中
  3. 一个队列 celery

查看队列 celery 中的消息

1

2

3

4

5

6

vagrant@celery:~$ rabbitmqadmin get queue=celery ackmode=ack_requeue_true

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

| routing_key | exchange | message_count |                                       payload                                       | payload_bytes | payload_encoding | redelivered |

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

| celery      |          | 0             | [[15, 30], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}] | 83            | string           | False       |

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

ackmode=ack_requeue_true, 所以消息仍然在队列中, Redis 中什么也还没发生,接下来要

启动 Celery Worker

要用到 celery 命令,不过只要是 Python 的程序,命令行能做的事情总是能用 Python 代码来执行,用 celery --help 可看它的详细说明。

$ celery -A tasks worker -l INFO

tasks 是自己创建的模块文件 tasks.py

这时候显示出一条绿绿的芹菜出来了,所以得用屏幕截图来表现

取出消息并显示任务执行完成,这时候去看 RabbitMQ 的队列 celery 中的消息不见了,启动 Worker 后也会在 RabbitMQ 中创建 queue, 及对应的 binding, exchange。

再回到提交任务的 Python 控制台

1

2

3

4

>>> task.status

'SUCCESS'

>>> task.result

45

一个 Celery 全套服务圆满完成。结果存在了 Redis 中

192.168.86.181:6379> keys *
1) "celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33"
192.168.86.181:6379> TTL celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
(integer) 85840
192.168.86.181:6379> get celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
"{\"status\": \"SUCCESS\", \"result\": 45, \"traceback\": null, \"children\": [], \"date_done\": \"2022-01-17T07:23:48.901999\", \"task_id\": \"c3552fa2-502a-450b-933b-19a1da65ba33\"}"

Redis 中的结果保存时长为 24 小时,失败的任务会记录下异常信息。

关于 Worker 的控制查看帮助 celery worker --help, 比如

  1. -c, --concurrency: 并发数,默认为系统中 CPU 的内核数
  2. -P, --pool [prefork|eventlet|gevent|solo|processes|threads]:  worker 池的实现方式
  3. --max-tasks-per-child INTEGER: worker 执行的最大任务数,达到最大数目后便重启当前 worker
  4. -Q, --queues: 指定处理任务的队列名称,逗号分隔

任务的状态变迁是:PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

Celery 的配置

除了在声明 Celery 对象时可以指定 broker, backend 属性之外,我们可以用 py 配置文件的形式来配置更多的内容,配置文件 celeryconfig.py, 内容是 Configuration and defautls 中列出的项目

比如 celeryconfig.py

1

2

3

4

5

6

7

broker_url = 'amqp://celery:your-password@192.168.86.181:5672/'

result_backend = 'redis://192.168.86.181:6379'

task_serializer = 'json'

result_serializer = 'json'

accept_content = ['json']

timezone = 'America/Chicago'

enable_utc = True

新的格式是用小写的,旧格式用大写,如 BROKER_URL, 但是同一个配置文件中不能混合大小写,同时写 BROKER_URL 和 result_backend 就不行了。

然后在 tasks.py 中加载配置文件

1

2

3

4

from celery import Celery

import celeryconfig

app = Celery('celery-demo')

app.config_from_object(celeryconfig)

Celery 实时监控工具

Flower 是一个基于 Web 的监控 Celery 中任务的工具,安装和启动

$ pip install flower
$ celery -A tasks flower

打开链接 http://localhost:5555

其他剩下的问题,应该就是如何安排 Worker(比如结合 AutoScaling),从 Python 代码中启动 Worker, 怎么做灵活的配置, 调度任务的执行,其他的 backend 选择等等。

其他补充

backend rpc:// 的组合

如果配置中用

1

2

broker_url = 'amqp://celery:password@192.168.86.50:5672/celery'

result_backend = 'rpc://'

amqp 和 rpc:// 的组合,任务和结果都会存在 RabbitMQ 中

1

2

broker_url = 'redis://192.168.86.50'

result_backend = 'rpc://'

redis 和  rpc:// 的组合,任务和结果都保存在 Redis 中

为什么 Celery 推荐使用 RabbitMQ, 一说是它的一开发人员负责开发过 RabbitMQ, 所以即使使用 Redis 时,也会在 Redis 中写入有关 RabbitMQ 概念的数据,如 exchange, routing key 等。

 常见问题

Celery ValueError: not enough values to unpack (expected 3, got 0)的解决方案

先安装eventlet

pip install eventlet

然后,启动worker的时候加一个参数,如下:

celery -A <moduleName> worker -l info -P eventlet

然后就可以正常运行worker执行任务了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/7255.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

25.哀家要长脑子了---哈希表

1.525. 连续数组 - 力扣&#xff08;LeetCode&#xff09; 在我对通义千问的一番折磨下&#xff0c;终于弄清楚一点点了。哈希表存储前缀和数组值 用一个counter来记录nums中0、1数量差值的变化。 哈希表map存储某个特定的counter值首次出现的位置。counter的计算&#xff1a;…

【深度学习】序列模型

深度学习&#xff08;Deep Learning&#xff09;是机器学习的一个分支领域&#xff1a;它是从数据中学习表示的一种新方法&#xff0c;强调从连续的层中进行学习&#xff0c;这些层对应于越来越有意义的表示。 1. 为什么选择序列模型&#xff1f; 循环神经网络&#xff08;RNN…

(二)JSP教程——taglib指令

创建标签文件 首先创建一个Web项目&#xff0c;在webapp/WEB-INF目录下创建一个tags文件夹 在tags文件夹中创建一个oddNumberSum.tag文件&#xff0c;Tag文件时扩展名为.tag的文本文件&#xff0c;其结构和JSP文件非常相似&#xff0c;该文件的目录结构如图所示 创建Tag文件的…

Linux基础之makefile/make

目录 一、背景 二、makefile和make的讲解 2.1 使用方法 2.2 伪目标文件 2.3 文件的属性以及属性的更新 2.4 makefile的自动推导 一、背景 这里会提及为什么要使用makefile和make&#xff0c;以及他们是什么和作用。 会不会写makefile&#xff0c;从一个侧面说明了一个人是…

怎么口语外教一对一课程?这篇文章告诉你答案!

怎么口语外教一对一课程&#xff1f;在当今全球化的时代&#xff0c;英语口语能力已经成为许多人追求的重要技能。为了满足这一需求&#xff0c;市场上涌现出了许多提供一对一口语外教课程的软件。这些软件不仅提供了与母语为英语的外教进行实时交流的机会&#xff0c;还通过互…

遭遇.halo勒索病毒怎么办?如何识别和应对.halo勒索病毒

导言&#xff1a; 近年来&#xff0c;网络安全问题愈发严峻&#xff0c;其中勒索病毒成为了威胁企业和个人数据安全的重要隐患。在2023年初&#xff0c;一种新的勒索病毒——.halo勒索病毒开始在网络上肆虐&#xff0c;给广大用户带来了极大的困扰。本文91数据恢复将对.halo勒…

c3 笔记6 认识css样式表

<link>与import应该如何选择?事实上&#xff0c;使用link与import链接外部样式文件的效果看起来是一样的&#xff0c;区别在于<link>是HTML标记而import属于CSS语法。<link>标记有rel、type与href属性&#xff0c;可以指定CSS样式表的名称&#xff0c;这样就…

控制台调试 hover 后才出现的元素

调试 hover后才出现的元素 打开开发者工具&#xff0c;鼠标放在hover时才出现的元素上&#xff0c;然后点击右键&#xff1b; 不要选中任何选项&#xff0c;将鼠标移动到开发者工具的调试面板中&#xff1b; 按下N键&#xff0c;此时悬浮的元素不会消失&#xff0c;定位成功。…

知到java笔记(4.1--继承的用法以及this和super的用法)

格式&#xff1a; 例子&#xff1a; get set获取父类的私有变量 private属性 this和super区别&#xff1a; this用法 super用法 例子

计算机毕设

随着社会和国家的重视&#xff0c;大学对于大学生毕业设计越来越重视。 做软件设计设计方面&#xff0c;前后端分离是必不可少的&#xff0c;代码管理工具&#xff0c;前后端接口测试是项目中必须要用到的工具。做大数据设计方面&#xff0c;主要是要用到爬虫进行数据爬取&…

【数据分析面试】36.SAAS公司邮件营销策略分析(业务分析)

题目 假设你在一个B2B SAAS公司营销团队工作。本季度即将结束&#xff0c;但当前收入距离预期目标还差10%。营销团队的一个高管要求负责电子邮件营销的人向所有客户群发邮件&#xff0c;要求他们购买更多产品。这是个好主意吗&#xff1f;为什么&#xff1f; 提示1&#xff1…

[论文笔记]Longformer: The Long-Document Transformer

引言 今天带来论文Longformer: The Long-Document Transformer的笔记。 基于Transformer的模型由于其自注意力操作而无法处理长序列&#xff0c;该操作随着序列长度呈二次扩展。为了解决这一限制&#xff0c;本篇工作提出了Longformer&#xff0c;其注意力机制随着序列长度呈…

接口测试之Mock测试方法

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 一、关于Mock测试 1、什么是Mock测试&#xff1f; Mock 测试就是在测试过程中&#xff0c;对于…

跟TED演讲学英文:How to govern AI — even if it‘s hard to predict by Helen Toner

How to govern AI — even if it’s hard to predict Link: https://www.ted.com/talks/helen_toner_how_to_govern_ai_even_if_it_s_hard_to_predict? Speaker: Helen Toner Date: April 2024 文章目录 How to govern AI — even if its hard to predictIntroductionVocabu…

美业SaaS系统多门店收银系统源码-【分润常见问题】讲解(一)

美业管理系统源码 博弈美业SaaS系统 连锁多门店美业收银系统源码 多门店管理 / 会员管理 / 预约管理 / 排班管理 / 商品管理 / 促销活动 PC管理后台、手机APP、iPad APP、微信小程序 ▶ 分润常见问题&#xff1a; 1、分润金额基数 所有分润计算的基数均为平台订单中各个商…

解决 java: 非法字符: ‘\ufeff‘

【报错解释】&#xff1a; 该错误通常发生在尝试编译Java源代码文件时&#xff0c;文件开头的字符是一个字节顺序标记&#xff08;Byte Order Mark&#xff0c;BOM&#xff09;&#xff0c;即\ufeff。在Java中&#xff0c;\ufeff不是一个合法的字符&#xff0c;因此编译器会报…

不考408的985,不想考408的有福了!吉林大学计算机考研考情分析

吉林大学&#xff08;Jilin University&#xff09;简称吉大&#xff0c;位于吉林长春&#xff0c;始建于1946年&#xff0c;是中华人民共和国教育部直属的综合性全国重点大学&#xff0c;国家“双一流”、“211工程”、“985工程”、“2011计划”重点建设的著名学府&#xff0…

open-webui+ollama本地部署Llama3

前言 Meta Llama 3 是由 Meta 公司发布的下一代大型语言模型&#xff0c;拥有 80 亿和 700 亿参数两种版本&#xff0c;号称是最强大的开源语言模型。它在多个基准测试中超越了谷歌的 Gemma 7B 和 Mistral 7B Instruct 模型。 安装 1.gpt4all https://github.com/nomic-ai/…

[Unity常见小问题]打包ios后无法修改模型透明度

问题 在Editor下可以使用如下代码去修改模型的材质的透明度&#xff0c;但是打包ios后无法对透明度进行修改且没有任何warning和error using System.Collections; using System.Collections.Generic; using UnityEngine;public class NewBehaviourScript : MonoBehaviour {[R…

Word页脚设置“第X页共X页”的方法【域实现】

Word页脚设置“第X页共X页”的方法【域实现】 在设置Word页码格式的要求中&#xff0c;有时需要设置为“第X页共X页”这种格式&#xff0c;使用Word中的域功能可实现&#xff0c;同时&#xff0c;在某些情况下&#xff0c;可能还需要减去封面的页码&#xff0c;接下来为具体步…