celery的中文_celery异步任务框架

590acc0af8e3c7ac3b486eaacd5a9823.gif

fc3debb928eb6be94b5eff525f7ba8e4.png

目录

  • Celery

  • 一、官方

  • 二、Celery异步任务框架Celery架构图消息中间件任务执行单元任务结果存储

  • 三、使用场景

  • 四、Celery的安装配置

  • 五、两种celery任务结构:提倡用包管理,结构更清晰

  • 七、Celery执行异步任务包架构封装

  • 八、基本使用celery.py 基本配置tasks.py 添加任务add_task.py 添加立即、延迟任务get_result.py 获取结果

  • 九、高级使用celery.py 定时任务配置(循环的)tasks.pyget_result.py

  • 十、django中使用(更新轮播图案例)redis的配置接口缓存views.py启动服务celery.pytasks.py

Celery

一、官方

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

二、Celery异步任务框架

"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""

Celery架构图

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

c1d6432550115750abb5c37c0df24b8e.png

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

三、使用场景

异步执行:解决耗时任务

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务

四、Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery('任务名', broker='xxx', backend='xxx')

五、两种celery任务结构:提倡用包管理,结构更清晰

# 如果 Celery对象:Celery(...) 是放在一个模块下的
# 1)终端切换到该模块所在文件夹位置:scripts
# 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:模块名随意


# 如果 Celery对象:Celery(...) 是放在一个包下的
# 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中
# 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:包名随意

放在根目录下就行:

f9efbb625cb575b6ecf0ef2480d17376.png

七、Celery执行异步任务

包架构封装

project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果

八、基本使用

celery.py 基本配置

# 1)创建app + 任务

# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet

# 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本

# 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本


from celery import Celery
# 无密码
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 有密码:
broker = 'redis://:123@127.0.0.1:6379/1'
backend = 'redis://:123@127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
'''
broker : 任务仓库
backend :任务结果仓库
include :任务(函数)所在文件
'''

tasks.py 添加任务

from .celery import app

@app.task
def add(n1,n2):
res = n1+n2
print('n1+n2 = %s' % res)
return res


@app.task
def low(n1,n2):
res = n1-n2
print('n1-n2 = %s' % res)
return res

add_task.py 添加立即、延迟任务

from celery_task import tasks

# delay :添加立即任务
# apply_async :添加延迟任务
# eta :执行的utc时间


# 添加立即执行任务
t1 = tasks.add.delay(10, 20)
t2 = tasks.low.delay(100, 50)
print(t1.id)


# 添加延迟任务
from celery_package.tasks import jump
from datetime import datetime,timedelta

# 秒
def eta_second(second):
ctime = datetime.now() # 当前时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) # 当前UTC时间
time_delay = timedelta(seconds=second) # 秒
return utc_ctime + time_delay # 当前时间+往后延迟的秒
# 天
def eta_days(days):
ctime = datetime.now() # 当前时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) # 当前UTC时间
time_delay = timedelta(days=days) # 天
return utc_ctime + time_delay # 当前时间+往后延迟的天

jump.apply_async(args=(20,5), eta=eta_second(10)) # 10秒后执行
jump.apply_async(args=(20,5), eta=eta_days(1)) # 1天后执行

get_result.py 获取结果

from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

九、高级使用

celery.py 定时任务配置(循环的)

特点:

添加任务的终端关闭之后,停止添加

celery服务端关闭后,把关闭之后未执行的任务都执行一遍,然后继续接收任务

# 1)创建app + 任务

# 2)启动celery(app)服务:
# 注):-A 表示相对路径,所以一定先进入celery_task所在包
-l 表示打印到日志 info 级别
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet

# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:celery beat -A celery_task -l info

# 4)获取结果


from celery import Celery

# 无密码
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 有密码:
broker = 'redis://:123@127.0.0.1:6379/1'
backend = 'redis://:123@127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])


# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 自动任务的定时配置
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
# 定时任务名字
'fall_task': {
'task': 'celery_task.tasks.fall',
'args':(30,20),
'schedule': timedelta(seconds=3), # 3秒后执行
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
}
}

'''
fall_task:任务名自定义
task:任务来源
args:任务参数
schedule:定时时间
'''


'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'''
minute : 分钟
hour :小时
day_of_week :礼拜
day_of_month:月
month_of_year:年
'''

tasks.py

from .celery import app

@app.task
def fall(n1,n2):
res = n1/n2
print('n1 /n2 = %s' % res)
return res

get_result.py

from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

十、django中使用(更新轮播图案例)

最终达到的效果:根据定时任务来更新redis中的缓存。用户获取资源都是从redis缓存中获取。避免了数据库的压力

redis的配置

dev.py

# 缓存redis数据库配置
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379/10",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"CONNECTION_POOL_KWARGS": {"max_connections": 100}, # 同时的并发量
"DECODE_RESPONSES": True,
"PASSWORD": "123",
}
}
}

接口缓存

"""
1)什么是接口的后台缓存
前台访问后台接口,后台会优先从缓存(内存)中查找接口数据
如果有数据,直接对前台响应缓存数据
如果没有数据,与(mysql)数据库交互,得到数据,对前台响应,同时将数据进行缓存,以备下次使用
了解:前台缓存 - 前台在请求到接口数据后,在前台建立缓存,再发送同样请求时,发现前台缓存有数据,就不再对后台做请求了
2)什么的接口会进行接口缓存
i)接口会被大量访问:比如主页中的接口,几乎所有人都会访问,而且会重复访问
ii)在一定时间内数据不会变化(或数据不变化)的接口
iii)接口数据的时效性不是特别强(数据库数据发生变化了,不是立即同步给前台,验后时间同步给前台也没事)
注:理论上所有接口都可以建立缓存,只要数据库与缓存数据同步及时
3)如何实现接口缓存:主页轮播图接口
"""

views.py

from rest_framework.viewsets import ModelViewSet
from rest_framework import mixins
from . import models, serializers
from django.conf import settings
from rest_framework.response import Response

from django.core.cache import cache
class BannerViewSet(ModelViewSet, mixins.ListModelMixin):
queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]
serializer_class = serializers.BannerSerializer

# 有缓存走缓存,没有缓存走数据库,然后同步给缓存。接口自己实现
def list(self, request, *args, **kwargs):
banner_list = cache.get('banner_list')

if not banner_list:
print('走了数据库')
response = self.list(request, *args, **kwargs)
banner_list = response.data
cache.set('banner_list', banner_list, 86400) # 存进缓存中,缓存配置了redis数据库

return Response(banner_list)

1aa6864bfc728d33e99126fa5c6932c6.png

启动服务

'''
1):先切换到celery_task所在的同级目录(一般为根目录下)
2):开一个终端(启动服务): celery worker -A celery_task -l info -P eventlet
3):再开一个终端(添加任务): celery beat -A celery_task -l info
'''
# 注):-A 表示相对路径,所以一定先进入celery_task所在包
-l 表示打印到日志 info 级别

celery.py

"""
celery框架django项目工作流程
1)加载django配置环境
2)创建Celery框架对象app,配置broker和backend,得到的app就是worker
3)给worker对应的app添加可处理的任务函数,用include配置给worker的app
4)完成提供的任务的定时配置app.conf.beat_schedule
5)启动celery服务,运行worker,执行任务
6)启动beat服务,运行beat,添加任务
重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下
"""

# 一、加载django配置环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")

# 二、加载celery配置环境
from celery import Celery
broker = 'redis://:123@127.0.0.1:6379/1'
backend = 'redis://:123@127.0.0.1:6379/2'
# worker
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) # 外面的包名和文件名,一般都是固定


# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
# 定时任务名字
'update_banner_cache': {
'task': 'celery_task.tasks.update_banner_list',
'args': (),
'schedule': timedelta(seconds=10), # 3秒一次
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
# 'schedule': crontab(minute=0, day_of_week=1), # 每周一早八点
}
}
'''
minute : 分钟
hour :小时
day_of_week :礼拜
day_of_month:月
month_of_year:年
'''

'''
fall_task:任务名自定义
task:任务来源
args:任务参数
schedule:定时时间(秒)
'''

tasks.py

from .celery import app

from django.core.cache import cache
from home import models, serializers
from django.conf import settings
@app.task
def update_banner_list():
queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]
banner_list = serializers.BannerSerializer(queryset, many=True).data
# 拿不到request对象,所以头像的连接base_url要自己组装
for banner in banner_list:
banner['image'] = 'http://127.0.0.1:8000%s' % banner['image']

cache.set('banner_list', banner_list, 86400)
return True

选择了IT,必定终身学习

作者:Jeff

出处:http://dwz.date/aNfM

图片和内容源自网络分享,若有侵权,请联系删除!

上海艾磊科技有限公司专门为企业提供IT咨询,IT外包,系统集成,以及各类IT增值服务。其中增值服务包括OFFICE 365云服务,鼎捷企业ERP管理软件,云备份,企业邮箱,无线覆盖,上网行为管理,VPN架设,网络安全服务,INTERNET接入,设备租赁, IP电话服务

89383826dec0bec3238df359baed2c21.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/542155.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于linux mv指令机制

最近在mv文件的时候,操作失误将生产服务器一个1TB的文件夹mv到了/opt/test目录,因为最后/opt/目录被沾满所以1TB的文件夹没有迁移过来,写入了30GB数据到了/opt/test目录,因为系统分区被沾满,所以把test目录给删除了。 …

数据库的管理

1. 数据库的简介 定义:数据库(Database)就是一种按数据结构来组织,存储和管理数据的仓库,其中包含数据挖掘,大数据信息的推送。 mariadb数据库管理系统是mysql的一个分支,主要由开源社区在维护&…

C#中的Dictionary字典类介绍(转载)

C#中的Dictionary字典类介绍 关键字:C# Dictionary 字典 作者:txw1958原文:http://www.cnblogs.com/txw1958/archive/2012/11/07/csharp-dictionary.html 说明 必须包含名空间System.Collection.Generic Dictionary里面的每一个元素都…

求阶乘的第一个非零数字_查找数字阶乘中的尾随零

求阶乘的第一个非零数字Problem statement: 问题陈述: Find the number of trailing zeros in n! (Where, n is the given input). 在n中找到尾随零的数目! (其中, n是给定的输入)。 Solution: 解: Computing a factorial is o…

高速缓存dns

1. DNS: Domain Name System,域名系统。 万维网上作为域名和IP地址相互映射的一个分布式数据库,能够使用户更方便的访问互联网。他主要负责把域名和IP的相互转换,DNS运行与TCP|UDP的53端口上。 2. 高速缓存DNS:DNS服务…

python log日志级别_python – 日志记录:如何为处理程序设置最大日志级别

您可以向文件处理程序添加过滤器.这样,您可以将特定级别重定向到不同的文件.import loggingclass LevelFilter(logging.Filter):def __init__(self, low, high):self._low lowself._high highlogging.Filter.__init__(self)def filter(self, record):if self._low < recor…

Python Pandas –合并,联接和串联

There are three main ways to combine dataFrames i.e., merging, joining and concatenating. The following examples will illustrate merging, joining and concatenation. 组合dataFrames的主要方法有三种&#xff0c;即合并&#xff0c;联接和串联 。 以下示例将说明合并…

Apache服务配置

1. apache 企业中常用的web服务。用来提供http&#xff1a;//&#xff08;超文本传输协议&#xff09; 基础信息&#xff1a; 主配置目录&#xff1a; /etc/httpd/conf 主配置文件&#xff1a; /etc/httpd/conf/httpd.conf 子配置目录&#xff1a; /etc/httpd/conf.d/ 子配置文…

git 怎么查看合并过来哪些代码_git整理纷乱的历史合并记录

https://github.com/Epix37/Hearthstone-Deck-Tracker以上面版本库的master分支为例父节点1SHA-1: a21142968282ae49720cf30a0f18290b2ce74b3a* remove hotkey from config if action could not be found, fix hotkey menu item name父节点2SHA-1: 86a824e8f46005db91f334dfc57…

如何安装Genymotion虚拟机以及Genmotion的eclipse插件

---内容开始--- - 首先去genymotion的官网去下载其安装文件 资源下载 Genymotion官网必须注册一个账号这个账号安装之后还有用的&#xff0c;用户名最好用网易126邮箱注册----我下载的是2.8.0的版本(注&#xff1a;注册前先开个代理服务器不然页面打不开下载时最好用迅雷下载这…

java system类_Java System类mapLibraryName()方法及示例

java system类系统类mapLibraryName()方法 (System class mapLibraryName() method) mapLibraryName() method is available in java.lang package. mapLibraryName()方法在java.lang包中可用。 mapLibraryName() method is used to map a given library name into a platform-…

squid服务配置(正向、反向代理)

代理&#xff1a; 就是代理网络用户去取得网络信息。 Squid是一种用来缓冲Internet数据的软件。安装Squid服务实现代理缓存服务器功能。 正向代理&#xff1a;意思是一个位于客户端和原始服务器之间的服务器&#xff0c;为了从原始服务器取得内容&#xff0c;客户端向代理发送一…

家谱整站源码php_mysql家谱表查询某人所有后代

CREATE TABLE people (id INT(11) NOT NULL,name VARCHAR(50) NULL DEFAULT NULL,pid INT(11) NOT NULL DEFAULT 0,PRIMARY KEY (id));CREATE DEFINERroot% PROCEDURE getChildren(IN parentId INT)LANGUAGE SQLNOT DETERMINISTICCONTAINS SQLSQL SECURITY DEFINERCOMMENT 获取…

React 入门学习笔记2

摘自阮一峰&#xff1a;React入门实例教程&#xff0c;转载请注明出处。 一、获取真实的DOM节点 组件并不是真实的 DOM 节点&#xff0c;而是存在于内存之中的一种数据结构&#xff0c;叫做虚拟 DOM &#xff08;virtual DOM&#xff09;。只有当它插入文档以后&#xff0c;才会…

c语言getchar函数_C语言中带有示例的getchar()函数

c语言getchar函数C语言中的getchar()函数 (getchar() function in C) The getchar() function is defined in the <stdio.h> header file. getchar()函数在<stdio.h>头文件中定义。 Prototype: 原型&#xff1a; int getchar(void);Parameters: FILE *filename(f…

python及pycharm

1.python简介&#xff1a; Python是一种计算机程序设计语言。是一种动态的、面向对象的脚本语言&#xff0c;最初被设计用于编写自动化脚本(shell)&#xff0c;随着版本的不断更新和语言新功能的添加&#xff0c;越来越多被用于独立的、大型项目的开发。 python最重要的功能&am…

anaconda如何更改环境配置_手把手教新手安装Anaconda配置开发环境

Anaconda是针对Python的集成环境&#xff0c;它已经成为全球数千万数据科学从业人员必备的开发工具&#xff0c;帮助人们有效地解决数据科学和机器学习相关地问题。如果你想从事数据科学和机器学习的工作&#xff0c;可以从本文开始&#xff0c;了解一下如何安装Anaconda。1. 初…

详解摘要认证

1. 什么是摘要认证摘要认证与基础认证的工作原理很相似&#xff0c;用户先发出一个没有认证证书的请求&#xff0c;Web服务器回复一个带有WWW-Authenticate头的响应&#xff0c;指明访问所请求的资源需要证书。但是和基础认证发送以Base 64编码的用户名和密码不同&#xff0c;在…

Python的基础知识

1.注释&#xff1a; #单行注释ctrl / 批量注释&#xff0c;选中需要注释的所有行ctrl / 批量取消注释&#xff0c;选中已经被注释的所有行 块注释&#xff1a;上下各三个双引号的部分全部被注释 “”“ hello haha ”“”2.变量&#xff1a; 变量命名的规则&#xff1a; …

树莓派该文件名_树莓派:文本编辑器与文件

GNU nano是Unix系统下一款常用的文本编辑器&#xff0c;以简单易用著称。与之相比&#xff0c;功能更强大的Vi和Emacs编辑器&#xff0c;学习曲线比nano陡峭很多。由于nano对于一般的文本编辑来说已经足够&#xff0c;所以我想简单介绍一下&#xff0c;以便于更好入门。基本使用…