Celery入门教程

一.Celery介绍

1.Celery架构

Celery架构基于可插拔组件(pluggable components)和根据选择的消息传输(代理)(message transport(broker))协议实现的消息交换机制。

2.Celery模块

(1)任务模块 Task

包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

(2)消息中间件 Broker

Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

(3)任务执行单元 Worker

Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

(4)任务结果存储 Backend

Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ、Redis 和 MongoDB 等。

二.Celery例子 [5]

1.代码结构

Celery例子参考文献[5],项目结构如下所示:

celery_demo                    # 项目根目录├── celery_app             # 存放 celery 相关文件│   ├── __init__.py│   ├── celeryconfig.py    # 配置文件│   ├── task1.py           # 任务文件 1│   └── task2.py           # 任务文件 2└── client.py              # 应用程

(1)__init__.py

# -*- coding: utf-8 -*-
from celery import Celeryapp = Celery('demo')  # 创建 Celery 实例
app.config_from_object('celery_app.celeryconfig')  # 通过 Celery 实例加载配置模块

(2)celeryconfig.py

# -*- coding: utf-8 -*-
from datetime import timedelta
from celery.schedules import crontab# Broker and Backend
BROKER_URL = 'redis://127.0.0.1:7379'  # 使用redis默认数据库0
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:7379/0'  # 明确指定redis数据库0# Timezone
CELERY_TIMEZONE = 'Asia/Shanghai'  # 指定时区,不指定默认为 'UTC'
# CELERY_TIMEZONE='UTC'# import
CELERY_IMPORTS = ('celery_app.task1','celery_app.task2'
)# schedules
CELERYBEAT_SCHEDULE = {'add-every-30-seconds': {'task': 'celery_app.task1.add','schedule': timedelta(seconds=30),  # 每 30 秒执行一次'args': (5, 8)  # 任务函数参数},'multiply-at-some-time': {'task': 'celery_app.task2.multiply','schedule': crontab(hour='9', minute='50'),  # 每天早上 9 点 50 分执行一次'args': (3, 7)  # 任务函数参数}
}

(3)task1.py

import time
from celery_app import app@app.task
def add(x, y):time.sleep(2)return x + y

(4)task2.py

import time
from celery_app import app@app.task
def multiply(x, y):time.sleep(2)return x * y

(5)client.py

"""
Celery入门教程:https://blog.csdn.net/youzhouliu/article/details/124239709
"""
# -*- coding: utf-8 -*-
from celery_app import task1
from celery_app import task2result1 = task1.add.apply_async(args=[2, 8])  # 可用task1.add.delay(2, 8)
print(result1.get())
# task2.multiply.apply_async(args=[3, 7])  # 可用task2.multiply.delay(3, 7)
# print(result2.get())print('hello world')

2.执行异步任务报错

在执行异步任务时报错:ValueError: not enough values to unpack (expected 3, got 0)

网上查了下这个错误,有3种解决方法。

(1)方式1

celery -A your_app_name worker --pool=solo -l info

(2)方式2

设置环境变量set FORKED_BY_MULTIPROCESSING = 1

(3)方式3

pip install eventlet
celery -A your_app_name worker --pool=eventlet

3.并发引擎

在 Celery 中,--pool 参数用于指定执行任务的并发引擎。Celery 支持多种并发引擎,包括 multiprocessing(默认)、eventlet、gevent 和 solo。这些并发引擎的主要区别在于它们处理并发任务的方式:

(1)multiprocessing

这是 Celery 默认并发引擎。它使用 Python 的 multiprocessing 模块创建子进程来并发执行任务。每个任务在自己的子进程中运行,子进程之间的内存是隔离的。这种方式可以充分利用多核 CPU,但进程间通信的开销较大。

(2)eventlet

这是一种基于协程的并发引擎。它使用 eventlet 库创建轻量级的线程(即协程)来并发执行任务。所有的协程在同一个进程中运行,协程之间可以共享内存,但只能在 I/O 操作时进行切换。这种方式适合 I/O 密集型任务,但不能充分利用多核 CPU。

(3)gevent

这也是一种基于协程的并发引擎。它使用 gevent 库创建协程来并发执行任务。gevent 的工作方式和 eventlet 类似,但它使用的是 libev 事件循环,而 eventlet 使用的是 libevent 事件循环。

(4)solo

这是一种特殊的并发引擎,它在主进程中直接执行任务,不创建任何子进程或线程。这种模式的优点是简单,没有进程间通信的开销,但缺点是无法利用多核 CPU,因为所有任务都在一个进程中顺序执行。

在选择并发引擎时,需根据任务特性和系统环境来决定。如果任务是 CPU 密集型的,那么 multiprocessing 可能是最好的选择。如果任务是 I/O 密集型的,那么 eventlet 或 gevent 可能更适合。

4.异步任务和定时任务

(1)异步任务命令

celery -A celery_app worker --loglevel=info

(2)定时任务命令

celery -A celery_app beat

参考文献

[1] Celery - Distributed Task Queue:https://docs.celeryq.dev/en/stable/

[2] Celery GitHub:https://github.com/celery/celery

[3] Celery API Reference:https://docs.celeryq.dev/en/stable/reference/index.html

[4] ValueError: not enough values to unpack (expected 3, got 0):https://github.com/celery/celery/issues/4178

[5] Celery入门教程:https://blog.csdn.net/youzhouliu/article/details/124239709

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/40060.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024中国西安科博会暨硬科技产业博览会11月召开

2024第18届中国西安国际科学技术产业博览会暨硬科技产业博览会 时间:2024年11月3日-5日 地点:西安国际会展中心 主办单位:中国国际科学技术合作协会 陕西省科技资源统筹中心 协办单位:西安市科学技术协会 西安市中小企业协会、…

昇思25天学习打卡营第3天|yulang

今天主要学习03-张量Tensor,主要包含了处理创建张量、张量的属性、张量索引和张量运算,稀疏张量,有点看不太懂,感觉要开始入门到放弃了?张量在构建和训练深度学习模型中的实际应用,如卷积神经网络。 张量&a…

Django学习第三天

python manage.py runserver 使用以上的命令启动项目 实现新建用户数据功能 views.py文件代码 from django.shortcuts import render, redirect from app01 import models# Create your views here. def depart_list(request):""" 部门列表 ""&qu…

一键获取:Win11笔记本系统下载地址!

在笔记本电脑操作中,用户想安装一款适合笔记本电脑使用的Win11系统,但不知道在哪里可以下载到?接下来系统之家小编给大家分享Win11笔记本系统下载地址,有需要的小伙伴一键点击即可获取,快速安装系统,即可体…

<电力行业> - 《第15课:电力领域(一)》

1 电网 发电厂与最终用电用户(负荷)往往相距很远,因此电力需要由电厂”输送“到最终用户,即“输电环节“,电流的输送往往导致因线路发热造成损耗,所以在输送的时候都是通过变电升高电压,让电流…

计算机网络 | 期末复习

物理层: 奈氏准则:带宽(w Hz),在不考虑噪音的情况下,最大速率(2W)码元/秒 信噪比S/N:以分贝(dB)为度量单位。信噪比(dB)…

C++初学者指南-3.自定义类型(第一部分)-异常

C初学者指南-3.自定义类型(第一部分)-异常 文章目录 C初学者指南-3.自定义类型(第一部分)-异常简介什么是异常?第一个示例用途:报告违反规则的行为异常的替代方案标准库异常处理 问题和保证资源泄露使用 RAII 避免内存泄漏!析构函数:不要让异…

SpringBoot源码阅读3-启动原理

SpringBootApplication public class DistApplication {public static void main(String[] args) {// 启动入口SpringApplication.run()SpringApplication.run(DistApplication.class, args);} }1、服务构建 这里"服务"指的是SpringApplication对象,服务…

2024年港澳台联考考生成绩数据分析来啦

分数线 出炉 2024年的港澳台联考正式出分!根据考生成绩,全国联招划档线如下: 一、本科批次 (一)普通类院校(专业):文史类365分、理工类390分(部分院校执行高分线&#…

持续直击WCCI 2024:金耀初教授、台湾省台北分会等获殊荣 横滨夜景美不胜收

持续直击WCCI 2024:金耀初教授、台湾省台北分会等获殊荣!横滨夜景美不胜收! 会议之眼 快讯 会议介绍 IEEE WCCI(World Congress on Computational Intelligence)2024,即2024年IEEE世界计算智能大会&…

BAS(入侵与攻击模拟)正在替代红队测试?

之前经常会被用户问到,漏扫、渗透和红队红的区别是啥? 传统的漏扫、渗透和红蓝对抗,可以看到工具化的漏洞不可靠,人工的成本就高。怎么找到一个漏洞可信度又高,成本又低的,就诞生了BAS。 抛开漏扫&#xf…

umi项目中的一些趣事

前言 出于某些安全问题,需要把HTML中框架注入的umi版本信息去掉,那要怎么搞呢~ 方案 查找官方文档,没发现可以去掉注入信息的方法,但在一番折腾后😉终究还是解决了~ 发现 版本信息是从这里注入的~ Object.define…

解决pip安装时的“SyntaxError: invalid syntax”错误

项目场景: 项目中有新的成员加入时,第一步就是安装开发环境,然而往往同样的机器、同样的配置,我们却总能遇到各种各样不同的问题。 今天分享一个简单的操作问题。 问题描述 项目用到pandas,安装pandas时遇到Syntax…

Java后端每日面试题(day2)

目录 Session和Cookie的关系Cookie与Session的区别JWT 由哪些部分组成?如何防止 JWT 被篡改?JWT 的特点 Session和Cookie的关系 Session和Cookie都可以用来实现跟踪用户状态,而二者是关系的:Session的实现依赖于Cookie。 Session…

【C语言】顺序表经典算法

本文介绍的是两道顺序表经典算法题目。 移除元素 (来源:LeetCode) 题目 分析 我们很容易想到的办法是去申请一个新的数组,遍历原数组不等于val就把它拿到新数组里。但是题目的要求是不使用额外空间,所以这种方法我们…

Python面向对象编程中的继承及其应用

目录 1. 继承的基本概念 2. 继承的语法 3. 继承的应用场景 4. 使用示例:汽车销售系统 5. 总结 继承是面向对象编程中的一个重要概念,它允许我们根据已有类创建新类,并继承已有类的属性和方法。在本文中,我们将学习Python中的…

Unity3D中,AI角色Rigidbody旋转导致的动画问题

在制作一些AI角色的时候,可能会运用到Rigidbody组件来使AI角色拥有一些相关的物理属性,但是AI角色在受到一些物理碰撞或者惯性等原因,会发生旋转导致动画出现意料外的错误,比如在由动转静的时候,可能会发生向前翻转等一…

卷积层里的填充和步幅

一、定义 1、对于卷积,我们另一个超参数是核的大小,通常使用的卷积核是33或者55,很少用偶数核 2、填充是为了让输出不变或者变大,是为了在输入不太大,又能使模型足够深的情况下使用 3、填充:在输入周围添…

【Java学习笔记】java图形界面编程

在前面的章节中,我们开发运行的应用程序都没有图形界面,但是很多应用软件,如Windows下的Office办公软件、扑克牌接龙游戏软件、企业进销存ERP系统等,都有很漂亮的图形界面。素以需要我们开发具有图形界面的软件。 Java图形界面编程…

Megatron-DeepSpeed与Megatron-LM在reduce grad上的差异

Megatron-DeepSpeed与Megatron-LM在reduce grad上的差异 一.Megatron-DeepSpeed 实现【deepspeed/runtime/engine.py】二.ModelLink 实现【ParamAndGradBuffer】1.ParamAndGradBuffer功能介绍2.实现原理A.分配一大块内存B.获取视图C.all_reduce grad 测试DP1,TP2,PP1,MBS1,zero…