Flask框架配置celery-[1]:flask工厂模式集成使用celery,可在异步任务中使用flask应用上下文,即拿即用,无需更多配置

一、概述

1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。

2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的

3、使用celery是需要使用到中间件的,简单点就使用redis做中间件

注意:

在flask工厂模式集成celery异步框架,在celery的异步任务中能够获取到flask的应用上下文管理器,也就是说在celery异步任务中你可以去调用flask项目中功能,如orm操作等。

使用本文配置,可以无需修改flask创建app应用的程序,直接将celery相关包创建,运行就可以使用,且能够在异步任务使用flask的功能。

二、项目结构

依赖环境:

celery==4.4.7
eventlet==0.33.3
Flask==2.1.3
Flask-Caching==1.10.1
Flask-Cors==3.0.10
Flask-Migrate==2.7.0
Flask-RESTful==0.3.9
Flask-SocketIO==5.1.1
Flask-SQLAlchemy==2.5.1
PyMySQL==1.0.2
redis==3.5.3
SQLAlchemy==1.4.0
Werkzeug==2.0.2

目录结构:

flask-project

        |--apps

                |-- user

                        |-- models

                        |--views.py

                        |--urls.py

                |--__init__.py

        |--ext

                |--__init__.py

                |--config.py

        |--celery_task

                |--__init__.py

                |--async_task.py

                |--celery.py

                |--celeryconfig.py

                |--check_task.py

                |--scheduler_task.py

        app.py

三、flask工厂模式下各模块功能

1、apps/user/models.py : 写了一个user表

2、apps/user/views.py:写了测试调用celery异步任务的接口

3、apps/user/urls.py: 注册路由的

4、ext/__init__.py:cache、db、cors的拓展

5、ext/config.py : cache和cors使用到的配置

6、apps/__init__.py: 一个函数create_app,生成flask应用对象

7、app.py: 启动flask应用对象的模块

本文重点不在flask工厂模式,默认看官都懂如何创建flaks工厂模式的项目了。

在视图中在执行异步任务,并获取异步任务的id:

from celery_task.async_task import send_email_task,cache_user_task
#用户资源:get\put\delete, 对单个进行操作
class UserOneResource(ResourceBase):def put(self,id):#测试异步发邮件email = request.args.get('email')code = request.args.get('code')res = send_email_task.delay(email,code)print(res.id)return NewResponse(msg='put',data={'task_id':res.id})def patch(self,id):#测试异步操作flask的orm和cachep = request.args.get('p')if p=='set':res = cache_user_task.delay()print(res,type(res))return NewResponse(msg='patch',data={'task_id':res.id})else:from ext import cachedata = cache.get('all-user-data')return NewResponse(msg='patch',data=data)

res = 异步函数.delay(函数需要的参数)

task_id = res.id

注意:task_id 可以知道对应的任务的完成情况,获取任务的返回值等。

四、celery项目的配置

1、celery的配置

将celery的配置都放到一个py文件中,方便后期的维护和使用

celeryconfig.py

from celery.schedules import crontab
from datetime import timedelta
'''
参数解析:
accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
result_serializer:结果序列化格式,默认值为json;
timezone:配置Celery以使用自定义时区;
enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
result_expires: 异步任务结果存活时长
beat_schedule:设置定时任务
'''
#手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
task_module = ['celery_task.async_task',  # 写任务模块导入路径,该模块主要写异步任务的方法'celery_task.scheduler_task',  # 写任务模块导入路径,该模块主要写定时任务的方法
]#celery的配置
config = {"broker_url" :'redis://127.0.0.1:6379/0',   #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定时任务配置# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},# 名字随意起'add-func-5-minutes': {'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22)  # 定时任务需要的参数},# 缓存用户数据到cache中'cache-user-func': {'task': 'celery_task.scheduler_task.cache_user_func',# 导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1),  # 每1分钟执行一次,将用户消息缓存到cache中}}
}

2、创建celery对象

celery.py

from celery import Celery,Task
from .celeryconfig import config,task_module
import sys
import os
'1、把flask项目路径添加到系统环境变量中'
project_path = os.path.dirname(os.path.dirname(__file__))
sys.path.append(project_path)'''
2、创建celery应用对象'task'可以任务是该celery对象名字,用于区分celery对象broker是指定消息中间件backend是指定任务结果存储位置include是手动指定异步任务所在的模块的位置
'''
#创建celery异步对象
celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
#导入一些基本配置
celery.conf.update(**config)'3、给celery所有任务添加flask的应用上下文,在celery异步任务中就可以调用flask中的对象了'
class ContextTask(celery.Task):def __call__(self, *args, **kwargs):from apps import create_appapp = create_app()with app.app_context():return self.run(*args, **kwargs)
celery.Task = ContextTask

注意:

1、第一步很关键,设置到python项目运行时,加载环境变量的问题。这一步是将flask项目的根目录加载环境变量中,这样第3步才能从apps中导入create_app函数。

2、第二步是创建celery通用的方法了,没什么好说的。

3、第三步很关键,涉及到celery异步任务能否在flask应用上下文管理器运行,从而可以调用flask中的功能,例如orm操作,cache操作.。(在执行任务时,先套上flask的应用上下文管理器)

3、异步任务模块

将所有异步任务相关的函数都集中到一个模块中,方便维护和使用。

async_task.py

# 导入celery对象app
from celery_task.celery import celery
from ext import cache
import time'''
1、没有返回值的,@app.task(ignore_result=True)
2、有返回值的任务,@app.task 默认就是(ignore_result=False)
'''# 没有返回值,禁用掉结果后端
@celery.task
def send_email_task(receiver_email,code):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作''':param email: 接收消息的邮箱,用户的邮箱:return:'''# 模拟邮件发送验证码time.sleep(5)return {'result':'邮件已经发送',receiver_email:'2356'}@celery.task
def cache_user_task():#orm查询数据,放到cache中from apps.user.models import UserModeluser = UserModel.query.all()lis = []for u in user:id = u.idname = u.namedic = {'id':id,'name':name}lis.append(dic)print(dic)cache.set('all-user-data',lis)return {'code':200,'msg':'查询数据成功'}

4、定时任务模块

将所有定时任务相关的函数都集中到一个模块中,方便维护和使用。

schedulser_task.py

from celery_task.celery import celery
import time# 有返回值,返回值可以从结果后端中获取
@celery.task
def add_func(a, b):print('执行了加法函数',a+b)return a + b# 不需要返回值,禁用掉结果后端
@celery.task(ignore_result=True)
def cache_user_func():print('all')

5、检测任务id获取任务状态和返回值

check_task.py:

from celery.result import AsyncResult
from celery_task.celery import celery'''验证任务的执行状态的'''def check_task_status(task_id):'''任务的执行状态:PENDING :等待执行STARTED :开始执行RETRY   :重新尝试执行SUCCESS :执行成功FAILURE :执行失败:param task_id::return:'''result = AsyncResult(id=task_id, app=celery)dic = {'type': result.status,'msg': '','data': None,'code': 400}if result.status == 'PENDING':dic['msg'] = '任务等待中'elif result.status == 'STARTED':dic['msg'] = '任务开始执行'elif result.status == 'RETRY':dic['msg'] = '任务重新尝试执行'elif result.status == 'FAILURE':dic['msg'] = '任务执行失败了'elif result.status == 'SUCCESS':result = result.get()dic['msg'] = '任务执行成功'dic['data'] = resultdic['code'] = 200# result.forget() # 将结果删除# async.revoke(terminate=True)  # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。return dic

在视图函数中调用该方法,通过task_id ,返回任务的运行结果。

五、测试

1、运行项目

flask项目(在项目根目录下执行):

        flask run --host 0.0.0.0 --port 5000

celery项目(在项目根目录下执行):

启动celery进程:

windows系统:

        celery -A celery_task.celery worker -l info  -P  eventlet

linux系统:

        celery -A celery_task.celery worker -l info 

启动定时任务(先启动celery进程在启动定时任务):

celery -A celery_task.celery beat -l info

2、运行结果

1、执行异步任务中,将orm数据存到cache中

2、执行定时任务了

六、注意事项

1、在系统中要先安装好redis和mysql,并都启动了

2、在测试异步操作orm时,会使用到flask的cache存数据,注意flask的cache不能配置内存模式,不然celery进程存到cache中的数据,flask进程中取不到的。

3、当前的配置下,celery的目录必须是在flask根目录下

七、拓展-改变celery_task的位置

如果你想将celery_task包移动到apps包下,此时你需要修改什么?

1、apps/celery_task/celery.py:将flask项目根目录加载到系统环境变量中的路径有变

'1、把flask项目路径添加到系统环境变量中'
project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))

2、apps/celery_task/celeryconfig.py: 注册异步任务的模块,定时任务的模块的位置变化

'1、加上apps.'
task_module = ['apps.celery_task.async_task',  # 写任务模块导入路径,该模块主要写异步任务的方法'apps.celery_task.scheduler_task',  # 写任务模块导入路径,该模块主要写定时任务的方法
]'2、task参数对应的字符串,加上apps.'
config = {"broker_url" :'redis://127.0.0.1:6379/0',   #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定时任务配置# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'apps.celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},# 名字随意起'add-func-5-minutes': {'task': 'apps.celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22)  # 定时任务需要的参数},# 缓存用户数据到cache中'cache-user-func': {'task': 'apps.celery_task.scheduler_task.cache_user_func',# 导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1),  # 每1分钟执行一次,将用户消息缓存到cache中}}
}

3、在视图函数导入异步任务的路径也变了

#异步任务
from apps.celery_task.async_task import send_email_task,cache_user_task

4、启动celery和定时任务的命令变量【在项目根目录下执行命令】

启动celery:

windows启动命令: celery  -A  apps.celery_task.celery worker -l info  -P  eventlet

linux启动命令: celery  -A  apps.celery_task.celery worker -l info 

启动定时任务:

celery -A apps.celery_task beat -l info

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/107777.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

内容分发网络CDN分布式部署真的可以加速吗?原理是什么?

Cdn快不快?她为什么会快?同样的带宽为什么她会快?原理究竟是什么,同学们本着普及知识的想法,我了解的不是很深入,适合小白来看我的帖子,如果您是大佬还请您指正错误的地方,先谢谢大佬…

nodejs基于vue网上考勤系统

本网上考勤系统是针对目前考勤的实际需求, 采用计算机系统来管理信息,取代人工管理模式,查询便利,信息准确率高,节省了开支,提高了工作的效率。 本网上考勤系统主要包括个人中心、员工请假管理、员工考勤管…

asp.net酒店管理系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio

一、源码特点 asp.net酒店管理系统是一套完善的web设计管理系统,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为vs2010,数据库为sqlserver2008,使用c#语言开发 asp.net 酒店管理系统1 二、功能介绍 …

如何使用FME开发自动化分析报告功能

目录 前言 一、使用的技术栈 二、技术难点解析 1.专题图 2.WORD文档实现 2.1 动态标题 2.3动态表格和文本 2.3专题图插入 三、完成NewGIS部署 四、模板总览图 总结 前言 一个标准项目分析报告需要需要包括3个方面: 文本叙述,主要体现在对某项专项数据的…

Radius OTP完成堡垒机登录认证 安当加密

Radius OTP(One-Time Password)是一种用于身份验证的协议,它通过向用户发送一个一次性密码来验证用户的身份。使用Radius OTP可以实现堡垒机登录,以下是一些实现步骤: 1、安装Radius服务器 首先需要安装Radius服务器…

数字化转型“同群效应”(2000-2022年)

参照霍春辉等(2023)的做法,团队对上市公司-数字化转型“同群效应”进行测算。将同行业、同省的其他企业定义为同群企业,并以该群体数字化转型程度均值、中位数作为衡量 一、数据介绍 数据名称:数字化转型“同群效应”…

c++视觉检测------Shi-Tomasi 角点检测

Shi-Tomasi 角点检测 :goodFeaturesToTrack() goodFeaturesToTrack() 函数是 OpenCV 中用于角点检测的功能函数。它的主要作用是检测图像中的良好特征点,通常用于计算机视觉任务中的光流估算、目标跟踪等。 函数签名: void goodFeaturesTo…

构建高性能物联网数据平台:EMQX和CnosDB的完整教程

CnosDB 是一款高性能、高压缩率、高易用性的开源分布式时序数据库。主要应用场景为物联网、工业互联网、车联网和IT运维。所有代码均已在GitHub开源。本文将介绍如何使用EMQX 这一MQTT 服务器 CnosDB 构建物联网数据平台,实现物联网数据的实时流处理。 前言 在物联…

使用paddleX体验

首先paddlex的网址链接是:飞桨AI Studio星河社区-人工智能学习与实训社区 (baidu.com) 进入paddlex以后的界面如下所示: 首先说明paddlex的作用是: PaddleX是PaddlePaddle深度学习框架的一个扩展库,专注于为深度学习任务提供强大…

ITextRenderer将PDF转换为HTML详细教程

引入依赖 <dependency><groupId>org.xhtmlrenderer</groupId><artifactId>flying-saucer-pdf-itext5</artifactId><version>9.1.18</version></dependency> 问题一&#xff1a;输出中文字体 下载字体simsun.ttc 下载链接&am…

使用html2canvas将html转pdf,由于table表的水平和竖直有滚动条导致显示不全(或者有空白)

结果&#xff1a; 业务&#xff1a;将页面右侧的table打印成想要的格式的pdf&#xff0c;首先遇到的问题是table表上下左右都有滚轮而html2canvas相当于屏幕截图&#xff0c;那滚动区域如何显示出来是个问题&#xff1f; gif有点模糊&#xff0c;但是大致功能可以看出 可复制…

互联网Java工程师面试题·Java 总结篇·第三弹

20、重载&#xff08;Overload&#xff09;和重写&#xff08;Override&#xff09;的区别。重载的方法能否根据返回类型进行区分&#xff1f; 方法的重载和重写都是实现多态的方式&#xff0c;区别在于前者实现的是编译时的多态性&#xff0c;而后者实现的是运行时的多态性。重…

SQL Server向表中插入数据

SQL Server向表中插入数据 切换到对应的数据库 use DBTEST插入数据 方式1 insert into 表名&#xff08;列名1,列名2) values&#xff08;数据1&#xff0c;数据2&#xff09;注意&#xff1a; 列名就算是字符类型也不用加引号&#xff0c;数据如果对应的字段是字符串类型&…

图像语义分割 pytorch复现U2Net图像分割网络详解

图像语义分割 pytorch复现U2Net图像分割网络详解 1、U2Net网络模型结构2、block模块结构解析RSU-7模块RSU-4Fsaliency map fusion module U2Net网络结构详细参数配置RSU模块代码实现RSU4F模块代码实现u2net_full与u2net_lite模型配置函数U2Net网络整体定义类损失函数计算评价指…

Unity之ShaderGraph如何实现上下溶解

前言 我们经常在电影中见到的一个物体或者人物&#xff0c;从头上到脚下&#xff0c;慢慢消失的效果&#xff0c;我么今天就来体验一下这个上下溶解。 主要节点 Position节点&#xff1a;提供对网格顶点或片段的Position 的访问 Step节点&#xff1a;如果输入In的值大于或等…

福昕阅读器打开pdf文档时显示的标题不是文件名

0 Preface/Foreword 1 现象 文件名为&#xff1a;Demo-20231017 打开效果&#xff1a;显示名字为 word template 2 解决方法 2.1 利用打印方式将word生产pdf 在word生产pdf文件时&#xff0c;使用打印方式生成pdf文档。 2.2 删除word文档设置的标题 文件---》信息---》标…

.NET Core/.NET6 使用DbContext 连接数据库,SqlServer

安装以下NuGet包 Microsoft.EntityFrameworkCore.SqlServer&#xff1a;SQL server 需要添加包 Microsoft.EntityFrameworkCore.Tools Newtonsoft.Json&#xff1a;用于Json格式转换 创建一个实体类来表示数据库表。在项目中创建一个名为Customer.cs的文件&#xff0c;并添加以…

微信小程序--小程序框架

目录 前言&#xff1a; 一.框架基本介绍 1.整体结构&#xff1a; 2.页面结构&#xff1a; 3.生命周期&#xff1a; 4.事件系统&#xff1a; 5.数据绑定&#xff1a; 6.组件系统&#xff1a; 7.API&#xff1a; 8.路由&#xff1a; 9.模块化&#xff1a; 10.全局配置&…

运维 | 如何在 Linux 系统中删除软链接 | Linux

运维 | 如何在 Linux 系统中删除软链接 | Linux 介绍 在 Linux 中&#xff0c;符号链接&#xff08;symbolic link&#xff0c;或者symlink&#xff09;也称为软链接&#xff0c;是一种特殊类型的文件&#xff0c;用作指向另一个文件的快捷方式。 使用方法 我们可以使用 ln…

[C国演义] 第十五章

第十五章 最长湍流子数组环绕字符串中唯⼀的⼦字符串 最长湍流子数组 力扣链接 子数组 ⇒ dp[i]的含义: 以arr[i] 结尾的所有子数组中的最长湍流子数组的长度 子数组 ⇒ 状态转移方程根据 最后一个位置来划分&#x1f447;&#x1f447;&#x1f447; 初始化: 都初始化为…