分布式异步任务框架celery

Celery介绍

github地址:GitHub - celery/celery: Distributed Task Queue (development branch)

文档地址:Celery - Distributed Task Queue — Celery 5.3.6 documentation

1.1 Celery是什么

celery时一个灵活且可靠的处理大量消息的分布式系统,可以在多个节点之间处理某个任务

celery时一个专注于实时处理的任务队列,支持任务调度

celery是开源的,有很多的使用者

celery完全基于python语言编写的

celery本质上是一个【分布式的异步任务调度框架】,类似于Apache的airflow

celery只是用来调度任务的,但是它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来,因此要使用celery的话,还需要搭配一些具有存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等。官方推荐是消息队列RabbitMQ,我们使用Redis

同步调用函数 --》add--》执行5s钟--》数据返回了

异步调用函数--》add--》执行5s钟--》执行完的数据,找个地方存起来

调用方--》去存的地方看一下--》任务有没有执行完

1.2 应用场景

1)异步任务

 视频转码、邮件发送、消息推送等一些耗时操作

2)定时任务

定时推送消息、定时爬取一些数据、定时统计一些数据

3)延时任务

提交任务后,等一段时间再执行任务

1.3 celery架构

celery架构,它采用典型的生产者-消费者模式,主要由以下部分组成

生产者生产---消费者进行消费

producer:它负责把任务提交到broker钟

celery Beat:会读取文件、周期性的向broker中提交任务

broker:消息中间件,放任务的地点,celery本身不提供,借助redis等消息队列

worker:工人、消费者,负责从消息中间件中取出任务--》执行

backend:worker执行完,会有结果,结果存储再backend,celery不提供,借助redis等消息队列。

Celery使用

2.1 安装

pip install celery

使用redis作为消息队列

pip install redis

如果是Windows系统还需要安装eventlet

pip install eventlet

2.2 使用

创建main.py文件

import timefrom celery import Celery# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
# 结果消息队列
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend)# 编写任务
@app.task  # 被装饰器装饰了,才是celery任务
def add(a, b):print("a+b的结果是", a + b)time.sleep(1)  # 模拟耗时return a + b

创建add_task.py文件编写消费者代码

"""这个程序用来提交任务 producer"""
from main import add# # 同步任务
# res = add(1, 2)
# print(res)
# 异步任务
# 像消息队列中提交了一个任务,计算1+5的任务,但是没有执行  ceec680b-e0fb-4636-9244-1fa7ca0c570c
res = add.delay(1, 5)  # 没有耗时,直接返回,但是没有返回值,而是返回一个uuid号
print(res)
# 启动worker 再终端使用命令启动,执行完成后会把结果存到redis的2库中
# win :celery -A main worker -l info -P eventlet
# mac/linux:celery -A main worker -l info

启动worker需要再终端下方进行启动

win :celery -A main worker -l info -P eventlet
mac/linux:celery -A main worker -l info

如果报错celery库找不到的问题,使用python -m celery -A main worker -l info -P eventlet进行启动

2.3 包结构

后续的项目越来越大,task任务越来越多,希望把任务拆分再多个py文件中

目录结构

celery.py

import timefrom celery import Celery# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend,include=['celery_task.crawl_task', 'celery_task.user_task', 'celery_task.order_task'])# 任务分到不同的py文件中

user_task.py

import time
from .celery import app@app.task
def send_email(to):print("发送邮件")time.sleep(3)print(f"向{to}发送邮件成功")return f"向{to}发送邮件成功"

order_task.py

import time
from .celery import app@app.task
def pay_order():print("开始下单")time.sleep(5)print("下单完成")return "下单完成"

crawl_task.py

import time
from .celery import app@app.task
def crawl_baidu():print("开始爬虫百度")time.sleep(2)print("爬虫完毕百度")return "爬虫完毕百度"@app.task
def crawl_dewu():print("开始爬虫得物")time.sleep(2)print("爬虫完毕百度")return "爬虫完毕得物"

add_task.py

from celery_task.crawl_task import crawl_baidures = crawl_baidu.delay()  # 没有参数,这里就不传
print(res)  # 得到uuid

get_result.py(查询是否被执行)

from .celery_task.celery import app
from celery.result import AsyncResultid = "你的任务uuid"
if __name__ == '__main__':result = AsyncResult(id=id, app=app)if result.successful():result = result.get()print(result)elif result.failed():print("任务失败")elif result.status == "PENDING":print("任务等待被执行")elif result.status == "RETRY":print("任务异常后正在重试")elif result.status == "STARTED":print("任务已经开始被执行")

启动worker命令

celery -A celery_task(包名) worker -l info -P eventlet

异步任务-延时任务-定时任务

异步任务

上述介绍的均为异步任务

使用delay()

延时任务

from celery_task.user_task import send_email
from datetime import datetime, timedeltaeta = datetime.utcnow() + timedelta(seconds=5)  # 默认时区为utc时区
res = send_email.apply_async(args=['邮箱'], eta=eta)
print(res)

apply_async(args=['参数'],eta=延时时间)

如果延迟任务提交了,但是worker没启动,等延迟的时间,worker再启动,任务会立马启动

定时任务

在celery.py中

import timefrom celery import Celery# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend,include=['celery_task.crawl_task', 'celery_task.user_task', 'celery_task.order_task'])# 任务分到不同的py文件中# 加入定时任务
# 指定了时区,中国时区,以后延时任务
app.conf.timezone = "Asia/Shanghai"
app.conf.enable_utc = False
# 每隔5s爬取百度
from datetime import datetime, timedeltaapp.conf.beat_schedule = {'low-task': {'task': 'celery_task.crawl_task.crawl_baidu','schedule': timedelta(seconds=5), # 每5秒发送一次'args': ()  # 参数}
}
# 必须启动beat

启动beat命令

celery -A celery_task beat -l info

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/761116.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java:设计模式

文章目录 参考简介工厂模式简单工厂模式工厂方法模式抽象工厂模式总结 单例模式预加载懒加载线程安全问题 策略模式 参考 知乎 简介 总体来说设计模式分为三类共23种。 创建型模式,共五种:工厂方法模式、抽象工厂模式、单例模式、建造者模式、原型模…

基于浏览器localStorage作为数据库完成todolsit项目

一、文章内容 TodoList结构搭建HTML代码 TodoList样式编写Css代码 TodoList行为表现JavaScript代码 二、项目展示 项目介绍 Todolist是一个基于B/S模式开发的待办事项软件,主要功能是离线记录用户的待办事项和已经完成的事情,基于htmlcssjs实现&am…

【C++】---string的模拟

【C】---string的模拟 一、string类实现1.string类的构造函数2.swap()函数3.拷贝构造函数4.赋值运算符重载5.析构6.迭代器7.operator[ ]8.size9.c_str()10.reserve()11.resize()12.p…

flutter 局部view更新,dialog更新进度,dialog更新

局部更新有好几种方法,本次使用的是 StatefulBuilder 定义 customState去更新对话框内容 import package:flutter/cupertino.dart; import package:flutter/material.dart;class ProgressDialog {final BuildContext context;BuildContext? dialogContext;double _…

【DL经典回顾】激活函数大汇总(四十一)(SinReLU附代码和详细公式)

激活函数大汇总(四十一)(SinReLU附代码和详细公式) 更多激活函数见激活函数大汇总列表 一、引言 欢迎来到我们深入探索神经网络核心组成部分——激活函数的系列博客。在人工智能的世界里,激活函数扮演着不可或缺的角色,它们决定着神经元的输出,并且影响着网络的学习能…

8节点空间壳单元Matlab有限元编程 | 曲壳单元 | 模态分析 | 3D壳单元 | 板壳理论| 【源代码+理论文本】

专栏导读 作者简介:工学博士,高级工程师,专注于工业软件算法研究本文已收录于专栏:《有限元编程从入门到精通》本专栏旨在提供 1.以案例的形式讲解各类有限元问题的程序实现,并提供所有案例完整源码;2.单元…

Mysql的行级锁

MySQL 中锁定粒度最小的一种锁,是 针对索引字段加的锁 ,只针对当前操作的行记录进行加锁。 行级锁能大大减少数据库操作的冲突。其加锁粒度最小,并发度高,但加锁的开销也最大,加锁慢,会出现死锁。行级锁和存…

数据结构面试常见问题之Insert or Merge

😀前言 本文将讨论如何区分插入排序和归并排序两种排序算法。我们将通过判断序列的有序性来确定使用哪种算法进行排序。具体而言,我们将介绍判断插入排序和归并排序的方法,并讨论最小和最大的能区分两种算法的序列长度。 🏠个人主…

Postman接口做关联测试的方法步骤

应用场景 假设下一个接口登录需要上一个接口的返回值,例如请求需要先登录获取到token,下一个请求要携带对应的token才能进行请求 方法:通过设置全局变量/环境变量 方法一:设置全局变量 1.先请求登录接口,请求成功之后…

力扣Lc20--- 202.快乐数(java版)-2024年3月20日

1.题目 2.知识点 (1)while (seen.contains(n) false) { // 循环体 } 与 !seen.contains(n) 等同 (2) 当传入数字 19 给 isHappy(19) 方法时,下面是每一行代码的执行过程: 初始化一个空的 HashSet&#…

32.网络游戏逆向分析与漏洞攻防-游戏网络通信数据解析-网络数据分析原理与依据

免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 如果看不懂、不知道现在做的什么,那就跟着做完看效果 内容参考于:易道云信息技术研究院VIP课 上一个内容:31.其它消息的实…

el-table的border属性失效问题解决方案

目录 问题: 使用的代码: 官方文档的说明: 可能的问题所在: 关于使用了作用域插槽: a.自定义内容的样式覆盖: b.表格结构的改变: 解决方案: 通过css样式解决: 下面…

打流仪/网络测试仪这个市场还能怎么卷?

#喝了点,码点字# 以下为个人观点,看看就好,如有冒犯,私信删稿 都有哪些厂商在做打流仪/网络测试仪 -洋品牌:思博伦/Viavi-Spirent,是德/Keysight-Ixia,信雅纳/Lecroy-Xena, -国产…

Pytest配置文件pytest.ini的具体使用

前言 说到配置,大家可能想到的是不经常更改的内容,比如Django里的settings.py文件,或者我们做自动化的时候,把测试环境的域名和正式环境的域名放到一个配置文件里,所有的接口都从这个文件里读取。这样,如果…

python中获取当前项目的目录

大家好,我是雄雄,欢迎关注微信公众号:雄雄的小课堂 今天介绍一下,如何在python中获取当前项目所在的目录,而不是运行脚本的目录。 class ProjectPaths:# 初始化时获取当前脚本的路径staticmethoddef get_script_dir():…

MySQL进阶-----存储引擎

目录 前言 一、MySQL体系结构 二、存储引擎介绍 三、存储引擎特点 1.InnoDB 2.MyISAM 3.Memory 4.区别及特点 四、存储引擎选择 前言 从本期开始,我们就正式进入到MySQL进阶篇的学习了,前面的基础篇就告一段落了。进阶篇的第一期我们就从MySQL的…

opengl 学习(六)-----坐标系统与摄像机

坐标系统与摄像机 分类引言坐标系统摄像机教程在CMake中使用全局定义预编译宏,来控制是否开启错误检查补充 分类 opengl c 引言 OpenGL希望在每次顶点着色器运行后,我们可见的所有顶点都为标准化设备坐标(Normalized Device Coordinate, NDC)。也就是说&#xff…

Python使用指定端口进行http请求的例子

使用requests库 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class SourcePortAdapter(HTTPAdapter): """"Transport adapter" that allows us to set the source port.""" def __init__(self, port, *args, **kwargs): self.poolm…

JetPack之LiveData粘性原因分析及hook解决

目录 前言一、LiveData粘性原因分析1.1 发送消息流程1.2 监听消息流程1.3 根因分析 二、hook解决 前言 在 Android 中,LiveData 的默认行为是粘性的,即 LiveData 在设置数据后,即使观察者订阅时已经有数据存在,观察者仍会立即收到…

【链表】Leetcode 19. 删除链表的倒数第 N 个结点【中等】

删除链表的倒数第 N 个结点 给你一个链表,删除链表的倒数第 n 个结点,并且返回链表的头结点。 示例 1: 输入:head [1,2,3,4,5], n 2 输出:[1,2,3,5] 解题思路 1、使用快慢指针找到要删除节点的前一个节点。2、删…