Flask使用线程异步执行耗时任务

1 问题说明

1.1 任务简述

在开发Flask应用中一定会遇到执行耗时任务,但是Flask是轻量级的同步框架,即在单个请求时服务会阻被塞,直到任务完成(注意:当前请求被阻塞不会影响到其他请求)。

解决异步问题有两种思路,一种是借助外部工具实现异步,例如:消息队列(RabbitMQ)、 异步任务队列(Celery+Redis);另一种借助Python中的进程、线程或协程解决异步。我的是小项目选择因此选择了第二种方法。

经过测试,我在Flask中使用协程(gevent)会被阻塞;使用进程(multiprocessing)不会被阻塞,操作数据库出了问题(有可能是我没操作正确的问题);最后选择使用线程(threading)。

注意:使用线程会出现线程安全问题,

1.2 注意的问题

(1)线程安全

"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中# 2.1 直接使用Session是线程不安全的,不推荐使用self.session = Session(self.engine)# 2.2 直接使用scoped_session是线程安全的,推荐使用# 获取sessionmakersession_factory = sessionmaker(engine)# scoped_session是线程安全的# 注意,此session不能使用Query对象session = scoped_session(session_factory)!!!
"""

(2)使用数据库

# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):
"""
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
"""

2 工程布局

项目布局如下:
在这里插入图片描述

3 源代码

(1)main.py

from blueprint import init_blueprint
from config_app import app
from config_db import init_mysql_db# 初始化MySQL
init_mysql_db()init_blueprint()if __name__ == '__main__':app.run(host='0.0.0.0', debug=True)

(2)config_app.py

from flask import Flask
from flask_cors import CORSdef create_app():flask_app = Flask(__name__)CORS(flask_app, supports_credentials=True)return flask_appapp = create_app()

(3)config_db.py


from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow# 添加pymysql驱动,连接MySQL数据库
import pymysqlfrom config_app import apppymysql.install_as_MySQLdb()"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中# 2.1 直接使用Session是线程不安全的,不推荐使用self.session = Session(self.engine)# 2.2 直接使用scoped_session是线程安全的,推荐使用# 获取sessionmakersession_factory = sessionmaker(engine)# scoped_session是线程安全的# 注意,此session不能使用Query对象session = scoped_session(session_factory)!!!
"""# 创建MySQL单实例
mysql_db = SQLAlchemy()# 创建Schema
mysql_schema = Marshmallow()# 创建数据库
"""
create database async default character set utf8mb4 collate utf8mb4_unicode_ci;
"""class MysqlConf:acc = "root"pwd = "123456"host = "192.168.108.200"port = 3306db = "async"mysql_conf = MysqlConf()# 初始化MySQL数据库
def init_mysql_db():# 配置MySQL数据库urldb_url = "mysql://" + mysql_conf.acc + ":" + mysql_conf.pwd + "@" + mysql_conf.host + ":" + str(mysql_conf.port) + "/" + mysql_conf.dbapp.config["SQLALCHEMY_DATABASE_URI"] = db_url# 关闭sqlalchemy自动跟踪数据库app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False# 显示底层执行的SQL语句app.config['SQLALCHEMY_ECHO'] = True# 解决‘No application found. Either work inside a view function or push an application context.’app.app_context().push()# 初始化appmysql_db.init_app(app)# 初始化schemamysql_schema.init_app(app)# 初始化table
def init_table():# 删除表mysql_db.drop_all()# 创建表mysql_db.create_all()

(4)dao.py

import datetimefrom config_db import mysql_db as db# Create table of bm_record
class User(db.Model):# Record table__tablename__ = "as_user"id = db.Column("us_id", db.Integer, nullable=False, primary_key=True, autoincrement=True)name = db.Column("us_name", db.String(100))age = db.Column("us_age", db.Integer)create_time = db.Column("us_create_time", db.DateTime, default=datetime.datetime.now)# 插入数据
def insert_record_dao(user: User):# Add datadb.session.add(user)# Commit datadb.session.commit()

(5)blueprint.py

# 构建蓝本
import threading
import time
from concurrent.futures import ThreadPoolExecutorfrom flask import Blueprint, jsonifyfrom config_app import app
from config_db import init_table
from dao import insert_record_dao, Useruser = Blueprint("user", __name__)# 注册蓝本
def init_blueprint():app.register_blueprint(user, url_prefix='/user')@user.route("/initdb")
def init_db():init_table()return jsonify("success")@user.route("/add")
def add_user():user: User = User()user.name = "zhangsan"user.age = 12time.sleep(10)insert_record_dao(user)return jsonify(user.id)executor = ThreadPoolExecutor(3)
@user.route("/thread_pool")
def run_task_thread_pool():executor.submit(_run_thread_pool)return jsonify("success")# 执行线程
def _run_thread_pool():print("Run thread pool")time.sleep(2)user: User = User()user.name = "thread pool"user.age = 12# 注意:必须使用”with app.app_context()“,否则无法插入数据库,并且不会报错with app.app_context():insert_record_dao(user)print("End thread pool")pass@user.route("/thread")
def run_task_thread():task = threading.Thread(target=_run_thread, name='thread-01')task.start()return jsonify("success")# 执行线程
def _run_thread():print("Run thread")time.sleep(2)user: User = User()user.name = "thread"user.age = 12# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):"""This typically means that you attempted to use functionality that neededthe current application. To solve this, set up an application contextwith app.app_context(). See the documentation for more information."""with app.app_context():insert_record_dao(user)print("End thread")pass

4 请求截图

(1)初始化数据库

在这里插入图片描述

(2)添加用户

在这里插入图片描述

(3)使用线程池

在这里插入图片描述

(4)使用线程

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/190976.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

记录 | photoshop移动选区

期望达到的效果: 选择一块区域,并移动它 操作: (1) 选择矩形选框工具, (2) 对区域进行选取, (3) 选择移动工具, (4) 移动选区,效果如下,

已知数组A[1..n]中元素类型为非负整数,设计算法将其调整为左右两部分,左边所有为奇数,右边所有为偶数,并要求算法的时间复杂度为O(n)

//左边奇数右边偶数 void Swap(int* a, int* b) {int tmp *b;*b *a;*a tmp; } void LeftRight(int arr[],int n) {int i 0;int j n - 1;while(i<j){if (arr[i] % 2 0 && arr[j] % 2 1) {Swap(&arr[i], &arr[j]);i;j--;}else if (arr[i] % 2 1 &…

Next.js初步使用

文章目录 安装和运行页面静态文件 React初步&#xff0c;但不熟悉React也可以学习本文。 安装和运行 Next.js是一个基于React的服务端渲染框架&#xff0c;可以实现构建高性能、可扩展的React应用&#xff0c;提供了很多方便的工具和功能&#xff0c;包括自动代码分割、服务器…

Java研学-IO流(三)

六 字节流 – 字节输出流系列 OutPutStream体系 1 OutPutStream系列 – 字节输出流 // 表示字节输出流所有类的超类&#xff0c;输出流接受输出字节并将其发送到某个接收器 public abstract class OutputStreamFileOutputStream/BufferedOutputStream 2 FileOutputStream类设…

pyqt5使用pyqtgraph实现动态热力图

pyqt5使用pyqtgraph实现动态热力图 一、效果图 二、流程 1、打开Designer创建一个UI界面 2、把UI转成py 3、创建一个main.py文件 4、在main文件中渲染画布、创建初始数据、画热力图、创建更新数据线程、绑定按钮触发事件三、UI界面 其中h_map.py代码如下: # -*- coding: ut…

1688API接口系列,1688开放平台接口使用方案(商品详情数据+搜索商品列表+商家订单类)

1688商品详情接口是指1688平台提供的API接口&#xff0c;用于获取商品详情信息。通过该接口&#xff0c;您可以获取到商品的详细信息&#xff0c;包括商品标题、价格、库存、描述、图片等。 要使用1688商品详情接口&#xff0c;您需要先申请1688的API权限&#xff0c;并获取ac…

变量和引用

变量和引用 2.1.深入认识变量 2.1.1.什么是变量 变量是在程序中保存用户数据的一段内存存储空间&#xff0c;变量名是内存空间的首地址 变量三要素&#xff1a;名称、类型、值 2.1.2.变量的名称 组成: 字母、数字、下划线组成&#xff0c;不能以数字开头 变量名称的长…

蓝桥杯每日一题2023.12.2

题目描述 蓝桥杯大赛历届真题 - C 语言 B 组 - 蓝桥云课 (lanqiao.cn) 题目分析 答案&#xff1a;3598180 由题目分析可以知道&#xff0c;给小明发的牌一共有13种类型&#xff0c;每种类型的牌一共有四张。对于每种牌&#xff0c;我们都有5种选择&#xff0c;不拿、拿一张、…

LeetCode - 965. 单值二叉树(C语言,二叉树,配图)

二叉树每个节点都具有相同的值&#xff0c;我们就可以比较每个树的根节点与左右两个孩子节点的值是否相同&#xff0c;如果不同返回false&#xff0c;否则&#xff0c;返回true。 如果是叶子节点&#xff0c;不存在还孩子节点&#xff0c;则这个叶子节点为根的树是单值二叉树。…

分析实现HarmonyOS中的Linux内核架构模式

在当今的科技领域&#xff0c;操作系统是各种智能设备运行的关键所在。而在这方面&#xff0c;华为的鸿蒙系统备受瞩目。那么&#xff0c;鸿蒙系统技术架构是怎样的呢&#xff1f;本文将为您揭开这一神秘面纱。 首先&#xff0c;我们需要了解鸿蒙系统的基本架构。鸿蒙系统采用…

mysql中除了InnoDB以外的其它存储引擎

参考资料&#xff1a;https://dev.mysql.com/doc/refman/8.0/en/storage-engines.html MyISAM存储引擎 https://dev.mysql.com/doc/refman/8.0/en/myisam-storage-engine.html MyISAM 存储引擎是基于比较老的ISAM存储引擎&#xff08;ISAM已经不再可用&#xff09;&#xff…

shell编程系列(11)-使用grep查找文本

文章目录 前言grep的使用根据关键字查找反向查找 结语 前言 grep命令也是我们在日常使用linux&#xff0c;编写shell脚本中会用到的一个高频命令&#xff0c;grep主要是帮助我们查找我们想要的内容&#xff0c;类似于我们在office word里面的 Ctrl f 查找功能&#xff0c;但是…

SLURM资源调度管理系统REST API服务配置,基于slurm22.05.9,centos9stream默认版本

前面给大家将了一下slurm集群的简单配置&#xff0c;这里给大家再提升一下&#xff0c;配置slurm服务的restful的api&#xff0c;这样大家可以将slurm服务通过api整合到桌面或者网页端&#xff0c;通过桌面或者网页界面进行管理。 1、SLURM集群配置 这里请大家参考&#xff1…

使用C语言创建高性能爬虫ip网络

之前写的python和GO语言的爬虫ip池的文章引起很大反响&#xff0c;这次我将以C语言来创建爬虫IP池&#xff0c;但是因为其复杂性&#xff0c;可能代码并非完美。但是最终也达到的想要的效果。 因为在C语言中创建代理IP池可能会比较复杂&#xff0c;且C语言并没有像Python那样的…

07-原型模式-C语言实现

原型模式&#xff1a; Specify the kinds of objects to create using a prototypical instance,and create new objects by copying this prototype.&#xff08;用原型实例指定创建对象的种类&#xff0c; 并且通过拷贝这些原型创建新的对象。 &#xff09; UML图&#xff1…

关于 Kubernetes中Admission Controllers(准入控制器) 认知的一些笔记

写在前面 工作中遇到&#xff0c;简单整理记忆博文为官方文档整理涉及内置准入控制的分类理解理解不足小伙伴帮忙指正 人活着就是为了忍受摧残&#xff0c;一直到死&#xff0c;想明了这一点&#xff0c;一切事情都能泰然处之 —— 王小波《黄金时代》 为什么需要准入控制器 准…

非得让你会之MyBatis插件与Java动态代理

引言 咱们今天聊聊Java动态代理&#xff0c;这东西在开发中真的太常见了。比如Spring AOP、RPC&#xff0c;它们都离不开动态代理。然后&#xff0c;咱们再来说说MyBatis插件&#xff0c;这可是MyBatis框架中的一个超实用的功能&#xff0c;它就像是给MyBatis加了个“超能力”…

『测试基础』| 如何理解测试用例管理和缺陷管理?

『测试管理攻略』| 如何理解测试用例管理和缺陷管理&#xff1f; 1 测试用例定义2 测试用例设计原则3 测试用例的评审4 测试如何维护&#xff1f;5 用例的作用6 用例管理工具7 缺陷关注的重点8 缺陷分析9 缺陷管理工具 1 测试用例定义 测试用例&#xff08;TestCase&#xff0…

12.1平衡树(splay),旋转操作及代码

平衡树 变量定义 tot表示结点数量&#xff0c;rt表示根的编号 v[i]表示结点i的权值 fa[i]表示结点i的父亲节点 chi[i][2]表示结点i的左右孩子 cnt[i]表示结点i的权值存在数量&#xff0c;如1123&#xff0c;v[3]1&#xff0c;则cnt[3]2;就是说i3的三号结点的权值为1&…

树和二叉树的基本概念和堆的实现

树的概念及结构 树的概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。 1.有一个特殊的结点&#…