flask+celery处理异步任务

celery是一个强大的分布式任务队列,在这里我们介绍一下它最基本的处理异步任务的功能,包含以下几个:

  • 创建Celery实例

  • 创建一个异步任务

  • 查询异步任务的信息

  • 取消异步任务

使用的环境是flask3.0+Celery5.4

1. 创建Celery实例

celery通过中间件获取任务,再将任务结果写入backend,所以创建实例时需要传递这两个的url,其支持多种协议(比如redis、rabbitmq等),这里我们以redis为例。最简单的一种是创建对象时传递:

from celery import Celerycelery_app = Celery(__name__, broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

另外一种是使用配置文件,按官方文档的例子,在同级目录创建一个celeryconfig.py文件,写入配置信息,然后更新celery对象的配置:

celery_app = Celery(__name__)
apcelery_appp.config_from_object('celeryconfig')

但可能是由于Celery版本问题,配置字段改变了(大小写改变),这种方法连接backend一直报错,暂时还没找到原因

2. 创建一个异步任务

类似于flask,celery也使用装饰器来创建任务处理函数,比如官方例子:

@celery_app.task
def add(x, y):return x + y

但这种方式无法访问任务实例的信息,需要修改一下装饰器的参数,将任务实例作为第一个参数引入:

@celery_app.task(bind=True)
def add(self, x, y):print(self.request.id) # 打印任务return x + y

返回值会写到backend,存储在result字段。结合flask的例子如下:

from flask import Flask# 其它代码
# ...app = Flask(__name__)@app.route('/task', methods=['POST'])
def add_task():id = add.delay(1, 2) # 返回的是异步任务的return {'code': 0, 'data': id}

注意:Celery是要单独启动的,然后再启动flask,启动命令类似于:

Celery -A app worker --loglevel=info

这个地方也有比较多的坑,经常报模块不存在这样的错

3. 查询异步任务的信息

大部分异步任务的耗时都比较长,过程中用户可能想知道任务执行的情况,比如进度等,这时前面引入的第一个参数(self)就能发挥作用了,我们修改一下demo, 每隔1秒计算一次,并将进度更新到backend:

import time@celery_app.task(bind=True)
def add(self, x, y):for i in range(1, 100):print(x + y)self.update_state(state="PROGRESS", meta={'index': i})time.sleep(1)return x + y

而对应的flask查询代码如下:

from celery.result import AsyncResult@app.route('/task/<string:id>', methods=['GET'])
def get_task(id):return {'code': 0, 'data': celery_app.AsyncResult(id)}

4. 取消异步任务

取消任务要借助Celery的Control类:

@app.route('/task/<string:id>', methods=['DELETE'])
def remove_task(id):ctrl = Control(celery_app) ctrl.revoke(id)return {'code': 0} 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/58797.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

巨好看的登录注册界面源码

展示效果 源码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X-UA-Compatible" content"IEedge" /><meta name"viewport" content"widthdevic…

Redis-发布/订阅交互模式

文章目录 一、消息代理介绍二、Redis中客户端、服务器之间的交互模式介绍三、Redis发布/订阅交互模式的操作 一、消息代理介绍 “消息代理”&#xff08;Message Broker&#xff09;是一种软件组件&#xff0c;它在不同的应用程序之间传递消息。在Redis的上下文中&#xff0c;…

利用Kubernetes原生特性实现简单的灰度发布和蓝绿发布

部分借鉴地址: https://support.huaweicloud.com/intl/zh-cn/bestpractice-cce/cce_bestpractice_10002.html 1.原理介绍 用户通常使用无状态负载 Deployment、有状态负载 StatefulSet等Kubernetes对象来部署业务&#xff0c;每个工作负载管理一组Pod。以Deployment为例&#x…

SpringBoot集成Shiro+Jwt+Redis

概述 首先需要知道为什么使用 ShiroJwtRedis 进行登录认证和权限控制。 1. 为什么用Shiro&#xff1f; 主要用的是 shiro 里面的登录认证和权限控制功能。 2. 为什么用Jwt&#xff1f; Shiro 默认的 Session 机制来帮助实现权限管理&#xff0c;用于维护用户的状态信息。而 …

Docker Compose --- 管理多容器应用

用于定义和运行多容器 Docker 应用程序。通过 Compose&#xff0c;用户可以使用 YAML 文件来配置应用程序的服务、网络和卷等资源 简化多容器的管理和部署过程 以下compose.yaml示例展示如何部署两个服务WordPress 和 MySQL的环境 version: 3.8 # 指定 Docker Compose 文件的…

18.04Ubuntu遇到Unable to locate package

解决办法&#xff1a; 要先升级你的apt Sudo apt-get update

Django ORM详解:外键使用(外键逻辑关联)与查询优化

Django数据库迁移 # 创建迁移 python manage.py makemigrations your_app_name # 应用迁移 python manage.py migrate # 查看迁移状态 python manage.py showmigrations # 回滚迁移 python manage.py migrate your_app_name 0001 # 修改表后,删除迁移记录和表删除迁移记录后重…

redis做缓存,mysql的数据怎么与redis进行同步(双写一致性)

基于业务做选择,强一致性和允许延迟再加消息队列 强一致性:当修改了数据库的数据同时更新缓存的数据,缓存和数据库的数据保持一致 读操作:缓存命中,直接返回数据,缓存没有命中,查询数据库,写入缓存,设定过期时间 写操作:延迟双删 :先删除缓存,修改数据库,等待延迟(数据库主从节…

《安全基石:等保测评的全方位解读》

在数字化转型的浪潮中&#xff0c;网络安全已成为企业生存与发展的核心议题。等保测评&#xff0c;作为我国网络安全等级保护制度的重要组成部分&#xff0c;不仅是企业安全的基石&#xff0c;更是推动企业高质量发展的关键。本文将全面解读等保测评的内涵、作用及其对企业的深…

(五)Spark大数据开发实战:灵活运用PySpark常用DataFrame API

目录 一、PySpark 二、数据介绍 三、PySpark大数据开发实战 1、数据文件上传HDFS 2、导入模块及数据 3、数据统计与分析 ①、计算演员参演电影数 ②、依次罗列电影番位前十的演员 ③、按照番位计算演员参演电影数 ④、求每位演员所有参演电影中的最早、最晚上映时间及…

一些CSS的基础知识点

写在前面 Cascading Style Sheets&#xff08;CSS&#xff09;是用于描述网页样式和布局的标记语言。它允许开发者将内容与表示分离&#xff0c;从而使得网页的设计和结构更加清晰和易于维护。本文将详细介绍CSS的基础知识点&#xff0c;帮助初学者快速掌握CSS的核心概念和应用…

SpringFactoriesLoader

1.什么是SPI (面试题) SPI全名Service Provider interface&#xff0c;翻译过来就是“服务提供接口”&#xff0c;再说简单就是提供某一个服务的接口&#xff0c; 提供给服务开发者或者服务生产商来进行实现。 Java SPI 是JDK内置的一种动态加载扩展点的实现。 这个机制在一…

Apifox 10月更新|测试步骤支持添加脚本和数据库操作、测试场景支持回收站、变量支持「秘密」类型

Apifox 新版本上线啦&#xff01; 看看本次版本更新主要涵盖的重点内容&#xff0c;有没有你所关注的功能特性&#xff1a; 自动化测试模块能力持续升级 测试步骤支持添加「脚本」和「数据库操作」 测试场景和定时任务支持回收站内恢复 定时任务支持设置以分钟频率运行 导入…

「C/C++」C++标准库之#include<fstream>文件流

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「C/C」C/C程序设计&#x1f4da;全部专栏「VS」Visual Studio「C/C」C/C程序设计「UG/NX」BlockUI集合「Win」Windows程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「PK」Parasoli…

liunx网络套接字 | 实现基于tcp协议的echo服务

前言&#xff1a;本节讲述linux网络下的tcp协议套接字相关内容。博主以实现tcp服务为主线&#xff0c;穿插一些小知识点。以先粗略实现&#xff0c;后精雕细琢为思路讲述实现服务的过程。下面开始我们的学习吧。 ps&#xff1a;本节内容建议了解网络端口号的友友们观看哦。 目录…

第十六课 Vue中的组件

Vue中的组件 Vue中可以自定义模板组件&#xff0c;组件的写法有很多种 组件写法 1&#xff09;在components上拓展组件 <div id"app"><test></test></div><script>new Vue({el:#app,components: {test: {template: <h1>这是一…

nmcli、ip、ifcfg配置网络区分方法

文章目录 一、检查NetworkManager状态使用nmcli命令&#xff1a;检查NetworkManager服务状态&#xff1a; 二、检查ip命令的使用三、检查ifcfg文件查看/etc/sysconfig/network-scripts/目录&#xff1a;查看/etc/network/interfaces文件&#xff08;针对Debian系&#xff09;&a…

redis详细教程(5.AOP和RDB持久化)

AOF&#xff08;Append Only File&#xff09;日志和RDB&#xff08;Redis Database Backup&#xff09;持久化是Redis中两种重要的数据持久化机制。 RDB持久化机制原理RDB是Redis提供的一种数据快照保存机制&#xff0c;它将某个时间点的数据库状态保存到一个RDB文件中。这个…

uni-app 运行HarmonyOS项目

1. uni-app 运行HarmonyOS项目 文档中心 1.1. HarmonyOS端 1.1.1. 准备工作 &#xff08;1&#xff09;下载DevEco Studio开发工具。   &#xff08;2&#xff09;在 DevEco Studio 中打开任意一个项目&#xff08;也可以新建一个空项目&#xff09;。   &#xff08;3&…

WPF+MVVM案例实战(十三)- 封装一个自定义消息弹窗控件(上)

文章目录 1、案例效果2、功能实现1、创建文件2、资源文件获取3、枚举实现3、弹窗实现1、界面样式实现2、功能代码实现4、总结1、案例效果 2、功能实现 1、创建文件 打开 Wpf_Examples 项目,我们在用户控件类库中创建一个窗体文件 SMessageBox.xaml,同时创建枚举文件夹 Enum…