Celery入门

Celery 官网:Celery - Distributed Task Queue — Celery 5.3.6 documentation

Celery 官方文档英文版:Celery - Distributed Task Queue — Celery 5.4.0rc1 documentation

Celery 官方文档中文版:Celery - 分布式任务队列 — Celery 3.1.7 文档

Celery异步任务框架

"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求人是一个独立运行的服务 | 医院也是一个独立运行的服务正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

image-20230301162038433

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步执行:解决耗时任务

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务

Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery('任务名', broker='xxx', backend='xxx')

两种celery任务结构:提倡用包管理,结构更清晰

# 如果 Celery对象:Celery(...) 是放在一个模块下的
# 1)终端切换到该模块所在文件夹位置:scripts
# 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:模块名随意# 如果 Celery对象:Celery(...) 是放在一个包下的
# 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中
# 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:包名随意

Celery执行异步任务

包架构封装
project├── celery_task  	# celery包│   ├── __init__.py # 包文件│   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py│   └── tasks.py    # 所有任务函数├── add_task.py  	# 添加任务└── get_result.py   # 获取结果

基本使用

celery.py
# 1)创建app + 任务# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet# 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本# 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
tasks.py
from .celery import app
import time
@app.task
def add(n, m):print(n)print(m)time.sleep(10)print('n+m的结果:%s' % (n + m))return n + m@app.task
def low(n, m):print(n)print(m)print('n-m的结果:%s' % (n - m))return n - m
add_task.py
from celery_task import tasks# 添加立即执行任务
t1 = tasks.add.delay(10, 20)
t2 = tasks.low.delay(100, 50)
print(t1.id)# 添加延迟任务
from datetime import datetime, timedelta
eta=datetime.utcnow() + timedelta(seconds=10)
tasks.low.apply_async(args=(200, 50), eta=eta)
get_result.py
from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':async = AsyncResult(id=id, app=app)if async.successful():result = async.get()print(result)elif async.failed():print('任务失败')elif async.status == 'PENDING':print('任务等待中被执行')elif async.status == 'RETRY':print('任务异常后正在重试')elif async.status == 'STARTED':print('任务已经开始被执行')

高级使用

celery.py
# 1)创建app + 任务# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:celery beat -A celery_task -l info# 4)获取结果from celery import Celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {'low-task': {'task': 'celery_task.tasks.low','schedule': timedelta(seconds=3),# 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点'args': (300, 150),}
}
tasks.py
from .celery import appimport time
@app.task
def add(n, m):print(n)print(m)time.sleep(10)print('n+m的结果:%s' % (n + m))return n + m@app.task
def low(n, m):print(n)print(m)print('n-m的结果:%s' % (n - m))return n - m
get_result.py
from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':async = AsyncResult(id=id, app=app)if async.successful():result = async.get()print(result)elif async.failed():print('任务失败')elif async.status == 'PENDING':print('任务等待中被执行')elif async.status == 'RETRY':print('任务异常后正在重试')elif async.status == 'STARTED':print('任务已经开始被执行')

django中使用

celery.py
"""
celery框架django项目工作流程
1)加载django配置环境
2)创建Celery框架对象app,配置broker和backend,得到的app就是worker
3)给worker对应的app添加可处理的任务函数,用include配置给worker的app
4)完成提供的任务的定时配置app.conf.beat_schedule
5)启动celery服务,运行worker,执行任务
6)启动beat服务,运行beat,添加任务重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下
"""# 一、加载django配置环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")# 二、加载celery配置环境
from celery import Celery
# broker
broker = 'redis://127.0.0.1:6379/0'
# backend
backend = 'redis://127.0.0.1:6379/1'
# worker
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {'update-banner-list': {'task': 'celery_task.tasks.update_banner_list','schedule': timedelta(seconds=10),'args': (),}
}
tasks.py
from .celery import appfrom django.core.cache import cache
from home import models, serializers
from django.conf import settings
@app.task
def update_banner_list():queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]banner_list = serializers.BannerSerializer(queryset, many=True).data# 拿不到request对象,所以头像的连接base_url要自己组装for banner in banner_list:banner['image'] = 'http://127.0.0.1:8000%s' % banner['image']cache.set('banner_list', banner_list, 86400)return True

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/656142.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AUTOSAR内存篇 -内存访问(MemAcc)

文章目录 功能介绍关键特性功能元素内存地址转换内存映射限制内存访问协调Job管理Job处理Job状态Job结果硬件特定的服务Burst模式支持通用锁机制动态内存驱动处理动态内存驱动激活服务调用模块处理

chronyd

chronyd https://chrony-project.org/doc/4.1/chrony.conf.html 无时钟源配置可参考 : Isolated networks 通过互联网同步的两台机器之间的典型精度在几毫秒内; 在 LAN 上,精度通常为数十微秒。 使用硬件时间戳或硬件参考时钟,亚微秒精度…

地图不仅引路:深探Java中Map接口的藏宝图

在Java编程中,处理键值对数据结构的需求十分普遍。Java集合框架(Java Collections Framework)提供了一个强大的接口Map,专门用来存储和操作一组键值对。本文将带你深入理解Java中的Map接口,包括它的工作原理、常用实现…

Linux网络编程——网络初识

文章目录 1. 网络协议初识1.1 为什么要有网络协议1.2 协议分层 2. OSI七层模型3. TCP/IP五层(或四层)模型4. 网络传输基本流程5. 以太网通信 1. 网络协议初识 1.1 为什么要有网络协议 早期计算机是独立的,如果要进行数据交互,就…

Jtti:怎么使用shell脚本查询数据库输出文件

在使用 Shell 脚本查询数据库并输出结果到文件时,通常会使用 sqlcmd(对于 Microsoft SQL Server)或 mysql(对于 MySQL)等命令行工具。下面是一个简单的示例,演示如何使用 Shell 脚本查询数据库并将结果输出…

超越人类上限的策划:百度输入法在候选词区域植入广告

一位 V2EX 用户最新发帖称,百度输入法的最新版本中引入了一个新功能,将广告直接植入到候选词区域。 具体表现为,当用户输入某些关键词时,候选词区域会显示与输入内容相关的广告链接。例如,用户输入“招商”时&#xf…

《统计学习方法:李航》笔记 从原理到实现(基于python)-- 第5章 决策树(代码python实践)

文章目录 第5章 决策树—python 实践书上题目5.1利用ID3算法生成决策树,例5.3scikit-learn实例《统计学习方法:李航》笔记 从原理到实现(基于python)-- 第5章 决策树 第5章 决策树—python 实践 import numpy as np import pandas as pd import matplotlib.pyplot as plt …

手动throw异常对象

手动throw异常对象 1.为什么需要手动抛出异常对象?2.如何理解"自动 vs 手动"抛出异常对象?3.如何实现手动抛出异常?4.注意点:throw后的代码不难执行,编译不通过。5.面试题:throw和throws的区别&a…

能源巨头施耐德电气遭遇勒索软件攻击

Bleeping Computer 网站消息,媒体透露能源管理和自动化巨头施耐德电气公司近期遭到 Cactus 勒索软件攻击,导致公司大量数据被盗。 施耐德电气是一家法国跨国公司,主要生产能源和自动化产品,从大卖场的家用电气元件到企业级工业控制…

后序遍历的线索化二叉树

对于后序遍历,需要明确,往往叶子结点,只能指向右子树(如果右子树存在的情况),或者指向该结点(因为这才是后序遍历),同样在进行退出到前一次递归的时候,我们要…

怎么控制Element的数据树形表格展开所有行;递归操作,打造万能数据表格折叠。

HTML <el-button type"success" size"small" click"expandStatusFun"> <span v-show"expandStatusfalse"><i class"el-icon-folder-opened"></i>展开全部</span><span v-show"expan…

THM学习笔记——Nmap

Nmap是一款用于网络发现和安全审计的网络安全工具&#xff0c;通常情况下&#xff0c;Nmap用于&#xff1a; 列举网络主机清单 管理服务升级调度 监控主机 服务运行状况 Nmap可以检测目标主机是否在线、端口开放情况、侦测运行的服务类型及版本信息、侦测操作系统与设备类…

如何使用Python+Flask搭建本地Web站点并结合内网穿透公网访问?

文章目录 前言1. 安装部署Flask并制作SayHello问答界面2. 安装Cpolar内网穿透3. 配置Flask的问答界面公网访问地址4. 公网远程访问Flask的问答界面 前言 Flask是一个Python编写的Web微框架&#xff0c;让我们可以使用Python语言快速实现一个网站或Web服务&#xff0c;本期教程…

将Vue2中的console.log调试信息移除

前端项目构建生产环境下的package时&#xff0c;咱们肯定要去掉development环境下的console.log&#xff0c;如果挨个注释可就太费劲了&#xff0c;本文介绍怎么使用 babel-plugin-transform-remove-console 移除前端项目中所有的console.log. 1. 安装依赖 npm install babel-…

OpenAI发布新模型!ChatGPT性能重磅提升,API大幅降价,GPT-4 「变懒」被修复

OpenAI 对ChatGPT进行了大更新&#xff1a;推出了新一代的嵌入模型&#xff0c;对GPT-4 Turbo模型进行了更新&#xff0c;并将很快对GPT-3.5 Turbo的API进行大幅降价&#xff0c;GPT-4「变懒」行为也被修复。 接下来二狗就带大家看看ChatGPT的这次详细更新。 推出新的嵌入模型…

2024年华为OD机试真题-API集群负载统计-Python-OD统一考试(C卷)

题目描述: 某个产品的RESTful API集合部署在服务器集群的多个节点上,近期对客户端访问日志进行了采集,需要统计各个API的访问频次,根据热点信息在服务器节点之间做负载均衡,现在需要实现热点信息统计查询功能。 RESTful API的由多个层级构成,层级之间使用 / 连接,如 /A/…

电脑护眼模式怎么设置?4个有效方法保护眼睛!

“我感觉每天使用电脑的时间久了&#xff0c;眼睛总是不太舒服。电脑护眼模式怎么设置呢&#xff1f;有什么比较好用的方法可以推荐吗&#xff1f;” 如果长时间使用电脑&#xff0c;或许会让我们感到用眼疲劳。电脑护眼模式是现代人常用的电脑设置之一&#xff0c;它能有效地减…

分布式ID(4):雪花算法生成ID之Leaf(美团点评分布式ID生成系统)

1 Leaf官方地址 Leaf源码地址: https://github.com/Meituan-Dianping/Leaf Leaf官方说明文档地址: https://tech.meituan.com/2019/03/07/open-source-project-leaf.htmlhttps://github.com/Meituan-Dianping/Leaf/blob/master/README_CN.md 这边只做简单介绍,详细说明…

大数据学习之Redis,十大数据类型的具体应用(一)

目录 3. 数据类型命令及落地应用 3.1 备注 3.2 Redis字符串&#xff08;String&#xff09; 单值单value 多值操作 获取指定区间范围内的值 数值增减 获取字符串长度和内容追加 分布式锁 getset(先get后set) 3.3 Redis列表&#xff08;List&#xff09; 简单说明 …

switch-case的简单使用

签名&#xff1a;但行好事&#xff0c;莫问前程。 文章目录 前言一、switch二、case三、break四、default总结 前言 记录一下switch-case的简单使用。 一、switch switch中的表达式只能是特定的数据类型。如下&#xff1a; byteshortcharint枚举&#xff08;jdk5.0&#xff…