利用 Celery 构建 Web 服务的后台任务调度模块

来源:http://www.tuicool.com/articles/Enaeymm


任务队列在 Web 服务里的应用

在 Web2.0 后的时代,社交网站、搜索引擎的的迅猛发展对 Web 服务的后台管理系统提出了更高的需求。考虑几个常见的使用场景:

  1. 社交网站的用户在其主页发布了一组新的照片,这条新鲜事需要适时地推送至该用户的所有好友。该网站的活跃用户有千万级别,在同一时刻会有非常多的“新鲜事推送”任务需要处理,并且每个用户的好友数会达到 1000+的级别。出于用户体验的考虑,用户发布照片的这个操作需要在较短时间内得到反馈。
  2. 在文献搜索系统的主页,用户可以查到当前一小时内最热门的十大文献,并且能够直接访问该文献。该文献管理系统所管理的文献数量非常多,达到 PB 的级别。处于用户体验的考虑,用户获得十大热门文献这个动作需要在较短时间内获得反馈。

考虑对于高并发大用户量的 Web 服务系统,对于场景一和场景二中的需求,如果在请求处理周期内完成这些任务,然后再返回结果,这种传统的做法会导致用户等待的时间过长。同时 Web 服务管理后台对任务处理能力也缺乏扩展性。

在这种场景下,任务队列是有效的解决方案。在一个任务队列系统中,“将新鲜事推送至用户 A 的所有好友”或者“查询当前最热门的十大文献”这种查询或者计算工作可以被当成一个“任务”。在任务队列系统中,一般有任务生产者、任务处理中间方以及任务消费者三方。其中任务生产者负责生产任务,比如“将新鲜事推送至用户 A 的所有好友”这一任务的发起方就可以称作任务生产者。任务处理中间方负责接收任务生产者的任务处理请求,对任务进行调度,最后将任务分发给任务消费者来进行处理。任务消费者就是执行任务的一方,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。在生产方、消费者和任务处理中间方之间一般使用消息传递的方式来进行通信。

在任务队列系统框架中,任务消费者可以跨越不同的服务节点,可以动态地增加节点来增加系统的任务处理能力,非常适合高并发、需要横向扩展的 Web 服务后台。

回页首

Celery: 基于 Python 的开源分布式任务调度模块

Celery 是一个用 Python 编写的分布式的任务调度模块,它有着简明的 API,并且有丰富的扩展性,适合用于构建分布式的 Web 服务。

图 1. Celery 的模块架构


Celery 的模块架构较为简洁,但是提供了较为完整的功能:

任务生产者 (task producer)

任务生产者 (task producer) 负责产生计算任务,交给任务队列去处理。在 Celery 里,一段独立的 Python 代码、一段嵌入在 Django Web 服务里的一段请求处理逻辑,只要是调用了 Celery 提供的 API,产生任务并交给任务队列处理的,我们都可以称之为任务生产者。

任务调度器 (celery beat)

Celery beat 是一个任务调度器,它以独立进程的形式存在。Celery beat 进程会读取配置文件的内容,周期性地将执行任务的请求发送给任务队列。Celery beat 是 Celery 系统自带的任务生产者。系统管理员可以选择关闭或者开启 Celery beat。同时在一个 Celery 系统中,只能存在一个 Celery beat 调度器。

任务代理 (broker)

任务代理方负责接受任务生产者发送过来的任务处理消息,存进队列之后再进行调度,分发给任务消费方 (celery worker)。因为任务处理是基于 message(消息) 的,所以我们一般选择 RabbitMQ、Redis 等消息队列或者数据库作为 Celery 的 message broker。

任务消费方 (celery worker)

Celery worker 就是执行任务的一方,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。Celery worker 对应的就是操作系统中的一个进程。Celery 支持分布式部署和横向扩展,我们可以在多个节点增加 Celery worker 的数量来增加系统的高可用性。在分布式系统中,我们也可以在不同节点上分配执行不同任务的 Celery worker 来达到模块化的目的。

结果保存

Celery 支持任务处理完后将状态信息和结果的保存,以供查询。Celery 内置支持 rpc, Django ORM,Redis,RabbitMQ 等方式来保存任务处理后的状态信息。

回页首

构建第一个 Celery 程序

在我们的第一个 Celery 程序中,我们尝试在 Celery 中构建一个“将新鲜事通知到朋友”的任务,并且尝试通过编写一个 Python 程序来启动这个任务。

安装 Celery

Pip install celery

选择合适的消息代理中间件

Celery 支持 RabbitMQ、Redis 甚至其他数据库系统作为其消息代理中间件,在本文中,我们选择 RabbitMQ 作为消息代理中间件。

sudo apt-get install rabbitmq-server

创建 Celery 对象

Celery 对象是所有 Celery 功能的入口,所以在开始其它工作之前,我们必须先定义我们自己的 Celery 对象。该对象定义了任务的具体内容、任务队列的服务地址、以及保存任务执行结果的地址等重要信息。

# notify_friends.py
from celery import Celery
import time
app = Celery('notify_friends', backend='rpc://', broker='amqp://localhost')@app.task
def notify_friends(userId, newsId):print 'Start to notify_friends task at {0}, userID:{1} newsID:{2}'.format(time.ctime(), userId, newsId)time.sleep(2)print 'Task notify_friends succeed at {0}'.format(time.ctime())return True

在本文中,为了模拟真实的应用场景,我们定义了 notify_friends 这个任务,它接受两个参数,并且在输出流中打印出一定的信息,

创建 Celery Worker 服务进程

在定义完 Celery 对象后,我们可以创建对应的任务消费者--Celery worker 进程,后续的任务处理请求都是由这个 Celery worker 进程来最终执行的。

celery -A celery_test worker --loglevel=info

在 Python 程序中调用 Celery Task

我们创建一个简单的 Python 程序,来触发 notify_friends 这个任务。

# call_notify_friends.pyfrom notify_friends import notify_friends
import timedef notify(userId, messageId):result = notify_friends.delay(userId, messageId)while not result.ready():time.sleep(1)print result.get(timeout=1)if __name__ == '__main__':notify('001', '001')

我们在 call_notify_friends.py 这个程序文件中,定义了 Notify 函数,它调用了我们之前定义的 notify_friends 这个 API,来发送任务处理请求到任务队列,并且不断地查询等待来获得任务处理的结果。

Celery worker 中的 log 信息:

[tasks]. celery_test.notify_friends[2015-11-16 15:02:31,113: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2015-11-16 15:02:31,122: INFO/MainProcess] mingle: searching for neighbors
[2015-11-16 15:02:32,142: INFO/MainProcess] mingle: all alone
[2015-11-16 15:02:32,179: WARNING/MainProcess] celery@yuwenhao-VirtualBox ready.
[2015-11-16 15:04:45,474: INFO/MainProcess] Received task: 
celery_test.notify_friends[3f090a76-7678-4f9c-a37b-ceda59600f9c]
[2015-11-16 15:04:45,475: WARNING/Worker-2] Start to notify_friends task at 
Mon Nov 16 15:04:45 2015, userID:001 newsID:001
[2015-11-16 15:04:47,477: WARNING/Worker-2] Task notify_friends succeed at Mon Nov 16 15:04:47 2015
[2015-11-16 15:04:47,511: INFO/MainProcess] Task celery_test.notify_friends[3f090a76-7678-4f9c-a37b-ceda59600f9c] succeeded in 2.035536565s: True

我们可以看到,Celery worker 收到了 Python 程序的 notify_friends 任务的处理请求,并且执行完毕。

回页首

利用调度器创建周期任务

在我们第二个 Celery 程序中,我们尝试构建一个周期性执行“查询当前一小时最热门文献”的任务,每隔 100 秒执行一次,并将结果保存起来。后续的搜索请求到来后可以直接返回已有的结果,极大优化了用户体验。

创建配置文件

Celery 的调度器的配置是在 CELERYBEAT_SCHEDULE 这个全局变量上配置的,我们可以将配置写在一个独立的 Python 模块,在定义 Celery 对象的时候加载这个模块。我们将 select_populate_book 这个任务定义为每 100 秒执行一次。

# config.py
from datetime import timedeltaCELERYBEAT_SCHEDULE = {'select_populate_book': {'task': 'favorite_book.select_populate_book','schedule': timedelta(seconds=100),},
}

创建 Celery 对象

在 Celery 对象的定义里,我们加载了之前定义的配置文件,并定义了 select_populate_book 这个任务。

#favorite_book.py
from celery import Celery
import timeapp = Celery('select_populate_book', backend='rpc://', broker='amqp://localhost')
app.config_from_object('config')@app.task
def select_populate_book():print 'Start to select_populate_book task at {0}'.format(time.ctime())time.sleep(2)print 'Task select_populate_book succeed at {0}'.format(time.ctime())return True

启动 Celery worker

celery -A favorite_book worker --loglevel=info

启动 Celery beat

启动 Celery beat 调度器,Celery beat 会周期性地执行在 CELERYBEAT_SCHEDULE 中定义的任务,即周期性地查询当前一小时最热门的书籍。

celery -A favorite_book beatyuwenhao@yuwenhao:~$ celery -A favorite_book beat
celery beat v3.1.15 (Cipater) is starting.
__ - ... __ - _
Configuration ->. broker -> amqp://guest:**@localhost:5672//. loader -> celery.loaders.app.AppLoader. scheduler -> celery.beat.PersistentScheduler. db -> celerybeat-schedule. logfile -> [stderr]@%INFO. maxinterval -> now (0s)
[2015-11-16 16:21:15,443: INFO/MainProcess] beat: Starting...
[2015-11-16 16:21:15,447: WARNING/MainProcess] Reset: 
Timezone changed from 'UTC' to None
[2015-11-16 16:21:25,448: INFO/MainProcess] Scheduler: 
Sending due task select_populate_book (favorite_book.select_populate_book)
[2015-11-16 16:21:35,485: INFO/MainProcess] Scheduler:
Sending due task select_populate_book (favorite_book.select_populate_book)
[2015-11-16 16:21:45,490: INFO/MainProcess] Scheduler:
Sending due task select_populate_book (favorite_book.select_populate_book)

我们可以看到,Celery beat 进程周期性地将任务执行请求 select_populate_book 发送至任务队列。

yuwenhao@yuwenhao:~$ celery -A favorite_book worker --loglevel=info
[2015-11-16 16:21:11,560: WARNING/MainProcess] 
/usr/local/lib/python2.7/dist-packages/celery/apps/worker.py:161: CDeprecationWarning: 
Starting from version 3.2 Celery will refuse to accept pickle by default.The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']You must only enable the serializers that you will actually use.warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))-------------- celery@yuwenhao-VirtualBox v3.1.15 (Cipater)
---- **** ----- 
--- * *** * -- Linux-3.5.0-23-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app: select_populate_book:0x1b219d0
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- 
--- ***** ----- [queues]-------------- .> celery exchange=celery(direct) key=celery[tasks]. favorite_book.select_populate_book[2015-11-16 16:21:11,579: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2015-11-16 16:21:11,590: INFO/MainProcess] mingle: searching for neighbors
[2015-11-16 16:21:12,607: INFO/MainProcess] mingle: all alone
[2015-11-16 16:21:12,631: WARNING/MainProcess] celery@yuwenhao-VirtualBox ready.
[2015-11-16 16:21:25,459: INFO/MainProcess] Received task: 
favorite_book.select_populate_book[515f7c55-7ff0-4fcf-bc40-8838f69805fd]
[2015-11-16 16:21:25,460: WARNING/Worker-2] 
Start to select_populate_book task at Mon Nov 16 16:21:25 2015
[2015-11-16 16:21:27,462: WARNING/Worker-2] 
Task select_populate_book succeed at Mon Nov 16 16:21:27 2015
[2015-11-16 16:21:27,475: INFO/MainProcess] Task favorite_book.select_populate_book
[515f7c55-7ff0-4fcf-bc40-8838f69805fd] succeeded in 2.015802141s: True
[2015-11-16 16:21:35,494: INFO/MainProcess] Received task: 
favorite_book.select_populate_book[277d718a-3435-4bca-a881-a8f958d64aa9]
[2015-11-16 16:21:35,498: WARNING/Worker-1]
Start to select_populate_book task at Mon Nov 16 16:21:35 2015
[2015-11-16 16:21:37,501: WARNING/Worker-1] 
Task select_populate_book succeed at Mon Nov 16 16:21:37 2015
[2015-11-16 16:21:37,511: INFO/MainProcess] 
Task favorite_book.select_populate_book
[277d718a-3435-4bca-a881-a8f958d64aa9] succeeded in 2.014368786s: True

我们可以看到,任务 select_populate_book 的 Celery worker 周期性地收到 Celery 调度器的任务的处理请求,并且运行该任务。

回页首

结束语

任务队列技术可以满足 Web 服务系统后台任务管理和调度的需求,适合构建分布式的 Web 服务系统后台。Celery 是一个基于 Python 的开源任务队列系统。它有着简明的 API 以及良好的扩展性。本文首先介绍了队列技术的基本原理,然后介绍了 Celery 的模块架构以及工作原理。最后,本文通过实例介绍了如何在 Python 程序中调用 Celery API 并通过 Celery 任务队列来执行任务,以及如何通过 Celery beat 在 Celery 任务队列中创建周期性执行的任务。希望本文可以对 Web 后台开发者、以及 Celery 的初学者有所帮助。



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/496479.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java元素符号是什么_Java 代码中 @ 符号是什么意思?

展开全部annotation。Annotation,是Java5的新特性,下面是Sun的Tutorial的描述,因为是英文,这里我翻译下,希望能够比较清晰32313133353236313431303231363533e58685e5aeb931333264633435的描述一下Annotation的语法以及…

风景这边独好的AI大年,百度亚马逊等巨头第一季度成绩单怎么样?

苹果春季新品发布会邀请函来源:遇见人工智能该来的还是来了,但是那又如何?这句话是送给苹果的。本以为这个季度不会再有新品发布的苹果,却用一张主打“同学们,来次课外活动吧”的神奇邀请函打消了所有人的怀疑。根据从…

最近火了的自动驾驶全球产业链全景图

来源:传感器技术摘要:目前自动驾驶已经发展的很快,除了特斯拉和奥迪以外,很多豪车都引进了自动驾驶技术。下面来盘点一下自动驾驶产业链:首先看一下自动驾驶的等级标准:1、英特尔:Mobileye Alt…

java get set 注解_java技能提升,用Lombok甩掉get和set,让代码变得更简洁

前言前几天有个新来的同事(实习生)惊讶的对我说:我们的代码里好多错误,我的程序本地都启动不了。我一脸懵逼的质问他:目前线上的代码,怎么会有问题吗?他不服气的说:你来看嘛,就是有问题&#xf…

.pfx 证书和 .cer 证书

证书系列: 1:.pfx 证书和 .cer 证书 2:导入pfx证书 通常情况下,作为文件形式存在的证书一般有三种格式: 第一种:带有私钥的证书,由Public Key Cryptography Standards #12,PKCS#1…

HTML5、CSS、CSS3、SCSS (SASS) 相关教程

1、HTML5 教程 W3School HTML5 教程:http://www.w3school.com.cn/html5/index.asp 菜鸟网站 HTML5教程:http://www.runoob.com/html/html5-intro.html 知乎 零基础如何迅速学习HTML:https://www.zhihu.com/question/27018083 请问如何从头…

生命起源之谜:RNA世界假说将迎来终结?

○ 流行的理论认为,生命起源于物质丰富的化学汤,而 RNA 是化学汤中最初的自我复制单元。但是,多肽和RNA混合起来或许会更高效。 | 图片来源:Novikov Aleksey来源:科学出版社 撰文:Jordana Cepelewicz 翻…

PHP 学习路线

PHP 官网文档(中文):https://www.php.net/manual/zh/langref.php ThinkPhp (官方手册、入门教程):https://sites.thinkphp.cn/1556331 ​W3School PHP 教程:http://www.w3school.com.cn/php/index.asp w3cschool (在线教程&技术文档)&am…

jQuery 对话框 jQuery.plugin

jQuery 对话框 jQuery.plugin 强烈推荐对话框插件jquery.weebox.js,本站开源账务管理系统中使用的对话框组件,各种形式的对话框:确认、成功、警告、错误等 ………… 如下图的右下角: 账务管理系统(个人版)演示 图的右下角的框架就…

2018年聊天机器人状态报告

来源: 199IT互联网数据中心根据Drift、SurveyMonkey Audience、Salesforce和myclever的“2018年聊天机器人状态报告”,聊天机器人预计能够24小时为简单任务提供即时服务,但不是进行复杂查询的最佳渠道。聊天机器人尚未在消费者中找到广泛的吸…

PHP、MySQL 注入

Welcome to the NetSPI SQL Injection Wiki:https://sqlwiki.netspi.com/ 因为需要了解下 SQL 注入,就使用 PHP 自己写了一个只有一个网页的网站测试下,现在记录下过程。。。 直接使用的 KALI系统 (KALI官网:Kali Linux | Penetr…

争自动驾驶领头羊还是确保技术安全?欧美选择不同

来源:发掘新视界摘要:对于那些未知或有潜在危险的技术,欧洲更倾向于保护民众,而非是引领创新与进步。自优步自动驾驶汽车致人死亡事件发生之后,欧洲与美国对于技术的态度差异再度凸显,欧洲更倾向于加强监管…

菜鸟教程 之 JavaScript 教程

From:菜鸟教程 JavaScript:https://www.runoob.com/ W3School JavaScript 教程:http://www.w3school.com.cn/js/index.asp https://www.w3cschool.cn/javascript/ 廖雪峰官网 JavaScript 教程:https://www.liaoxuefeng.com/w…

关于Actionscript 3中给Flash传参数方法(一)

关于Actionscript 3中给Flash传参数方法(一) 今天在测试一个为Audi做Flash AD的时候,发现Audi提供的clicktag是Actionscript 2的代码,在Actionscript 3中是不被支持的,所以有去翻了Adobe Docs了。 在2.0中的clicktag代…

深度睡眠中,记忆如何被保持?

来源 :脑与智能丨公众号摘要:深度睡眠对记忆的巩固至关重要,与健康的睡眠相比,失眠会导致记忆力下降。深度睡眠中的慢波分为下降状态(down state)和上升状态(up state),在…

SQL 语句执行顺序

From:http://www.jellythink.com/archives/924 Oracle-SQL语句执行原理和完整过程详解:https://wenku.baidu.com/view/398bc427964bcf84b8d57b00.html 详解一条 SQL 语句的执行过程:http://www.cnblogs.com/cdf-opensource-007/p/6502556.…

无人车巨头每天都在做相同的事情:不惜血本做高精地图是为何?

来源:腾讯科技摘要:很多人都关注无人车的发展,殊不知无人车技术中,有十分重要的一环,是无人车的高精地图。高精地图成为无人车竞赛中的关键一环目前,很多家无人驾驶公司都在做着同样的事情:绘制…

phaser java_死磕 java同步系列之Phaser源码解析

问题(1)Phaser是什么?(2)Phaser具有哪些特性?(3)Phaser相对于CyclicBarrier和CountDownLatch的优势?简介Phaser,翻译为阶段,它适用于这样一种场景,一个大任务可以分为多个阶段完成,且每个阶段的…

人工智能将为传统制造业带来什么?

来源 :数据科学与人工智能“物理世界”(以制造业设备所代表)和“数字世界”(由人工智能、传感器等技术代表)的碰撞催生了制造业的巨大的转变。两个世界的融合将为下一轮经济发展注入新的动能。以人工智能为代表的新技术…

网络模拟器 eNSP、EVE-NG、GNS3、Packet Tracert

工欲善其事必先利其器,学习网络不可能都有真实的网络环境可以使用,这时就需要使用网络模拟软件,模拟一些网络环境,构建一些网络拓扑,然后学习研究网络技术 初学时不要在模拟器的选择上纠结,PT、GNS3 就足够…