Celery(分布式任务队列)入门学习笔记

Celery 的简单介绍

用 Celery 官方的介绍:它是一个分布式任务队列; 简单,灵活,可靠的处理大量消息的分布式系统; 它专注于实时处理,并支持任务调度。

Celery 如果使用 RabbitMQ 作为消息系统的话,整个应用体系就是下面这张图

Celery 官方给出的 Hello World, 对于未接触它的人来说根本就不知道是什么

1

2

3

4

5

6

7

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task

def hello():

    return 'hello world'

还是有必要按住上面那张图看 Celery 的组成部分

  1. Celery 自身实现的部分其实是 Producer 和 Consumer. Producer 创建任务,并发送消息到消息队列,我们称这个队列为 Broker。Consumer 从 Broker 中接收消息,完成计算任务,把结果存到 Backend
  2. Broker 就是那个消息队列,可选择的实现有 RabbitMQ, Redis, Amazon SQS
  3. 结果存储(Backend), 可选择 AMQP(像 RabbitMQ 就是它的一个实现), Redis, Memcached, Cassandra, Elasticsearch, MongoDB, CouchDB, DynamoDB, Amazon S3, File system 等等,看来它的定制性很强
  4. 消息和结果的存储还涉及到一个序列化的问题,可选择 pickle(Python 专用), json, yaml, msgpack. 消息可用 zlib, bzip2 进行压缩, 或加密存储
  5. Worker 的并发可采用 prefork(多进程), thread(多线程), Eventlet, gevent, solo(单线程)]

Celery 应用的基础选型

Celery 的 Broker 和 Backend 有非常多的选择组合,RabbitMQ 和 Redis 都是即可作为 Broker 又能用作 Backend。但 Celery 的推荐是用 RabbitMQ 作为 Broker, 小的结果这里选择用 Redis 作为 Backend, 所以这里的选型是

  1. Broker: RabbitMQ
  2. Backend: Redis
  3. 序列化:JSON  -- 方便在学习中查到消息中的数据
准备 Redis

安装 Python 包

在需要运行 Producer 和 Consumer(worker) 的机器上创建一个 Python 虚拟环境,然后安装下面的包

$ pip install celery redis

实践中只需要安装 celery redis 就能运行后面的例子,没有安装 librabbitmq, "celery[librabbitmq]" 也行,安装了这两个库能使用更高效的 librabbitmq C 库。如果安装了 librabbitmq 库,broker='amqp://...'  默认使用 librabbitmq, 找不到 librabbitmq 的话就用 broker='pyamqp://...'

$ pip install librabbitmq
$ pip install "celery[librabbitmq]"

注:中括号中的是安装 Celery 提供的 bundle, 它定义在 setup.py 的 setup 函数中的 extras_require。

Celery 应用实战

我们不用 Celery 的 Hello World 实例,那不能帮助我们理解背后发生了什么。创建一个 tasks.py 文件

1

2

3

4

5

6

7

8

9

10

11

from celery import Celery

app = Celery('celery-demo',

                broker='amqp://celery:your-password@192.168.86.181:5672/',

                backend='redis://192.168.86.181:6379')

@app.task

def add(x, y):

    return x + y

这里配置连接到 brocker 的 / vhost, 如果连接到别的 vhost, 如 celery 的话, url 写成 amqp://celery:your-passoword@192.168.86.181:5672/celery. backend 的 redis 如果要配置密码, 和 db 的话,写成 redis://:password@192.168.86.181:6379/2

暂且不在该脚本中直接执行 add.delay(15, 30), 而是放到 Python 控制台下方便测试

现在进到 Python 控制台

1

2

3

4

5

6

>>> from tasks import add

>>> task = add.delay(15, 30)

>>> task.id

'c3552fa2-502a-450b-933b-19a1da65ba33'

>>> task.status

'PENDING'

由于 Worker 还没有启动,所以得到一个 task_id, 状态是 PENDING。趁这时候看看 Celery 目前做了什么,来查看到 RabbitMQ

7

celery direct

Celery 在 RabbitMQ 中创建了的资源有

  1. 一个 Exchange: celery direct
  2. 两个 binding: 送到默认(空字符串)或 celery exchange 的, routing-key 为 celery 的消息会转发到队列 celery 中
  3. 一个队列 celery

查看队列 celery 中的消息

1

2

3

4

5

6

vagrant@celery:~$ rabbitmqadmin get queue=celery ackmode=ack_requeue_true

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

| routing_key | exchange | message_count |                                       payload                                       | payload_bytes | payload_encoding | redelivered |

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

| celery      |          | 0             | [[15, 30], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}] | 83            | string           | False       |

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

ackmode=ack_requeue_true, 所以消息仍然在队列中, Redis 中什么也还没发生,接下来要

启动 Celery Worker

要用到 celery 命令,不过只要是 Python 的程序,命令行能做的事情总是能用 Python 代码来执行,用 celery --help 可看它的详细说明。

$ celery -A tasks worker -l INFO

tasks 是自己创建的模块文件 tasks.py

这时候显示出一条绿绿的芹菜出来了,所以得用屏幕截图来表现

取出消息并显示任务执行完成,这时候去看 RabbitMQ 的队列 celery 中的消息不见了,启动 Worker 后也会在 RabbitMQ 中创建 queue, 及对应的 binding, exchange。

再回到提交任务的 Python 控制台

1

2

3

4

>>> task.status

'SUCCESS'

>>> task.result

45

一个 Celery 全套服务圆满完成。结果存在了 Redis 中

192.168.86.181:6379> keys *
1) "celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33"
192.168.86.181:6379> TTL celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
(integer) 85840
192.168.86.181:6379> get celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
"{\"status\": \"SUCCESS\", \"result\": 45, \"traceback\": null, \"children\": [], \"date_done\": \"2022-01-17T07:23:48.901999\", \"task_id\": \"c3552fa2-502a-450b-933b-19a1da65ba33\"}"

Redis 中的结果保存时长为 24 小时,失败的任务会记录下异常信息。

关于 Worker 的控制查看帮助 celery worker --help, 比如

  1. -c, --concurrency: 并发数,默认为系统中 CPU 的内核数
  2. -P, --pool [prefork|eventlet|gevent|solo|processes|threads]:  worker 池的实现方式
  3. --max-tasks-per-child INTEGER: worker 执行的最大任务数,达到最大数目后便重启当前 worker
  4. -Q, --queues: 指定处理任务的队列名称,逗号分隔

任务的状态变迁是:PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

Celery 的配置

除了在声明 Celery 对象时可以指定 broker, backend 属性之外,我们可以用 py 配置文件的形式来配置更多的内容,配置文件 celeryconfig.py, 内容是 Configuration and defautls 中列出的项目

比如 celeryconfig.py

1

2

3

4

5

6

7

broker_url = 'amqp://celery:your-password@192.168.86.181:5672/'

result_backend = 'redis://192.168.86.181:6379'

task_serializer = 'json'

result_serializer = 'json'

accept_content = ['json']

timezone = 'America/Chicago'

enable_utc = True

新的格式是用小写的,旧格式用大写,如 BROKER_URL, 但是同一个配置文件中不能混合大小写,同时写 BROKER_URL 和 result_backend 就不行了。

然后在 tasks.py 中加载配置文件

1

2

3

4

from celery import Celery

import celeryconfig

app = Celery('celery-demo')

app.config_from_object(celeryconfig)

Celery 实时监控工具

Flower 是一个基于 Web 的监控 Celery 中任务的工具,安装和启动

$ pip install flower
$ celery -A tasks flower

打开链接 http://localhost:5555

其他剩下的问题,应该就是如何安排 Worker(比如结合 AutoScaling),从 Python 代码中启动 Worker, 怎么做灵活的配置, 调度任务的执行,其他的 backend 选择等等。

其他补充

backend rpc:// 的组合

如果配置中用

1

2

broker_url = 'amqp://celery:password@192.168.86.50:5672/celery'

result_backend = 'rpc://'

amqp 和 rpc:// 的组合,任务和结果都会存在 RabbitMQ 中

1

2

broker_url = 'redis://192.168.86.50'

result_backend = 'rpc://'

redis 和  rpc:// 的组合,任务和结果都保存在 Redis 中

为什么 Celery 推荐使用 RabbitMQ, 一说是它的一开发人员负责开发过 RabbitMQ, 所以即使使用 Redis 时,也会在 Redis 中写入有关 RabbitMQ 概念的数据,如 exchange, routing key 等。

 常见问题

Celery ValueError: not enough values to unpack (expected 3, got 0)的解决方案

先安装eventlet

pip install eventlet

然后,启动worker的时候加一个参数,如下:

celery -A <moduleName> worker -l info -P eventlet

然后就可以正常运行worker执行任务了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/7255.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

25.哀家要长脑子了---哈希表

1.525. 连续数组 - 力扣&#xff08;LeetCode&#xff09; 在我对通义千问的一番折磨下&#xff0c;终于弄清楚一点点了。哈希表存储前缀和数组值 用一个counter来记录nums中0、1数量差值的变化。 哈希表map存储某个特定的counter值首次出现的位置。counter的计算&#xff1a;…

【深度学习】序列模型

深度学习&#xff08;Deep Learning&#xff09;是机器学习的一个分支领域&#xff1a;它是从数据中学习表示的一种新方法&#xff0c;强调从连续的层中进行学习&#xff0c;这些层对应于越来越有意义的表示。 1. 为什么选择序列模型&#xff1f; 循环神经网络&#xff08;RNN…

python之 函数相关知识解析

01 函数的注释与嵌套 1.函数的注释 函数的注释与普通注释的区别&#xff1a;用来说明当前函数的参数含义 param 参数名: 参数的注释信息 return: 函数的返回值 例如&#xff1a; def fun1(name):""":param name: 参数的注释信息:return: 函数的返回值"…

(二)JSP教程——taglib指令

创建标签文件 首先创建一个Web项目&#xff0c;在webapp/WEB-INF目录下创建一个tags文件夹 在tags文件夹中创建一个oddNumberSum.tag文件&#xff0c;Tag文件时扩展名为.tag的文本文件&#xff0c;其结构和JSP文件非常相似&#xff0c;该文件的目录结构如图所示 创建Tag文件的…

Linux基础之makefile/make

目录 一、背景 二、makefile和make的讲解 2.1 使用方法 2.2 伪目标文件 2.3 文件的属性以及属性的更新 2.4 makefile的自动推导 一、背景 这里会提及为什么要使用makefile和make&#xff0c;以及他们是什么和作用。 会不会写makefile&#xff0c;从一个侧面说明了一个人是…

cache数据库在信创中的应用

Cache数据库在信创&#xff08;信息技术应用创新&#xff09;中的应用主要体现在其高性能、高可用性、灵活性和易用性等方面&#xff0c;这些特点使得Cache数据库在信创领域具有广泛的应用前景。 首先&#xff0c;Cache数据库作为后关系型数据库&#xff0c;能够整合对象数据库…

怎么口语外教一对一课程?这篇文章告诉你答案!

怎么口语外教一对一课程&#xff1f;在当今全球化的时代&#xff0c;英语口语能力已经成为许多人追求的重要技能。为了满足这一需求&#xff0c;市场上涌现出了许多提供一对一口语外教课程的软件。这些软件不仅提供了与母语为英语的外教进行实时交流的机会&#xff0c;还通过互…

Java之String类

一、String类常用方法 1.引用类型的比较 我们知道在Java中两个引用遍历是不能用" "号来比较的&#xff0c;而String类重写了父类objects的equals方法&#xff0c; 实现了引用类型的比较 例子 import java.util.Scanner; public class Main { public static void…

遭遇.halo勒索病毒怎么办?如何识别和应对.halo勒索病毒

导言&#xff1a; 近年来&#xff0c;网络安全问题愈发严峻&#xff0c;其中勒索病毒成为了威胁企业和个人数据安全的重要隐患。在2023年初&#xff0c;一种新的勒索病毒——.halo勒索病毒开始在网络上肆虐&#xff0c;给广大用户带来了极大的困扰。本文91数据恢复将对.halo勒…

c3 笔记6 认识css样式表

<link>与import应该如何选择?事实上&#xff0c;使用link与import链接外部样式文件的效果看起来是一样的&#xff0c;区别在于<link>是HTML标记而import属于CSS语法。<link>标记有rel、type与href属性&#xff0c;可以指定CSS样式表的名称&#xff0c;这样就…

C#中override与重载的区别

在C#中&#xff0c;override和重载&#xff08;通常通过定义具有相同名称但不同参数列表的方法来实现&#xff09;是两个不同的概念&#xff0c;它们在用途和行为上有所区别。下面是关于override和重载的主要区别&#xff1a; override&#xff08;重写&#xff09; 定义&…

控制台调试 hover 后才出现的元素

调试 hover后才出现的元素 打开开发者工具&#xff0c;鼠标放在hover时才出现的元素上&#xff0c;然后点击右键&#xff1b; 不要选中任何选项&#xff0c;将鼠标移动到开发者工具的调试面板中&#xff1b; 按下N键&#xff0c;此时悬浮的元素不会消失&#xff0c;定位成功。…

知到java笔记(4.1--继承的用法以及this和super的用法)

格式&#xff1a; 例子&#xff1a; get set获取父类的私有变量 private属性 this和super区别&#xff1a; this用法 super用法 例子

计算机毕设

随着社会和国家的重视&#xff0c;大学对于大学生毕业设计越来越重视。 做软件设计设计方面&#xff0c;前后端分离是必不可少的&#xff0c;代码管理工具&#xff0c;前后端接口测试是项目中必须要用到的工具。做大数据设计方面&#xff0c;主要是要用到爬虫进行数据爬取&…

yum下载安装包

一、背景 对于连接网络的centos系统&#xff0c;可以直接通过yum install安装应用。 对于不能连接外部网络的centos系统&#xff0c;显然yum无法使用&#xff0c;需要将安装包拷贝到机器上安装&#xff0c;但去哪里去寻找和下载安装包呢。 二、yum downloads yum工具贴心的…

【数据分析面试】36.SAAS公司邮件营销策略分析(业务分析)

题目 假设你在一个B2B SAAS公司营销团队工作。本季度即将结束&#xff0c;但当前收入距离预期目标还差10%。营销团队的一个高管要求负责电子邮件营销的人向所有客户群发邮件&#xff0c;要求他们购买更多产品。这是个好主意吗&#xff1f;为什么&#xff1f; 提示1&#xff1…

[论文笔记]Longformer: The Long-Document Transformer

引言 今天带来论文Longformer: The Long-Document Transformer的笔记。 基于Transformer的模型由于其自注意力操作而无法处理长序列&#xff0c;该操作随着序列长度呈二次扩展。为了解决这一限制&#xff0c;本篇工作提出了Longformer&#xff0c;其注意力机制随着序列长度呈…

花一个月时间为 vue3 重制了 vue-styled-components

花一个月时间为 vue3 重制了 vue-styled-components 前言 styled-components 在 React 是一个超级热门的 css in js 工具库。其实 styled-components 也有 Vue 版本&#xff08;vue-styled-components&#xff09;&#xff0c;可惜的是只支持 Vue2&#xff0c;并且该项目已有…

接口测试之Mock测试方法

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 一、关于Mock测试 1、什么是Mock测试&#xff1f; Mock 测试就是在测试过程中&#xff0c;对于…

跟TED演讲学英文:How to govern AI — even if it‘s hard to predict by Helen Toner

How to govern AI — even if it’s hard to predict Link: https://www.ted.com/talks/helen_toner_how_to_govern_ai_even_if_it_s_hard_to_predict? Speaker: Helen Toner Date: April 2024 文章目录 How to govern AI — even if its hard to predictIntroductionVocabu…