Celery的任务流

在这里插入图片描述

Celery的任务流

在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此目标,Celery使用一种叫做signatures的东西

celery的简单使用

signature的引入

signature官方文档

可以简单理解为signature是将之前的异步任务以某种方式包装,包装后的异步任务仍可以使用之前的delay()和apply_async()方法,并且包装后的异步任务就可以以多种方式组合成复杂的工作流程

先创建一个tasks.py

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c# 减
@app.task
def subtract_num(a, b):print(f'{a}-{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a - breturn c# 乘
@app.task
def multiply_num(a, b):print(f'{a}*{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a * breturn c# 除
@app.task
def divide_num(a, b):print(f'{a}/{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a / breturn c@app.task
def test(args):print(args)return args

运行celery消费者

# 格式为:celery -A app对象所在的文件 worker -l 日志级别 -Q 队列名称(也可以不指定,默认为celry)
celery -A tasks worker -l info -Q test

在这里插入图片描述

用signature对上面的add_num包装

from celery import signature
from tasks import add_num, subtract_num, multiply_num, divide_num# 方法1
sign = add_num.signature((1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法2
sign = signature('tasks.add_num', (1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法3
sign = signature(add_num, (1, 1), queue='test')
ret = sign.delay()
print(ret.get())

chain的使用

chain官方链接

chain可以将signature包装的任务函数一个一个执行,一个执行完将执行return结果传递给下一个任务函数

from celery import signature, chain
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (2,), queue='test')
multiply_sign = signature(multiply_num, (2,), queue='test')
divide_sign = signature(divide_num, (2,), queue='test')# 对某个数依次做加减乘除处理
chain1 = chain(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = chain1.delay()
print(ret.get())

在这里插入图片描述

可以看到异步任务依次执行,并将上一个异步任务的结果作为参数传递给下一个,形成一个链条

group的使用

group官方链接

group可以将signature包装的任务函数并行执行,返回一组返回值

from celery import signature, chain, group
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')# 对某个数分别做加减乘除处理group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = group1.delay()
print(ret.get())
#[8, 4, 12, 3.0]

在这里插入图片描述
可以看到相比于chain,group里的任务是同时执行

chord的使用

chord官方链接
依赖一个group任务,group任务结束后,将所有子任务的返回值作为参数传递给chord的回调函数,即chord由group任务组与回调函数组成

上代码

from celery import signature, chain, group, chord
from tasks import add_num, subtract_num, multiply_num, divide_num, testadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)#包装test异步任务函数
test_sign = signature(test, queue='test')c1 = chord(group1, test_sign)
c1.delay()

在这里插入图片描述
可以看出,在执行完加减乘除所有异步任务后,chord会将任务组的结果作为list交给test函数,这里的test有点像回调函数

PS:根据我的观察,chain,group,chord在执行完后都会返回一个任务id,其中chain的任务id为任务链里最后一个任务的id,group的任务id是一个临时的任务id(group任务都结束后就会消失),chord的任务id是回调函数的任务id。因此chain和chord在任务结束后,任务结果还是可以查到的,而group则查询不到,因此group的任务结果可能无法用AsyncResult查询到

最后附上celery关于任务工作流的官方链接
celery工作流

PS

有的时候我们可能需要在celery的task函数中调用其他的celery函数,并且需要同步的获取结果(其实着本质上就是把异步的celery函数变成同步运行),具体如下,先创建一个tasks.py

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():#在test函数里调用add_num函数,并且同步获取结果,将结果作为test函数的返回值ret = add_num.delay(1,2)ret = ret.get()return ret
#启动消费者
celery -A tasks worker -l info

调用test异步函数

from tasks import test
ret = test.delay()

在这里插入图片描述

结果就是出错了,因为官方不建议在一个异步任务中区等待另一个异步任务的返回结果,所以这个时候就可以通过上面的chain方法实现这个需求。当然还有一种不建议的方法就是在同步获取celery任务结果的get方法中添加参数disable_sync_subtasks=False,具体如下

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():ret = add_num.delay(1, 2)ret = ret.get(disable_sync_subtasks=False)#在这添加disable_sync_subtasks=Falsereturn ret

再调用一次test方法

在这里插入图片描述
成功调用

详见celery官方链接
链接传送门

结语

写这些,仅记录自己学习使用celery的过程。如果有什么错误的地方,还请大家批评指正。最后,希望小伙伴们都能有所收获。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/790625.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STL是什么?如何理解STL?

文章目录 1. 什么是STL2. STL的版本3. STL的六大组件4. 如何学习STL5.STL的缺陷 1. 什么是STL STL(standard template libaray-标准模板库):是C标准库的重要组成部分,不仅是一个可复用的组件库,而且是一个包罗数据结构与算法的软件框架。 2. …

OpenHarmony实战开发-使用一次开发多端部署实现一多设置典型页面

介绍 本示例展示了设置应用的典型页面,其在小窗口和大窗口有不同的显示效果,体现一次开发、多端部署的能力。 1.本示例使用一次开发多端部署中介绍的自适应布局能力和响应式布局能力进行多设备(或多窗口尺寸)适配,保…

WebGIS 之 vue3+vite+ceisum

1.项目搭建node版本在16以上 1.1创建项目 npm create vite 项目名 1.2选择框架 vuejavaScript 1.3进入项目安装依赖 cd 项目名 npm install 1.4安装cesium依赖 pnpm i cesium vite-plugin-cesium 1.5修改vite.config.js文件 import { defineConfig } from vite import vue fr…

RK3568 RTC驱动实验

RK3568 RTC驱动实验 1. RTC简介 ​ RTC 也就是实时时钟,用于记录当前系统时间,对于 Linux 系统而言时间是非常重要的,使用 Linux 设备的时候也需要查看时间。RTC是Linux的时间系统。 ​ RTC 设备驱动是一个标准的字符设备驱动,…

基于Python的微博旅游情感分析、微博舆论可视化系统

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…

Python网络爬虫(三):Selenium--以携程酒店为例

1 Selenium简介 Selenium是一个用于网站应用程序自动化的工具,它可以直接运行在浏览器中,就像真正的用户在操作一样。它相当于一个机器人,可以模拟人类在浏览器上的一些行为,比如输入文本、点击、回车等。Selenium支持多种浏览器&…

记录一次官网访问很慢的情况

客户查看云监控,带宽未超限,客户取的是1分钟的原生值,也就是1分钟也是个平均值。 但是客户的原始值,其实就是1分钟内的平均值。所以客户的瞬时超限,其实是看不出来的。但是后端同事从实时监控里面可以看到超限的情况。 客户升带宽后, 发现还…

Flutter 应用数据持久化指南

1. 介绍 1.1 什么是数据持久化? 数据持久化是指将应用程序中的数据保存在持久存储介质(如硬盘、数据库等)中的过程。在计算机科学领域,持久化数据是指数据在程序退出或系统关机后仍然存在的能力。这种持久性使得数据可以在不同的…

是德科技keysight 33621A波形发生器

181/2461/8938产品概述: 与上一代DDS波形发生器相比,采用独家Trueform技术的安捷伦HP 33621A波形发生器具有更高的性能、保真度和灵活性。安捷伦HP 33621A 120 MHz、单通道、Trueform arbs,带时序控制和64 MSa存储器,1 ps抖动&am…

go juc 线程中的子类

1.go test() 主死随从 package mainimport ("fmt""strconv""time" )func test() {for i : 1; i < 10; i {fmt.Println("hello " strconv.Itoa(i))//阻塞time.Sleep(time.Second)} } func main() {//开启协程go test()for i : 1; …

如何配置vite的proxy

1.前言 vite项目&#xff0c;本地开发环境可以通过配置proxy代理实现跨域请求。但是生产环境&#xff0c;该配置不生效&#xff0c;一般使用 nginx 转发&#xff0c;或者后端配置cors 2.解释 server: {port: 9000,proxy: { // 本地开发环境通过代理实现跨域&#xff0c;生产…

基于ssm的轻型卡车零部件销售平台(java项目+文档+源码)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的轻型卡车零部件销售平台。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 轻型卡车零部件销售平台…

C# 批量删除Excel重复项

当从不同来源导入Excel数据时&#xff0c;可能存在重复的记录。为了确保数据的准确性&#xff0c;通常需要删除这些重复的行。 手动查找并删除可能会非常耗费时间&#xff0c;而通过编程脚本则可以实现在短时间内处理大量数据。本文将提供一个使用C# 快速查找并删除Excel重复项…

【ArduinoQuartus】在小脚丫STEP CYC10上安装PulseRain Reindeer并在软核上运行基础功能

【Arduino&Quartus】在小脚丫STEP CYC10上安装PulseRain Reindeer并在软核上运行基础功能 一、将Reindeer软核下载到STEP CYC10&#xff08;一&#xff09;下载PulseRain Reindeer软核&#xff08;二&#xff09;配置Reindeer软核到开发板1.将sof文件转换为jic文件2.将jic文…

Centos7安装单机版Kafka

下载 链接&#xff1a;https://pan.baidu.com/s/1W8lVEF6Y-xlg6zr3l9QAbg?pwdhbkt 提取码&#xff1a;hbkt 上传到服务器/opt目录 安装 # kafka安装目录为 /opt/kafka cd /opt; mkdir kafka; mv kafka_2.13-2.7.0.tgz ./kafka;cd kafka; #解压 tar -zxvf kafka_2.13-2.7.0…

说一说Redis的Bitmaps和HyperLoLog?

本篇内容对应 “Redis高级数据类型”小节 和 “7.5 网站数据统计”小节 对应视频&#xff1a; Redis高级数据结构 网站数据统计 1 什么是UV和DAU&#xff1f; DAUUV英文全称Daily Active UserUnique Visotr中文全称日活跃用户量独立访客如何统计数据通过用户ID排重统计数据通…

上位机图像处理和嵌入式模块部署(qmacvisual图像清晰度)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 做过isp的同学都知道&#xff0c;图像处理里面有一个3A&#xff0c;即自动曝光、自动白平衡和自动对焦。其中自动对焦这个&#xff0c;就需要用输入…

绩效考核存在合理性、公平性、客观性吗?

目录 一、绩效考核流于形式&#xff1a;没有实际考核过 二、考核结果的确定: 主管一人说了算 三、考核结果&#xff1a; 与绩效奖金挂钩吗&#xff1f; 四、考核的滥用&#xff1a;成为公司排挤迫使员工离职的手段 五、公司说&#xff1a; 让你滚蛋&#xff0c;谁还会发你奖…

SpringBoot(48)-使用 SkyWalking 进行分布式链路追踪

Spring Boot&#xff08;48&#xff09;- 使用 SkyWalking 进行分布式链路追踪 介绍 在分布式系统中&#xff0c;了解各个服务之间的调用关系和性能表现是非常重要的。SkyWalking 是一款开源的分布式系统监控与分析平台&#xff0c;能够帮助我们实现分布式系统的链路追踪、性…

使用minikube安装使用单机版K8S(docker)

前置&#xff1a;作为一个开发&#xff0c;工作之余想玩一下k8s&#xff0c;但是搭建成本太高&#xff0c;所以就找到了minikube这个工具&#xff0c;快速搭建单机版k8s&#xff0c;下面是个人搭建流程&#xff0c;基于centos7&#xff0c;仅供参考。 1.下载kubectl&#xff0…