celery的中文_celery异步任务框架

590acc0af8e3c7ac3b486eaacd5a9823.gif

fc3debb928eb6be94b5eff525f7ba8e4.png

目录

  • Celery

  • 一、官方

  • 二、Celery异步任务框架Celery架构图消息中间件任务执行单元任务结果存储

  • 三、使用场景

  • 四、Celery的安装配置

  • 五、两种celery任务结构:提倡用包管理,结构更清晰

  • 七、Celery执行异步任务包架构封装

  • 八、基本使用celery.py 基本配置tasks.py 添加任务add_task.py 添加立即、延迟任务get_result.py 获取结果

  • 九、高级使用celery.py 定时任务配置(循环的)tasks.pyget_result.py

  • 十、django中使用(更新轮播图案例)redis的配置接口缓存views.py启动服务celery.pytasks.py

Celery

一、官方

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

二、Celery异步任务框架

"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""

Celery架构图

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

c1d6432550115750abb5c37c0df24b8e.png

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

三、使用场景

异步执行:解决耗时任务

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务

四、Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery('任务名', broker='xxx', backend='xxx')

五、两种celery任务结构:提倡用包管理,结构更清晰

# 如果 Celery对象:Celery(...) 是放在一个模块下的
# 1)终端切换到该模块所在文件夹位置:scripts
# 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:模块名随意


# 如果 Celery对象:Celery(...) 是放在一个包下的
# 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中
# 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:包名随意

放在根目录下就行:

f9efbb625cb575b6ecf0ef2480d17376.png

七、Celery执行异步任务

包架构封装

project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果

八、基本使用

celery.py 基本配置

# 1)创建app + 任务

# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet

# 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本

# 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本


from celery import Celery
# 无密码
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 有密码:
broker = 'redis://:123@127.0.0.1:6379/1'
backend = 'redis://:123@127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
'''
broker : 任务仓库
backend :任务结果仓库
include :任务(函数)所在文件
'''

tasks.py 添加任务

from .celery import app

@app.task
def add(n1,n2):
res = n1+n2
print('n1+n2 = %s' % res)
return res


@app.task
def low(n1,n2):
res = n1-n2
print('n1-n2 = %s' % res)
return res

add_task.py 添加立即、延迟任务

from celery_task import tasks

# delay :添加立即任务
# apply_async :添加延迟任务
# eta :执行的utc时间


# 添加立即执行任务
t1 = tasks.add.delay(10, 20)
t2 = tasks.low.delay(100, 50)
print(t1.id)


# 添加延迟任务
from celery_package.tasks import jump
from datetime import datetime,timedelta

# 秒
def eta_second(second):
ctime = datetime.now() # 当前时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) # 当前UTC时间
time_delay = timedelta(seconds=second) # 秒
return utc_ctime + time_delay # 当前时间+往后延迟的秒
# 天
def eta_days(days):
ctime = datetime.now() # 当前时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) # 当前UTC时间
time_delay = timedelta(days=days) # 天
return utc_ctime + time_delay # 当前时间+往后延迟的天

jump.apply_async(args=(20,5), eta=eta_second(10)) # 10秒后执行
jump.apply_async(args=(20,5), eta=eta_days(1)) # 1天后执行

get_result.py 获取结果

from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

九、高级使用

celery.py 定时任务配置(循环的)

特点:

添加任务的终端关闭之后,停止添加

celery服务端关闭后,把关闭之后未执行的任务都执行一遍,然后继续接收任务

# 1)创建app + 任务

# 2)启动celery(app)服务:
# 注):-A 表示相对路径,所以一定先进入celery_task所在包
-l 表示打印到日志 info 级别
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet

# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:celery beat -A celery_task -l info

# 4)获取结果


from celery import Celery

# 无密码
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 有密码:
broker = 'redis://:123@127.0.0.1:6379/1'
backend = 'redis://:123@127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])


# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 自动任务的定时配置
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
# 定时任务名字
'fall_task': {
'task': 'celery_task.tasks.fall',
'args':(30,20),
'schedule': timedelta(seconds=3), # 3秒后执行
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
}
}

'''
fall_task:任务名自定义
task:任务来源
args:任务参数
schedule:定时时间
'''


'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'''
minute : 分钟
hour :小时
day_of_week :礼拜
day_of_month:月
month_of_year:年
'''

tasks.py

from .celery import app

@app.task
def fall(n1,n2):
res = n1/n2
print('n1 /n2 = %s' % res)
return res

get_result.py

from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

十、django中使用(更新轮播图案例)

最终达到的效果:根据定时任务来更新redis中的缓存。用户获取资源都是从redis缓存中获取。避免了数据库的压力

redis的配置

dev.py

# 缓存redis数据库配置
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379/10",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"CONNECTION_POOL_KWARGS": {"max_connections": 100}, # 同时的并发量
"DECODE_RESPONSES": True,
"PASSWORD": "123",
}
}
}

接口缓存

"""
1)什么是接口的后台缓存
前台访问后台接口,后台会优先从缓存(内存)中查找接口数据
如果有数据,直接对前台响应缓存数据
如果没有数据,与(mysql)数据库交互,得到数据,对前台响应,同时将数据进行缓存,以备下次使用
了解:前台缓存 - 前台在请求到接口数据后,在前台建立缓存,再发送同样请求时,发现前台缓存有数据,就不再对后台做请求了
2)什么的接口会进行接口缓存
i)接口会被大量访问:比如主页中的接口,几乎所有人都会访问,而且会重复访问
ii)在一定时间内数据不会变化(或数据不变化)的接口
iii)接口数据的时效性不是特别强(数据库数据发生变化了,不是立即同步给前台,验后时间同步给前台也没事)
注:理论上所有接口都可以建立缓存,只要数据库与缓存数据同步及时
3)如何实现接口缓存:主页轮播图接口
"""

views.py

from rest_framework.viewsets import ModelViewSet
from rest_framework import mixins
from . import models, serializers
from django.conf import settings
from rest_framework.response import Response

from django.core.cache import cache
class BannerViewSet(ModelViewSet, mixins.ListModelMixin):
queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]
serializer_class = serializers.BannerSerializer

# 有缓存走缓存,没有缓存走数据库,然后同步给缓存。接口自己实现
def list(self, request, *args, **kwargs):
banner_list = cache.get('banner_list')

if not banner_list:
print('走了数据库')
response = self.list(request, *args, **kwargs)
banner_list = response.data
cache.set('banner_list', banner_list, 86400) # 存进缓存中,缓存配置了redis数据库

return Response(banner_list)

1aa6864bfc728d33e99126fa5c6932c6.png

启动服务

'''
1):先切换到celery_task所在的同级目录(一般为根目录下)
2):开一个终端(启动服务): celery worker -A celery_task -l info -P eventlet
3):再开一个终端(添加任务): celery beat -A celery_task -l info
'''
# 注):-A 表示相对路径,所以一定先进入celery_task所在包
-l 表示打印到日志 info 级别

celery.py

"""
celery框架django项目工作流程
1)加载django配置环境
2)创建Celery框架对象app,配置broker和backend,得到的app就是worker
3)给worker对应的app添加可处理的任务函数,用include配置给worker的app
4)完成提供的任务的定时配置app.conf.beat_schedule
5)启动celery服务,运行worker,执行任务
6)启动beat服务,运行beat,添加任务
重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下
"""

# 一、加载django配置环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")

# 二、加载celery配置环境
from celery import Celery
broker = 'redis://:123@127.0.0.1:6379/1'
backend = 'redis://:123@127.0.0.1:6379/2'
# worker
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) # 外面的包名和文件名,一般都是固定


# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
# 定时任务名字
'update_banner_cache': {
'task': 'celery_task.tasks.update_banner_list',
'args': (),
'schedule': timedelta(seconds=10), # 3秒一次
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
# 'schedule': crontab(minute=0, day_of_week=1), # 每周一早八点
}
}
'''
minute : 分钟
hour :小时
day_of_week :礼拜
day_of_month:月
month_of_year:年
'''

'''
fall_task:任务名自定义
task:任务来源
args:任务参数
schedule:定时时间(秒)
'''

tasks.py

from .celery import app

from django.core.cache import cache
from home import models, serializers
from django.conf import settings
@app.task
def update_banner_list():
queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]
banner_list = serializers.BannerSerializer(queryset, many=True).data
# 拿不到request对象,所以头像的连接base_url要自己组装
for banner in banner_list:
banner['image'] = 'http://127.0.0.1:8000%s' % banner['image']

cache.set('banner_list', banner_list, 86400)
return True

选择了IT,必定终身学习

作者:Jeff

出处:http://dwz.date/aNfM

图片和内容源自网络分享,若有侵权,请联系删除!

上海艾磊科技有限公司专门为企业提供IT咨询,IT外包,系统集成,以及各类IT增值服务。其中增值服务包括OFFICE 365云服务,鼎捷企业ERP管理软件,云备份,企业邮箱,无线覆盖,上网行为管理,VPN架设,网络安全服务,INTERNET接入,设备租赁, IP电话服务

89383826dec0bec3238df359baed2c21.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/542155.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于linux mv指令机制

最近在mv文件的时候,操作失误将生产服务器一个1TB的文件夹mv到了/opt/test目录,因为最后/opt/目录被沾满所以1TB的文件夹没有迁移过来,写入了30GB数据到了/opt/test目录,因为系统分区被沾满,所以把test目录给删除了。 …

数据库的管理

1. 数据库的简介 定义:数据库(Database)就是一种按数据结构来组织,存储和管理数据的仓库,其中包含数据挖掘,大数据信息的推送。 mariadb数据库管理系统是mysql的一个分支,主要由开源社区在维护&…

C#中的Dictionary字典类介绍(转载)

C#中的Dictionary字典类介绍 关键字:C# Dictionary 字典 作者:txw1958原文:http://www.cnblogs.com/txw1958/archive/2012/11/07/csharp-dictionary.html 说明 必须包含名空间System.Collection.Generic Dictionary里面的每一个元素都…

高速缓存dns

1. DNS: Domain Name System,域名系统。 万维网上作为域名和IP地址相互映射的一个分布式数据库,能够使用户更方便的访问互联网。他主要负责把域名和IP的相互转换,DNS运行与TCP|UDP的53端口上。 2. 高速缓存DNS:DNS服务…

Apache服务配置

1. apache 企业中常用的web服务。用来提供http://(超文本传输协议) 基础信息: 主配置目录: /etc/httpd/conf 主配置文件: /etc/httpd/conf/httpd.conf 子配置目录: /etc/httpd/conf.d/ 子配置文…

如何安装Genymotion虚拟机以及Genmotion的eclipse插件

---内容开始--- - 首先去genymotion的官网去下载其安装文件 资源下载 Genymotion官网必须注册一个账号这个账号安装之后还有用的,用户名最好用网易126邮箱注册----我下载的是2.8.0的版本(注:注册前先开个代理服务器不然页面打不开下载时最好用迅雷下载这…

squid服务配置(正向、反向代理)

代理: 就是代理网络用户去取得网络信息。 Squid是一种用来缓冲Internet数据的软件。安装Squid服务实现代理缓存服务器功能。 正向代理:意思是一个位于客户端和原始服务器之间的服务器,为了从原始服务器取得内容,客户端向代理发送一…

c语言getchar函数_C语言中带有示例的getchar()函数

c语言getchar函数C语言中的getchar()函数 (getchar() function in C) The getchar() function is defined in the <stdio.h> header file. getchar()函数在<stdio.h>头文件中定义。 Prototype: 原型&#xff1a; int getchar(void);Parameters: FILE *filename(f…

python及pycharm

1.python简介&#xff1a; Python是一种计算机程序设计语言。是一种动态的、面向对象的脚本语言&#xff0c;最初被设计用于编写自动化脚本(shell)&#xff0c;随着版本的不断更新和语言新功能的添加&#xff0c;越来越多被用于独立的、大型项目的开发。 python最重要的功能&am…

移动端适配方案(上)

转载自:https://github.com/riskers/blog/issues/17 要搞懂移动端的适配问题&#xff0c;就要先搞明白像素和视口。 像素 在移动端给一个元素设置 width:200px 时发生了什么&#xff1f;这里的px到底是多长呢&#xff1f;像素是网页布局的基础&#xff0c;但是我们一直在用直觉…

oracle sql 语句如何插入全年日期?

为什么80%的码农都做不了架构师&#xff1f;>>> oracle sql 语句如何插入全年日期&#xff1f; create table BSYEAR (d date); insert into BSYEAR select to_date(20030101,yyyymmdd)rownum-1 from all_objects where rownum < to_char(to_date(20031231,…

java基础——java基本运算

java基本运算 转载于:https://www.cnblogs.com/zhouj/p/6132535.html

【Java】MybatisPlus

MybatisPlus MybatisPlus是在mybatis基础上的一个增强型工具。它对mybatis的一些操作进行了简化&#xff0c;能够提高开发的效率。 springboot整合了mybatis之后&#xff0c;其实已经非常方便了&#xff0c;只需要导入mybatis的包后&#xff0c;在配置文件中编写数据源信息&a…

更新SQL Server实例所有数据库表统计信息

引出问题 自从上次菜鸟为老鸟解决了《RDS SQL SERVER 解决中文乱码问题》问题&#xff0c;老鸟意犹未尽&#xff0c;决定再想个招来刁难刁难菜鸟&#xff1a;“我最近做T-SQL性能调优的时候&#xff0c;经常发现执行计划中的统计信息不准确&#xff0c;导致SQL Server查询性能低…

从0开始搭建SQL Server AlwaysOn 第四篇(配置异地机房节点)

从0开始搭建SQL Server AlwaysOn 第四篇&#xff08;配置异地机房节点&#xff09; 第一篇http://www.cnblogs.com/lyhabc/p/4678330.html第二篇http://www.cnblogs.com/lyhabc/p/4682028.html第三篇http://www.cnblogs.com/lyhabc/p/4682986.html第四篇http://www.cnblogs.com…

解决方案_智能工厂全套解决方案

最近弱电社群资料更新情况&#xff1a;1、弱电学习圈VIP群资料整理-希望对您有用&#xff01;2、弱电学习圈VIP技术交流2群成立&#xff0c;欢迎您加入&#xff01;3、智慧校园整体解决方案&#xff01;4、智能化弱电项目管理表单大全-弱电项目经理必备&#xff01;5、弱电项目…

windows 2008 R2系统安装拨号v p n详细配置

windows 2008 R2系统单网卡安装拨号v p n系统环境&#xff1a;windows 2008 R2操作步骤首先设置服务端在服务器管理器中添加角色“网络策略和访问服务”&#xff0c;并安装以下角色服务右击路由与远程访问&#xff0c;选择“配置并启用路由和远程访问”右击路由与远程访问&…

word打开老是配置进度_小白教程 | office出现配置进度框,怎么办?

最近很多同学在备考二级时候&#xff0c;自己的电脑上office软件Word或者Excel出问题了&#xff0c;每次打开都会出现配置进度框。这种情况怎么办呢&#xff1f;这种情况都是注册表的问题&#xff0c;马上安排解决之前在 右键菜单没有office新建怎么办(点击即可阅读)也是用注册…

【踩坑速记】开源日历控件,顺便全面解析开源库打包发布到Bintray/Jcenter全过程(新),让开源更简单~...

一、写在前面 自使用android studio开始&#xff0c;就被它独特的依赖方式&#xff1a;compile com.android.support:appcompat-v7:25.0.1所深深吸引&#xff0c;自从有了它&#xff0c;麻麻再也不用担心依赖第三方jar包繁琐无趣啦。而&#xff0c;如果自己写一个开源库是一种怎…

功能区不显示工具条_【新老客户必知】软件支持超高清屏显示器了

随着计算机硬件的不断更新换代显示设备的不断更新从原来的分辨率640 X 480啥原来分辨这么低呀&#xff1f;还记得DOS吗&#xff1f;或者Win95,win98吗当时显示器分辨率能调到800X 600很好了2000年左右随着纯平显示器的推出也有了高清显示器的概念那么一般我们说的高清显示器分辨…