Celery入门

Celery 官网:Celery - Distributed Task Queue — Celery 5.3.6 documentation

Celery 官方文档英文版:Celery - Distributed Task Queue — Celery 5.4.0rc1 documentation

Celery 官方文档中文版:Celery - 分布式任务队列 — Celery 3.1.7 文档

Celery异步任务框架

"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求人是一个独立运行的服务 | 医院也是一个独立运行的服务正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

image-20230301162038433

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步执行:解决耗时任务

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务

Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery('任务名', broker='xxx', backend='xxx')

两种celery任务结构:提倡用包管理,结构更清晰

# 如果 Celery对象:Celery(...) 是放在一个模块下的
# 1)终端切换到该模块所在文件夹位置:scripts
# 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:模块名随意# 如果 Celery对象:Celery(...) 是放在一个包下的
# 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中
# 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:包名随意

Celery执行异步任务

包架构封装
project├── celery_task  	# celery包│   ├── __init__.py # 包文件│   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py│   └── tasks.py    # 所有任务函数├── add_task.py  	# 添加任务└── get_result.py   # 获取结果

基本使用

celery.py
# 1)创建app + 任务# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet# 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本# 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
tasks.py
from .celery import app
import time
@app.task
def add(n, m):print(n)print(m)time.sleep(10)print('n+m的结果:%s' % (n + m))return n + m@app.task
def low(n, m):print(n)print(m)print('n-m的结果:%s' % (n - m))return n - m
add_task.py
from celery_task import tasks# 添加立即执行任务
t1 = tasks.add.delay(10, 20)
t2 = tasks.low.delay(100, 50)
print(t1.id)# 添加延迟任务
from datetime import datetime, timedelta
eta=datetime.utcnow() + timedelta(seconds=10)
tasks.low.apply_async(args=(200, 50), eta=eta)
get_result.py
from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':async = AsyncResult(id=id, app=app)if async.successful():result = async.get()print(result)elif async.failed():print('任务失败')elif async.status == 'PENDING':print('任务等待中被执行')elif async.status == 'RETRY':print('任务异常后正在重试')elif async.status == 'STARTED':print('任务已经开始被执行')

高级使用

celery.py
# 1)创建app + 任务# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:celery beat -A celery_task -l info# 4)获取结果from celery import Celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {'low-task': {'task': 'celery_task.tasks.low','schedule': timedelta(seconds=3),# 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点'args': (300, 150),}
}
tasks.py
from .celery import appimport time
@app.task
def add(n, m):print(n)print(m)time.sleep(10)print('n+m的结果:%s' % (n + m))return n + m@app.task
def low(n, m):print(n)print(m)print('n-m的结果:%s' % (n - m))return n - m
get_result.py
from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':async = AsyncResult(id=id, app=app)if async.successful():result = async.get()print(result)elif async.failed():print('任务失败')elif async.status == 'PENDING':print('任务等待中被执行')elif async.status == 'RETRY':print('任务异常后正在重试')elif async.status == 'STARTED':print('任务已经开始被执行')

django中使用

celery.py
"""
celery框架django项目工作流程
1)加载django配置环境
2)创建Celery框架对象app,配置broker和backend,得到的app就是worker
3)给worker对应的app添加可处理的任务函数,用include配置给worker的app
4)完成提供的任务的定时配置app.conf.beat_schedule
5)启动celery服务,运行worker,执行任务
6)启动beat服务,运行beat,添加任务重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下
"""# 一、加载django配置环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")# 二、加载celery配置环境
from celery import Celery
# broker
broker = 'redis://127.0.0.1:6379/0'
# backend
backend = 'redis://127.0.0.1:6379/1'
# worker
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {'update-banner-list': {'task': 'celery_task.tasks.update_banner_list','schedule': timedelta(seconds=10),'args': (),}
}
tasks.py
from .celery import appfrom django.core.cache import cache
from home import models, serializers
from django.conf import settings
@app.task
def update_banner_list():queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]banner_list = serializers.BannerSerializer(queryset, many=True).data# 拿不到request对象,所以头像的连接base_url要自己组装for banner in banner_list:banner['image'] = 'http://127.0.0.1:8000%s' % banner['image']cache.set('banner_list', banner_list, 86400)return True

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/656142.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux网络编程——网络初识

文章目录 1. 网络协议初识1.1 为什么要有网络协议1.2 协议分层 2. OSI七层模型3. TCP/IP五层(或四层)模型4. 网络传输基本流程5. 以太网通信 1. 网络协议初识 1.1 为什么要有网络协议 早期计算机是独立的,如果要进行数据交互,就…

超越人类上限的策划:百度输入法在候选词区域植入广告

一位 V2EX 用户最新发帖称,百度输入法的最新版本中引入了一个新功能,将广告直接植入到候选词区域。 具体表现为,当用户输入某些关键词时,候选词区域会显示与输入内容相关的广告链接。例如,用户输入“招商”时&#xf…

《统计学习方法:李航》笔记 从原理到实现(基于python)-- 第5章 决策树(代码python实践)

文章目录 第5章 决策树—python 实践书上题目5.1利用ID3算法生成决策树,例5.3scikit-learn实例《统计学习方法:李航》笔记 从原理到实现(基于python)-- 第5章 决策树 第5章 决策树—python 实践 import numpy as np import pandas as pd import matplotlib.pyplot as plt …

能源巨头施耐德电气遭遇勒索软件攻击

Bleeping Computer 网站消息,媒体透露能源管理和自动化巨头施耐德电气公司近期遭到 Cactus 勒索软件攻击,导致公司大量数据被盗。 施耐德电气是一家法国跨国公司,主要生产能源和自动化产品,从大卖场的家用电气元件到企业级工业控制…

后序遍历的线索化二叉树

对于后序遍历,需要明确,往往叶子结点,只能指向右子树(如果右子树存在的情况),或者指向该结点(因为这才是后序遍历),同样在进行退出到前一次递归的时候,我们要…

怎么控制Element的数据树形表格展开所有行;递归操作,打造万能数据表格折叠。

HTML <el-button type"success" size"small" click"expandStatusFun"> <span v-show"expandStatusfalse"><i class"el-icon-folder-opened"></i>展开全部</span><span v-show"expan…

如何使用Python+Flask搭建本地Web站点并结合内网穿透公网访问?

文章目录 前言1. 安装部署Flask并制作SayHello问答界面2. 安装Cpolar内网穿透3. 配置Flask的问答界面公网访问地址4. 公网远程访问Flask的问答界面 前言 Flask是一个Python编写的Web微框架&#xff0c;让我们可以使用Python语言快速实现一个网站或Web服务&#xff0c;本期教程…

OpenAI发布新模型!ChatGPT性能重磅提升,API大幅降价,GPT-4 「变懒」被修复

OpenAI 对ChatGPT进行了大更新&#xff1a;推出了新一代的嵌入模型&#xff0c;对GPT-4 Turbo模型进行了更新&#xff0c;并将很快对GPT-3.5 Turbo的API进行大幅降价&#xff0c;GPT-4「变懒」行为也被修复。 接下来二狗就带大家看看ChatGPT的这次详细更新。 推出新的嵌入模型…

电脑护眼模式怎么设置?4个有效方法保护眼睛!

“我感觉每天使用电脑的时间久了&#xff0c;眼睛总是不太舒服。电脑护眼模式怎么设置呢&#xff1f;有什么比较好用的方法可以推荐吗&#xff1f;” 如果长时间使用电脑&#xff0c;或许会让我们感到用眼疲劳。电脑护眼模式是现代人常用的电脑设置之一&#xff0c;它能有效地减…

大数据学习之Redis,十大数据类型的具体应用(一)

目录 3. 数据类型命令及落地应用 3.1 备注 3.2 Redis字符串&#xff08;String&#xff09; 单值单value 多值操作 获取指定区间范围内的值 数值增减 获取字符串长度和内容追加 分布式锁 getset(先get后set) 3.3 Redis列表&#xff08;List&#xff09; 简单说明 …

switch-case的简单使用

签名&#xff1a;但行好事&#xff0c;莫问前程。 文章目录 前言一、switch二、case三、break四、default总结 前言 记录一下switch-case的简单使用。 一、switch switch中的表达式只能是特定的数据类型。如下&#xff1a; byteshortcharint枚举&#xff08;jdk5.0&#xff…

Java API 操作 HDFS

Java API 操作HDFS一般有两种方式&#xff1a; 使用HDFS客户端配置文件自动配置 Java 代码中配置 一 使用HDFS客户端配置 1.1 下载HDFS客户端配置 1.2 创建Maven项目 创建Maven项目&#xff0c;将下载的客户端配置文件 core-site.xml、hdfs-site.xml 放入resources目录下&…

echart 完整例子

<!--集团用电数据柱状图--> <template><div class"scsj-wsd"><div class"type-btns"><divclass"btns-item":class"currType 0 ? active : "click"change(0)">年</div><divclass&q…

华为OD-华为机试精讲500篇系列文章目录介绍(持续补充ing)

目录 背景介绍 什么是华为OD&#xff1f; OD现状 OD趋势 华为OD机考刷题攻略 1、刷题资料&#xff1a;投递岗位通过筛选后提供 2、注意事项&#xff1a; 真题代码目录 背景介绍 经济下行的这几年&#xff0c;每个人都感同身受&#xff0c;如何让自己在芸芸众生中脱颖而…

【高效开发工具系列】Wolfram Alpha

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

Unity3d Cinemachine篇(三)— FreeLook

文章目录 前言一、使用FreeLook制造第三人称跟随效果1. 创建一个游戏物体2. 创建FreeLook相机4. 完成 前言 上一期我们简单的使用了Dolly CamerawithTrack相机&#xff0c;这次我们来使用一下FreeLook 一、使用FreeLook制造第三人称跟随效果 1. 创建一个游戏物体 游戏物体比较…

(十)springboot实战——springboot3下的webflux项目mysql数据库事务处理

前言 WebFlux 是 Spring Framework 5.0 中引入的一种新型反应式编程模型&#xff0c;支持非阻塞 I/O&#xff0c;适用于高并发、高吞吐量的应用程序。在 WebFlux 应用程序中使用事务需要注意以下几点。使用 Reactive R2DBC&#xff1a;WebFlux 支持使用 Reactive R2DBC 访问关…

WebService的services.xml问题

WebService有多种实现方式&#xff0c;这里使用的是axis2 问题&#xff1a; 在本地开发&#xff0c;访问本地的http://localhost:8080/services/ims?wsdl&#xff0c;正常访问 但是打成jar包&#xff0c;不管是linux还是window启动&#xff0c;都访问不到&#xff0c;报错…

金线检测步骤

半导体行业,金线检测是必不可以少的一个检测项,除了焊点,die面,手指以外的必检项目. 重难点在于金线的提取,算法多种多样,找到适合才是关键,涉及到打光,图像处理,这里不做深入分析,软件和硬件配合好才能做的最好. 经典算法Block分析,结合图像检测. 高斯算法提取 边缘检测算法提…

空间域:空间组学的耶路撒冷

文章目录 环境配置与数据SquidpySpaGCN将基因表达和组织学整合到一个图上基因表达数据质控与预处理SpaGCN的超参优化空间域 参考文献 空间组学不能没有空间域&#xff0c;就如同蛋白质不能没有结构域。 摘要&#xff1a; 空间域是反映细胞在基因表达方面的相似性以及空间邻近性…