分布式异步任务框架celery

Celery介绍

github地址:GitHub - celery/celery: Distributed Task Queue (development branch)

文档地址:Celery - Distributed Task Queue — Celery 5.3.6 documentation

1.1 Celery是什么

celery时一个灵活且可靠的处理大量消息的分布式系统,可以在多个节点之间处理某个任务

celery时一个专注于实时处理的任务队列,支持任务调度

celery是开源的,有很多的使用者

celery完全基于python语言编写的

celery本质上是一个【分布式的异步任务调度框架】,类似于Apache的airflow

celery只是用来调度任务的,但是它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来,因此要使用celery的话,还需要搭配一些具有存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等。官方推荐是消息队列RabbitMQ,我们使用Redis

同步调用函数 --》add--》执行5s钟--》数据返回了

异步调用函数--》add--》执行5s钟--》执行完的数据,找个地方存起来

调用方--》去存的地方看一下--》任务有没有执行完

1.2 应用场景

1)异步任务

 视频转码、邮件发送、消息推送等一些耗时操作

2)定时任务

定时推送消息、定时爬取一些数据、定时统计一些数据

3)延时任务

提交任务后,等一段时间再执行任务

1.3 celery架构

celery架构,它采用典型的生产者-消费者模式,主要由以下部分组成

生产者生产---消费者进行消费

producer:它负责把任务提交到broker钟

celery Beat:会读取文件、周期性的向broker中提交任务

broker:消息中间件,放任务的地点,celery本身不提供,借助redis等消息队列

worker:工人、消费者,负责从消息中间件中取出任务--》执行

backend:worker执行完,会有结果,结果存储再backend,celery不提供,借助redis等消息队列。

Celery使用

2.1 安装

pip install celery

使用redis作为消息队列

pip install redis

如果是Windows系统还需要安装eventlet

pip install eventlet

2.2 使用

创建main.py文件

import timefrom celery import Celery# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
# 结果消息队列
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend)# 编写任务
@app.task  # 被装饰器装饰了,才是celery任务
def add(a, b):print("a+b的结果是", a + b)time.sleep(1)  # 模拟耗时return a + b

创建add_task.py文件编写消费者代码

"""这个程序用来提交任务 producer"""
from main import add# # 同步任务
# res = add(1, 2)
# print(res)
# 异步任务
# 像消息队列中提交了一个任务,计算1+5的任务,但是没有执行  ceec680b-e0fb-4636-9244-1fa7ca0c570c
res = add.delay(1, 5)  # 没有耗时,直接返回,但是没有返回值,而是返回一个uuid号
print(res)
# 启动worker 再终端使用命令启动,执行完成后会把结果存到redis的2库中
# win :celery -A main worker -l info -P eventlet
# mac/linux:celery -A main worker -l info

启动worker需要再终端下方进行启动

win :celery -A main worker -l info -P eventlet
mac/linux:celery -A main worker -l info

如果报错celery库找不到的问题,使用python -m celery -A main worker -l info -P eventlet进行启动

2.3 包结构

后续的项目越来越大,task任务越来越多,希望把任务拆分再多个py文件中

目录结构

celery.py

import timefrom celery import Celery# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend,include=['celery_task.crawl_task', 'celery_task.user_task', 'celery_task.order_task'])# 任务分到不同的py文件中

user_task.py

import time
from .celery import app@app.task
def send_email(to):print("发送邮件")time.sleep(3)print(f"向{to}发送邮件成功")return f"向{to}发送邮件成功"

order_task.py

import time
from .celery import app@app.task
def pay_order():print("开始下单")time.sleep(5)print("下单完成")return "下单完成"

crawl_task.py

import time
from .celery import app@app.task
def crawl_baidu():print("开始爬虫百度")time.sleep(2)print("爬虫完毕百度")return "爬虫完毕百度"@app.task
def crawl_dewu():print("开始爬虫得物")time.sleep(2)print("爬虫完毕百度")return "爬虫完毕得物"

add_task.py

from celery_task.crawl_task import crawl_baidures = crawl_baidu.delay()  # 没有参数,这里就不传
print(res)  # 得到uuid

get_result.py(查询是否被执行)

from .celery_task.celery import app
from celery.result import AsyncResultid = "你的任务uuid"
if __name__ == '__main__':result = AsyncResult(id=id, app=app)if result.successful():result = result.get()print(result)elif result.failed():print("任务失败")elif result.status == "PENDING":print("任务等待被执行")elif result.status == "RETRY":print("任务异常后正在重试")elif result.status == "STARTED":print("任务已经开始被执行")

启动worker命令

celery -A celery_task(包名) worker -l info -P eventlet

异步任务-延时任务-定时任务

异步任务

上述介绍的均为异步任务

使用delay()

延时任务

from celery_task.user_task import send_email
from datetime import datetime, timedeltaeta = datetime.utcnow() + timedelta(seconds=5)  # 默认时区为utc时区
res = send_email.apply_async(args=['邮箱'], eta=eta)
print(res)

apply_async(args=['参数'],eta=延时时间)

如果延迟任务提交了,但是worker没启动,等延迟的时间,worker再启动,任务会立马启动

定时任务

在celery.py中

import timefrom celery import Celery# 任务消息队列
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 实例化app对象
app = Celery("demo", broker=broker, backend=backend,include=['celery_task.crawl_task', 'celery_task.user_task', 'celery_task.order_task'])# 任务分到不同的py文件中# 加入定时任务
# 指定了时区,中国时区,以后延时任务
app.conf.timezone = "Asia/Shanghai"
app.conf.enable_utc = False
# 每隔5s爬取百度
from datetime import datetime, timedeltaapp.conf.beat_schedule = {'low-task': {'task': 'celery_task.crawl_task.crawl_baidu','schedule': timedelta(seconds=5), # 每5秒发送一次'args': ()  # 参数}
}
# 必须启动beat

启动beat命令

celery -A celery_task beat -l info

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/761116.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

rust学习(简单handler实现)

用过android的同学对于handler应该都很了解,用起来比较方便。这里用rust设计了一个简单的rust。 1.处理接口 pub trait ProcessMessage {fn handleMessage(&self,msg:Message); } 2.Message结构体 pub struct Message {pub what:u32,pub arg1:i32,pub arg2:…

Java:设计模式

文章目录 参考简介工厂模式简单工厂模式工厂方法模式抽象工厂模式总结 单例模式预加载懒加载线程安全问题 策略模式 参考 知乎 简介 总体来说设计模式分为三类共23种。 创建型模式,共五种:工厂方法模式、抽象工厂模式、单例模式、建造者模式、原型模…

Function与Module的差异与应用场景,symbolic() 和 forward() 是什么关系?

Function与Module的差异与应用场景 Function与Module都可以对pytorch进行自定义拓展,使其满足网络的需求,但这两者还是有十分重要的不同: Function一般只定义一个操作,因为其无法保存参数,因此适用于激活函数、pooli…

Unity构建详解(1)——SBP介绍

【前言】 Unity的资源工作流程分为导入、创建、构建、分发、加载。我们说的是其中的构建步骤。 构建是指将项目工程中的资源文件和代码整合程可执行文件的过程,构建的结果是生成可执行文件,在win平台上是exe,在Android平台上是apk&#xff…

基于浏览器localStorage作为数据库完成todolsit项目

一、文章内容 TodoList结构搭建HTML代码 TodoList样式编写Css代码 TodoList行为表现JavaScript代码 二、项目展示 项目介绍 Todolist是一个基于B/S模式开发的待办事项软件,主要功能是离线记录用户的待办事项和已经完成的事情,基于htmlcssjs实现&am…

Superset二次开发之PostgreSQL 存储库介绍

Apache Superset 使用 PostgreSQL 作为其默认的元数据数据库,来存储关于数据源、图表、仪表盘、用户及其权限等信息。下面是列出的一些主要表的功能和作用的简介: 权限和角色 ab_permission: 存储权限,如“可以访问仪表板”、“可以执行SQL查询”等。ab_permission_view: 将…

Mysql——索引下推

MySQL的索引下推(Index Condition Pushdown, ICP)是一种查询优化技术,它允许MySQL在存储引擎层执行部分WHERE子句中的过滤条件,而非全部在MySQL服务器层执行。这使得在扫描索引过程中就可以剔除不满足条件的记录,从而减…

【C++】---string的模拟

【C】---string的模拟 一、string类实现1.string类的构造函数2.swap()函数3.拷贝构造函数4.赋值运算符重载5.析构6.迭代器7.operator[ ]8.size9.c_str()10.reserve()11.resize()12.p…

flutter 局部view更新,dialog更新进度,dialog更新

局部更新有好几种方法,本次使用的是 StatefulBuilder 定义 customState去更新对话框内容 import package:flutter/cupertino.dart; import package:flutter/material.dart;class ProgressDialog {final BuildContext context;BuildContext? dialogContext;double _…

【DL经典回顾】激活函数大汇总(四十一)(SinReLU附代码和详细公式)

激活函数大汇总(四十一)(SinReLU附代码和详细公式) 更多激活函数见激活函数大汇总列表 一、引言 欢迎来到我们深入探索神经网络核心组成部分——激活函数的系列博客。在人工智能的世界里,激活函数扮演着不可或缺的角色,它们决定着神经元的输出,并且影响着网络的学习能…

Oracle函数6—递归查询(start with...connect by、sys_connect_by_path、level)

文章目录 一、准备数据二、基本使用三、level函数四、获取完整的全树路径 一、准备数据 创建表 CREATE TABLE TEST_ORG (ID VARCHAR2(64) NOT NULL PRIMARY KEY,NAME VARCHAR2(200),PARTEN_ID VARCHAR2(64) ); comment on column TEST_ORG.ID is 主键; comment on column TES…

C语言经典例题(2) --- 阶乘、斐波那契数、9*9乘法表、字符串逆序、求和

文章目录 1.求n的阶乘。(不考虑溢出)2.求第n个斐波那契数。&#xff08;不考虑溢出&#xff09;3.屏幕上输出9*9乘法口诀表4.字符串逆序(递归实现)5.计算一个数的每位之和(递归实现) 1.求n的阶乘。(不考虑溢出) #include <stdio.h>int fac(int n);int main() {int n 0;…

8节点空间壳单元Matlab有限元编程 | 曲壳单元 | 模态分析 | 3D壳单元 | 板壳理论| 【源代码+理论文本】

专栏导读 作者简介&#xff1a;工学博士&#xff0c;高级工程师&#xff0c;专注于工业软件算法研究本文已收录于专栏&#xff1a;《有限元编程从入门到精通》本专栏旨在提供 1.以案例的形式讲解各类有限元问题的程序实现&#xff0c;并提供所有案例完整源码&#xff1b;2.单元…

Mysql的行级锁

MySQL 中锁定粒度最小的一种锁&#xff0c;是 针对索引字段加的锁 &#xff0c;只针对当前操作的行记录进行加锁。 行级锁能大大减少数据库操作的冲突。其加锁粒度最小&#xff0c;并发度高&#xff0c;但加锁的开销也最大&#xff0c;加锁慢&#xff0c;会出现死锁。行级锁和存…

数据结构面试常见问题之Insert or Merge

&#x1f600;前言 本文将讨论如何区分插入排序和归并排序两种排序算法。我们将通过判断序列的有序性来确定使用哪种算法进行排序。具体而言&#xff0c;我们将介绍判断插入排序和归并排序的方法&#xff0c;并讨论最小和最大的能区分两种算法的序列长度。 &#x1f3e0;个人主…

Postman接口做关联测试的方法步骤

应用场景 假设下一个接口登录需要上一个接口的返回值&#xff0c;例如请求需要先登录获取到token&#xff0c;下一个请求要携带对应的token才能进行请求 方法&#xff1a;通过设置全局变量/环境变量 方法一&#xff1a;设置全局变量 1.先请求登录接口&#xff0c;请求成功之后…

力扣Lc20--- 202.快乐数(java版)-2024年3月20日

1.题目 2.知识点 &#xff08;1&#xff09;while (seen.contains(n) false) { // 循环体 } 与 !seen.contains(n) 等同 &#xff08;2&#xff09; 当传入数字 19 给 isHappy(19) 方法时&#xff0c;下面是每一行代码的执行过程&#xff1a; 初始化一个空的 HashSet&#…

32.网络游戏逆向分析与漏洞攻防-游戏网络通信数据解析-网络数据分析原理与依据

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 如果看不懂、不知道现在做的什么&#xff0c;那就跟着做完看效果 内容参考于&#xff1a;易道云信息技术研究院VIP课 上一个内容&#xff1a;31.其它消息的实…

(七)事件组

一、概念 &#xff08;1&#xff09;用于实现任务与任务、任务与中断之间通信和同步&#xff0c;无数据传输 &#xff08;2&#xff09;不同于信号量的是&#xff0c;信号量是一对一的&#xff0c;而事件可以是一对多和多对一的&#xff0c;即一个任务等待多个事件或多个任务等…

el-table的border属性失效问题解决方案

目录 问题&#xff1a; 使用的代码&#xff1a; 官方文档的说明&#xff1a; 可能的问题所在&#xff1a; 关于使用了作用域插槽&#xff1a; a.自定义内容的样式覆盖&#xff1a; b.表格结构的改变&#xff1a; 解决方案&#xff1a; 通过css样式解决&#xff1a; 下面…