如何在 Ubuntu VPS 上使用 Celery 与 RabbitMQ 来做队列

简介


异步或非阻塞处理是一种将某些任务的执行与程序的主要流程分离的方法。这为您提供了几个优势,包括允许用户界面代码在没有中断的情况下运行。

消息传递是程序组件用来通信和交换信息的一种方法。它可以同步或异步实现,并且可以允许离散进程进行无问题的通信。消息传递通常作为传统数据库的替代实现,因为消息队列通常实现了额外的功能,提供了增加的性能,并且可以完全驻留在内存中。

Celery 是建立在异步消息传递系统上的任务队列。它可以用作编程任务可以被倾倒的桶。传递任务的程序可以继续执行和响应功能,然后稍后它可以轮询 celery 来查看计算是否完成并检索数据。

虽然 celery 是用 Python 编写的,但它的协议可以在任何语言中实现。它甚至可以通过 webhooks 与其他语言一起使用。

通过在程序环境中实现作业队列,您可以轻松卸载任务并继续处理用户的交互。这是增加应用程序响应性的简单方法,并且在执行长时间运行的计算时不会被锁定。

在本指南中,我们将在 Ubuntu 12.04 VPS 上安装和实现使用 RabbitMQ 作为消息系统的 celery 作业队列。

安装组件


安装 Celery


Celery 是用 Python 编写的,因此可以像处理常规 Python 包一样轻松安装。

我们将按照处理 Python 包的推荐程序,通过创建虚拟环境来安装我们的消息系统。这有助于我们保持环境稳定,不会影响更大的系统。

从 Ubuntu 的默认存储库安装 Python 虚拟环境包:

sudo apt-get update
sudo apt-get install python-virtualenv

我们将创建一个消息目录,在这里我们将实现我们的系统:

mkdir ~/messaging
cd ~/messaging

现在我们可以创建一个虚拟环境,通过以下命令安装 celery:

virtualenv --no-site-packages venv

配置好虚拟环境后,可以通过输入以下命令激活它:

source venv/bin/activate

您的提示符将更改以反映您现在正在使用我们上面创建的虚拟环境。这将确保我们的 Python 包安装在本地而不是全局。

如果在任何时候我们需要停用环境(现在不需要),可以输入:

deactivate

现在我们已经激活了环境,可以使用 pip 安装 celery:

pip install celery

安装 RabbitMQ


Celery 需要一个消息代理来处理来自外部源的请求。这个代理被称为“broker”。

有很多可供选择的代理选项,包括关系型数据库、NoSQL 数据库、键值存储和实际消息系统。

我们将配置 celery 使用 RabbitMQ 消息系统,因为它提供了强大、稳定的性能,并且与 celery 交互良好。这是一个很好的解决方案,因为它包含了与我们预期使用的功能很好契合的特性。

我们可以通过 Ubuntu 的存储库安装 RabbitMQ:

sudo apt-get install rabbitmq-server

RabbitMQ 服务在安装后会自动启动在我们的服务器上。

创建 Celery 实例


为了使用 celery 的任务排队功能,安装后的第一步是创建一个 celery 实例。这是一个简单的过程,导入包,创建一个“app”,然后设置 celery 能够在后台执行的任务。

让我们在我们的消息目录内创建一个名为 tasks.py 的 Python 脚本,我们可以在其中定义我们的工作人员可以执行的任务。

sudo nano ~/messaging/tasks.py

我们应该做的第一件事是从 celery 包中导入 Celery 函数:

from celery import Celery

之后,我们可以创建一个连接到默认 RabbitMQ 服务的 celery 应用程序实例:

from celery import Celeryapp = Celery('tasks', backend='amqp', broker='amqp://')

Celery 函数的第一个参数是将用于标识任务的前缀名称。

backend 参数是一个可选参数,如果您希望查询后台任务的状态或检索其结果,则是必需的。

如果您的任务只是执行一些工作然后退出,而不返回在程序中使用的有用值,您可以将此参数省略。如果只有一些任务需要此功能,请在此启用它,我们可以在后面逐个案例地禁用它。

broker 参数指定连接到我们代理所需的 URL。在我们的情况下,这是运行在我们服务器上的 RabbitMQ 服务。RabbitMQ 使用一种称为“amqp”的协议运行。如果 RabbitMQ 在其默认配置下运行,celery 可以连接而无需其他信息,只需 amqp:// 方案。

构建 Celery 任务


在这个文件中,我们现在需要添加我们的任务。

每个 Celery 任务都必须使用装饰器 @app.task 来引入。这允许 Celery 识别可以添加其排队功能的函数。在每个装饰器之后,我们只需创建一个我们的工作进程可以运行的函数。

我们的第一个任务将是一个简单的函数,它将一个字符串打印到控制台。

from celery import Celeryapp = Celery('tasks', backend='amqp', broker='amqp://')@app.task
def print_hello():print 'hello there'

因为这个函数不返回任何有用的信息(它将其打印到控制台),我们可以告诉 Celery 不使用后端来存储关于此任务的状态信息。这在内部更简单,需要更少的资源。

from celery import Celeryapp = Celery('tasks', backend='amqp', broker='amqp://')@app.task(ignore_result=True)
def print_hello():print 'hello there'

接下来,我们将添加另一个函数,它将生成素数(取自 RosettaCode)。这可能是一个长时间运行的过程,因此这是一个很好的例子,说明我们在等待结果时如何处理异步工作进程。

from celery import Celeryapp = Celery('tasks', backend='amqp', broker='amqp://')@app.task(ignore_result=True)
def print_hello():print 'hello there'@app.task
def gen_prime(x):multiples = []results = []for i in xrange(2, x+1):if i not in multiples:results.append(i)for j in xrange(i*i, x+1, i):multiples.append(j)return results

因为我们关心这个函数的返回值,并且我们想知道它何时完成(以便我们可以使用结果等),所以我们不会向这个第二个任务添加 ignore_result 参数。

保存并关闭文件。

启动 Celery 工作进程


我们现在可以启动一个工作进程,它将能够接受来自应用程序的连接。它将使用我们刚刚创建的文件来了解它可以执行的任务。

启动一个工作实例就像使用 celery 命令调用应用程序名称一样简单。我们将在字符串末尾包含一个 “&” 字符,将我们的工作进程放入后台:

celery worker -A tasks &

这将启动一个应用程序,然后将其从终端分离,允许您继续使用它进行其他任务。

如果您想要启动多个工作进程,可以使用 -n 参数为每个工作进程命名:

celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &

当工作进程被命名时,%h 将被主机名替换。

要停止工作进程,您可以使用 kill 命令。我们可以查询进程 ID,然后根据这些信息消除工作进程。

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill

这将允许工作进程在退出之前完成其当前任务。

如果您希望关闭所有工作进程而不等待它们完成任务,可以执行:

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9

使用队列处理工作


我们可以使用我们生成的工作进程在后台为我们的程序完成工作。

我们将在 Python 解释器中探索不同的选项,而不是创建一个完整的程序来演示它是如何工作的:

python

在提示符下,我们可以将我们的函数导入到环境中:

from tasks import print_hello
from tasks import gen_prime

如果您测试这些函数,它们似乎没有任何特殊功能。第一个函数按预期打印一行:

print_hello()

hello there

第二个函数返回一个素数列表:

primes = gen_prime(1000)
print primes

如果我们给第二个函数一个更大的数字范围来检查,执行将挂起,因为它在计算中:

primes = gen_prime(50000)

通过输入 “CTRL-C” 来停止执行。这个过程显然没有在后台计算。

要访问后台工作进程,我们需要使用 .delay 方法。Celery 为我们的函数添加了额外的功能。这个方法用于将函数传递给工作进程执行。它应该立即返回:

primes = gen_prime.delay(50000)

这个任务现在正在由我们之前启动的工作进程执行。因为我们为应用程序配置了 backend 参数,我们可以检查计算的状态并访问结果。

要检查任务是否完成,我们可以使用 .ready 方法:

primes.ready()

False

“False” 的值意味着任务仍在运行,结果尚不可用。当我们得到 “True” 的值时,我们可以对答案做些什么。

primes.ready()

True

我们可以使用 .get 方法获取值。

如果我们已经使用 .ready 方法验证了值是否计算出来,那么我们可以像这样使用该方法:

print primes.get()

[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
. . .

然而,如果您在调用 .get 之前没有使用 .ready 方法,您很可能希望添加一个 “timeout” 选项,以便您的程序不必等待结果,这将违反我们的实现目的:

print primes.get(timeout=2)

如果超时,这将引发异常,您可以在程序中处理它。

结论


虽然这些信息足以让你开始在程序中使用 celery,但这只是揭开了这个库的全部功能的一角。Celery允许你将后台任务串联在一起,对任务进行分组,并以有趣的方式组合函数。

虽然 celery 是用 Python 编写的,但可以通过 Webhooks 与其他语言一起使用。这使得它非常灵活,可以将任务移到后台,而不受所选择的语言的限制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/683366.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2月14日作业

1、安装tftp服务器和nfs服务器,准备需要下载到开发板文件,存放在指定下载文件夹下,准备需要挂载到开发板文件夹,存放在指定挂载文件夹中。 2、ubuntu和开发板组网,关闭防火墙,关闭杀毒软件,配置…

探索XGBoost:自动化机器学习(AutoML)

探索XGBoost:自动化机器学习(AutoML) 导言 自动化机器学习(AutoML)是一种通过自动化流程来构建、训练和部署机器学习模型的方法。XGBoost作为一种强大的机器学习算法,也可以用于AutoML。本教程将介绍如何…

NLP快速入门

NLP入门 课程链接:https://www.bilibili.com/video/BV17K4y1W7yb/?p1&vd_source3f265bbf5a1f54aab2155d9cc1250219 参考文档链接1:NLP知识点:Tokenizer分词器 - 掘金 (juejin.cn) 一、分词 分词是什么? 每个字母都有对应…

【Web】从零开始的js逆向学习笔记(上)

目录 一、逆向基础 1.1 语法基础 1.2 作用域 1.3 窗口对象属性 1.4 事件 二、浏览器控制台 2.1 Network Network-Headers Network-Header-General Network-Header-Response Headers Network-Header-Request Headers 2.2 Sources 2.3 Application 2.4 Console 三、…

车载诊断协议DoIP系列 —— DoIP会话模式(安全与非安全)

车载诊断协议DoIP系列 —— DoIP会话模式(安全与非安全) 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师(Wechat:gongkenan2013)。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 本就是小人物,输了就是输了,不要在意别人怎么看自己。江湖…

react 插槽

问题开发当中会经常出现组件十分相似的组件&#xff0c;只有一部分是不同的 解决&#xff1a; 父组件:在引用的时候 import { Component } from "react"; import Me from "../me";const name <div>名称</div> class Shoop extends Compone…

MongoDB聚合操作符:$accumulator

$accumulator可以定义自定义累加器操作符。累加器是一种操作符&#xff0c;可在文档通过管道时保持其状态&#xff08;如&#xff1a;总数、最大值、最小值和相关数据&#xff09;。$accumulator操作符支持执行自定义的JavaScript函数&#xff0c;可以实现MongoDB查询语言不支持…

【STM32 CubeMX】学STM必会的数据结构——环形缓冲区

文章目录 前言一、环形缓冲区是什么二、实现环形缓冲区实现分析2.1 环形缓冲区初始化2.2 写buf2.3 读buf2.4 测试 三、代码总况总结 前言 在嵌入式系统开发中&#xff0c;经常需要处理数据的缓存和传输&#xff0c;而环形缓冲区是一种常见且有效的数据结构&#xff0c;特别适用…

幻兽帕鲁官方更新了,服务器端怎么更新?

幻兽帕鲁官方客户端更新了&#xff0c;那么它的服务器端版本也是需要更新的&#xff0c;不然版本不一致的话&#xff0c;就不能进入游戏了。 具体的更新方法有两种&#xff0c;一是手动输入命令进行更新。第二种是在面板一键更新。 无论你是在阿里云或者腾讯云购买的一键部署…

Pycharm里如何设置多Python文件并行运行

点击上方“Python爬虫与数据挖掘”&#xff0c;进行关注 回复“书籍”即可获赠Python从入门到进阶共10本电子书 今 日 鸡 汤 夕阳何事近黄昏&#xff0c;不道人间犹有未招魂。 大家好&#xff0c;我是皮皮。 一、前言 相信使用Pycharm的粉丝们肯定有和我一样的想法&#xff0c;…

rollup 和 esbuild 的对比

Rollup 和 esbuild 都是 JavaScript 模块打包工具&#xff0c;用于将多个模块打包成一个或多个浏览器可执行的文件。Rollup 先被提出&#xff0c;esbuild 后被提出。 Rollup&#xff1a; 提出时间&#xff1a;Rollup 是在 2015 年首次发布的。它最初的目标是专注于 ES6 模块的静…

算法学习——LeetCode力扣贪心篇1

算法学习——LeetCode力扣贪心篇1 455. 分发饼干 455. 分发饼干 - 力扣&#xff08;LeetCode&#xff09; 描述 假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。 对每个孩子 i&#xff0c;都有一个胃口值 g[…

Vulnhub靶机:DC3

一、介绍 运行环境&#xff1a;Virtualbox 攻击机&#xff1a;kali&#xff08;10.0.2.15&#xff09; 靶机&#xff1a;DC3&#xff08;10.0.2.56&#xff09; 目标&#xff1a;获取靶机root权限和flag 靶机下载地址&#xff1a;https://www.vulnhub.com/entry/dc-32,312…

[Python人工智能] 四十一.命名实体识别 (2)基于BiGRU-CRF的中文实体识别万字详解

从本专栏开始,作者正式研究Python深度学习、神经网络及人工智能相关知识。前文讲解如何实现威胁情报实体识别,利用BiLSTM-CRF算法实现对ATT&CK相关的技战术实体进行提取,是安全知识图谱构建的重要支撑。这篇文章将以中文语料为主,介绍中文命名实体识别研究,并构建BiGR…

总结FreeRTOS中的任务调度算法,空闲任务,任务状态等概念。

任务调度算法 抢占式调度&#xff1a;高优先级的任务优先执行&#xff0c;并且可以打断低优先级的任务执行。 时间片轮转&#xff1a;相同优先级的任务&#xff0c;拥有相同的时间片&#xff0c;当时间片被耗尽&#xff0c;就退出当前任务。 空闲任务 空闲指的就是当系统中…

嵌入式系统的基础知识:了解嵌入式系统的构成和工作原理

&#xff08;本文为简单介绍&#xff0c;个人观点仅供参考&#xff09; 嵌入式系统是建立在微处理器基础上的计算机系统,用于对专门的功能进行控制、运算和接口。它结合了硬件和软件,可以提供实时的响应,广泛应用于工业控制、通信、医疗、交通等领域。 嵌入式系统的核心是微处理…

猫头虎分享已解决Bug || 代码部署失败(Code Deployment Failure):DeploymentError, FailedRelease

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

代码随想录算法训练营29期Day51|LeetCode 139

文档讲解&#xff1a;单词拆分 139.单词拆分 题目链接&#xff1a;https://leetcode.cn/problems/word-break/description/ 思路&#xff1a; 单词就是物品&#xff0c;字符串s就是背包&#xff0c;单词能否组成字符串s&#xff0c;就是问物品能不能把背包装满。 拆分时可以重…

PMDG 737

在Simbrief中生成计划后下载两个文件 放到A:\Xbox\Community\pmdg-aircraft-738\Config\Flightplans中

机器视觉技术:提升安全与效率的关键

机器视觉技术&#xff1a;提升安全与效率的关键 随着技术的不断发展&#xff0c;机器视觉技术已经成为提高许多行业安全与效率的关键要素。无论是在工业制造、交通监控、安全防卫&#xff0c;还是在医疗诊断、零售管理等领域&#xff0c;机器视觉技术都发挥着越来越重要的作用…