Flask和mysql多线程_Flask解析(二):Flask-Sqlalchemy与多线程、多进程

Sqlalchemy

flask-sqlalchemy的session是线程安全的,但在多进程环境下,要确保派生子进程时,父进程不存在任何的数据库连接,可以通过调用db.get_engine(app=app).dispose()来手动销毁已经创建的engine,然后再派生子进程。

最近线上的项目总是会报出数据库连接相关的错误,比如“Command out of Sync”,“Mysql server has gone away”,“Lost databse connection”,“Package sequence out of order”等等,最终解决下来,发现以上错误可以分为两种,一种是和连接丢失有关的,一种是和连接被多个线程(进程)同时使用了有关。

我们项目基于flask,有多线程的场景,也有多进程的场景。orm用的是flask的拓展flask-sqlalchemy。flask-sqlalchemy的使用必须基于flask的app实例,也就是说要在app上下文中才能使用flask-sqlalchemy,所以在某些离线(非web)场景下,我们也用到了原生的Sqlalchemy。

原生的Sqlalchemy的使用方式是

engine = create_engine(db_url)

Session = sessionmaker(bind=engine)

session = Session()

session.query(xxx)

首先要创建一个engine,engine顾名思义就是和数据库连接的引擎。在实际发起查询前,是不会创建任何connection的。创建engine时可以通过指定poolclass参数来指定engine使用的连接池。默认是QueuePool,也可以设置为NullPool(不使用连接池)。为了方便理解,可以把engine视为管理连接池的对象。

sqlalchemy中session和我们平时数据库里说的session是两个不同的概念,在平时数据库中,session的生命周期从连接上数据库开始,到断开和数据库的连接位置。但是sqlalchemy中的session更多的是一种管理连接的对象,它从连接池取出一个连接,使用连接,然后释放连接,而自身也跟随着销毁。sqlalchemy中的Connection对象是管理真正数据库连接的对象,真正的数据库连接在sqlalchemy中是DBAPI。

默认地,如果不传入poolclass,则使用QueuePool(具有一定数量的连接池),如果不指定pool_recycle参数,则默认数据库连接不会刷新。也就是说连接如果不适用,则一直不去刷新它。但是问题来了,在Mysql中,输入“show variables like "%timeout%"; ” ,可以看到有一个waittimeout,还有interacttimeout,默认值为28800(8小时),这两个值代表着,如果8个小时内某个数据库连接都不和mysql联系,那么就会断掉这个连接。所以,8个小时过去了,Mysql把连接断掉了,但是sqlalchemy客户端这边却还保持着这个连接。当某个时候该连接从连接池被取出使用时,就会抛出“Mysql server has gone away”等连接丢失的信息。

解决这个问题的办法很简单,只要传入pool_recycle参数即可。特别地,在flask-sqlalchemy中不会出现这种问题,因为falsk-sqlalchemy拓展自动地帮我们注入了pool_recycle参数,默认为7200秒。

def apply_driver_hacks(self, app, sa_url, options):

"""This method is called before engine creation and used to inject

driver specific hacks into the options. The `options` parameter is

a dictionary of keyword arguments that will then be used to call

the :func:`sqlalchemy.create_engine` function.

The default implementation provides some saner defaults for things

like pool sizes for MySQL and sqlite. Also it injects the setting of

`SQLALCHEMY_NATIVE_UNICODE`.

"""

if sa_url.drivername.startswith('mysql'):

sa_url.query.setdefault('charset', 'utf8')

if sa_url.drivername != 'mysql+gaerdbms':

options.setdefault('pool_size', 10)

options.setdefault('pool_recycle', 7200)  # 默认7200秒刷新连接

elif sa_url.drivername == 'sqlite':

pool_size = options.get('pool_size')

detected_in_memory = False

if sa_url.database in (None, '', ':memory:'):

detected_in_memory = True

from sqlalchemy.pool import StaticPool

options['poolclass'] = StaticPool

if 'connect_args' not in options:

options['connect_args'] = {}

options['connect_args']['check_same_thread'] = False

# we go to memory and the pool size was explicitly set

# to 0 which is fail. Let the user know that

if pool_size == 0:

raise RuntimeError('SQLite in memory database with an '

'empty queue not possible due to data '

'loss.')

# if pool size is None or explicitly set to 0 we assume the

# user did not want a queue for this sqlite connection and

# hook in the null pool.

elif not pool_size:

from sqlalchemy.pool import NullPool

options['poolclass'] = NullPool

# if it's not an in memory database we make the path absolute.

if not detected_in_memory:

sa_url.database = os.path.join(app.root_path, sa_url.database)

unu = app.config['SQLALCHEMY_NATIVE_UNICODE']

if unu is None:

unu = self.use_native_unicode

if not unu:

options['use_native_unicode'] = False

if app.config['SQLALCHEMY_NATIVE_UNICODE'] is not None:

warnings.warn(

"The 'SQLALCHEMY_NATIVE_UNICODE' config option is deprecated and will be removed in"

" v3.0. Use 'SQLALCHEMY_ENGINE_OPTIONS' instead.",

DeprecationWarning

)

if not self.use_native_unicode:

warnings.warn(

"'use_native_unicode' is deprecated and will be removed in v3.0."

" Use the 'engine_options' parameter instead.",

DeprecationWarning

)

sessionmaker是Session定制方法,我们把engine传入sessionmaker中,就可以得到一个session工厂,通过工厂来生产真正的session对象。但是这种生产出来的session是线程不安全的,sqlalchemy提供了scoped_session来帮助我们生产线程安全的session,原理类似于Local,就是代理session,通过线程的id来找到真正属于本线程的session。

flask-sqlalchemy就是使用了scoped_session来保证线程安全,具体的代码可以在Sqlalchemy中看到,构造session时,使用了scoped_session。

def create_scoped_session(self, options=None):

"""Create a :class:`~sqlalchemy.orm.scoping.scoped_session`

on the factory from :meth:`create_session`.

An extra key ``'scopefunc'`` can be set on the ``options`` dict to

specify a custom scope function. If it's not provided, Flask's app

context stack identity is used. This will ensure that sessions are

created and removed with the request/response cycle, and should be fine

in most cases.

:param options: dict of keyword arguments passed to session class in

``create_session``

"""

if options is None:

options = {}

scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)

options.setdefault('query_cls', self.Query)

return orm.scoped_session(

self.create_session(options), scopefunc=scopefunc

)

def create_session(self, options):

"""Create the session factory used by :meth:`create_scoped_session`.

The factory **must** return an object that SQLAlchemy recognizes as a session,

or registering session events may raise an exception.

Valid factories include a :class:`~sqlalchemy.orm.session.Session`

class or a :class:`~sqlalchemy.orm.session.sessionmaker`.

The default implementation creates a ``sessionmaker`` for :class:`SignallingSession`.

:param options: dict of keyword arguments passed to session class

"""

return orm.sessionmaker(class_=SignallingSession, db=self, **options)

多进程和数据库连接

多进程环境下,要注意和数据库连接相关的操作。

说到多进程,python里最常用的就是multiprocessing。multiprocessing在windows下和linux的表现有所区别,在此只讨论linux下的表现。linux下多进程通过fork()来派生,要理解我下面说的必须先弄懂fork()是什么东西。粗略地说,每个进程都有自己的一个空间,称为进程空间,每个进程的进程空间都是独立的,进程与进程之间互不干扰。fork()的作用,就是将一个进程的进程空间,完完全全地copy一份,copy出来的就是子进程了,所以我们说子进程和父进程有着一模一样的地址空间。地址空间就是进程运行的空间,这空间里会有进程已经打开的文件描述符,文件描述符会间接地指向进程已经打开的文件。也就是说,fork()之后,父进程,子进程会有相同的文件描述符,指向相同的一个文件。为什么?因为文件是存在硬盘里的,fork()时copy的内存中的进程空间,并没有把文件也copy一份。这就导致了,父进程,子进程,同时指向同一个文件,他们任意一个都可以对这个文件进行操作。这和本文说的数据库有啥关系?顺着这个思路想,数据库连接是不是一个TCP连接?TCP连接是不是一个socket?socket在linux下是什么,就是一个文件。所以说,如果父进程在fork()之前打开了数据库连接,那么子进程也会拥有这个打开的连接。

两个进程同时写一个连接会导致数据混乱,所以会出现“Command out of sync”的错误,两个进程同时读一个连接,会导致一个进程读到了,另一个没读到,就是“No result”。一个进程关闭了连接,另一个进程并不知道,它试图去操作连接时,就会出现“Lost database connection”的错误。

在此讨论的场景是,父进程在派生子进程之前,父进程拥有已打开的数据库连接。派生出子进程之后,子进程也就拥有了相应的连接。如果在fork()之前父进程没有打开数据库连接,那么也不用担心这个问题。比如Celery使用的prefork池,虽然是多进程模型,但是celery在派子进程前时不会打开数据库连接的,所以不用担心在celery任务中会出现数据库连接混乱的问题。

我做的项目里的多进程的场景之一就是使用tornado来跑web应用,在派生多个web应用实例时,确保此前创建的数据库连接被销毁。

app = Flask()

db = Sqlalchemy()

db.init_app(app)

...

...

db.get_engine(app=app).dispose()  # 先销毁已有的engine,确保父进程没有数据库连接

...

...

fork() # 派生子进程

# 例如

tornado.start()  # 启动多个web实例进程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/367163.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入理解redis数据类型

转载请注明出处:https://www.cnblogs.com/wenjunwei/p/9720033.html redis的存储模型 redis不是普通的键值对存储,它实际上是一个数据结构存储服务器,可以支持不同类型的值。这意味着redis相比传统键值对字符串key和字符串value存储来说&…

centos7 登陆报错 grep:write error

出现这个原因是因为磁盘空间满了 通过df -h查看存储空间 发现磁盘空间满了,可以用 find / -type f -size 1000M 查找大于1000M的文件删除 然后找到用rm -rf 命令删除 然后就不会出现这个问题了!转载于:https://www.cnblogs.com/lxs1314/p/8961113.html

你敢在post和get上刁难我,就别怪我装逼了

> 掘金编辑提醒:本文疑似有误,参考 听说「99% 的人都理解错了 HTTP 中 GET 与 POST 的区别」 之前好几次面试都被问到post和get有什么区别,肯定很多同学和我一样说了一大堆什么post比get安全,get比post传的少乱起八糟这样的答案…

昂首阔步:让开发人员喜欢使用您的REST API

随着JAX-RS API的发展,以及今年早些时候在JSR-339下发布的2.0版本,使用出色的Java平台创建REST服务变得更加容易。 但是,极大的简化带来了巨大的责任:记录所有这些API,以便其他开发人员可以快速了解如何使用它们。 不…

thinkphp mysql 更新_THINKPHP5修改数据库数据出现“缺少更新条件”的错误

查询数据库的数据分配显示在页面山修改后的数据准备传递到第三章图里接收数据,然后修改到数据问题出现的环境背景及自己尝试过哪些方法相关代码// 请把代码文本粘贴到下方(请勿用图片代替代码)第一张图代码public function edit(){$db_01new DB();$id_editRequest::…

angular ajax get post 参数,Angular的Post 传递参数问题及解决方法

一、传递参数过程中POST会出问题,问题来源:我们都知道向后台传参可以使用get、post,其形式类似于nameiyy&id001 。但是在angular中却发现使用$http post 进行异步传输的过程中后台是接收不到数据的,其实这个问题就是因为请求头…

[No0000187]可能是把Java内存区域讲的最清楚的一篇文章

写在前面(常见面试题) 基本问题: 介绍下 Java 内存区域(运行时数据区)Java 对象的创建过程(五步,建议能默写出来并且要知道每一步虚拟机做了什么)对象的访问定位的两种方式&#xff…

Java语言基础及java核心

一、Java语言特点 1、 简单 2、 面向对象 3、 分布式 4、 健壮 5、 安全 6、 中性架构跨平台 7、 超强的可移植性 8、 高性能 9、 多线程 二、java的环境变量 JAVA_HOMEC:\Program Files\Java\jdk1.8.0_101 (到你的安装目录下) CLASSPASH./ &#xff0…

如何使用Quartz Scheduler和日志记录创建Web应用程序

我有时会在Quartz Scheduler论坛中为用户提供帮助。 有时,有人会问他/她如何在Web应用程序中设置Quartz。 实际上,这是一件相当简单的事情。 该库已经带有一个ServletContextListener ,您可以使用它启动调度程序。 我将在这里向您展示一个简单…

移动端日期选择插件rolldate

rolldate为上一版jquery移动端时间插件的全新版本,目前保留了上一版的大部分功能,并且增加了回调函数,以及主题风格选取,最重要的是解决了上一版本的遗留问题,依赖jquery、滑动不够流畅、参数设计不够合理等等。开发日…

ik分词器 mysql php_php环境下使用elasticSearch+ik分词器进行全文搜索

首先需要说明的一点是,如果需要启用ik分词器,那么分词器的版本必须与es版本一致,即6.3.0的分词器需要同样6.3.0版本的es支持。安装javawin-64bit的安装包需要去java英文官网查找安装ES6.3.0版本es下载地址:https://www.elastic.co…

服务器如何查看gpu型号,linux 查看服务器gpu

linux 查看服务器gpu 内容精选换一换本节操作介绍通过华为云APP连接Linux实例的操作步骤。云服务器状态为“运行中”。已获取Linux云服务器用户名和密码,忘记密码请参考在控制台重置云耀云服务器密码重置密码。云耀云服务器已经绑定弹性公网IP。所在安全组入方向已开…

Antd-Select组件的深入用法

一、Antd-Select提供几种类型 最基础版只提供下拉功能的选择器带搜索功能的下拉选择器可多选的下拉选择器可搜索、可多选、可随意输入内容的tag下拉选择器(支持自动分词)多级联动下拉选择器搜索远程数据下拉框二、一些潜在用法 如果Select.Option选项的数量特别大:2k、3k... 假…

WS-Security:使用BinarySecurityToken进行身份验证

众所周知,WS-Security设定的目标之一是对SOAP消息强制执行完整性和/或保密。 在完整性的情况下,添加到SOAP消息的签名是数学过程的结果,该过程涉及发送者的私钥,从而导致加密的消息摘要。 默认情况下,大多数框架&…

Vue(ES6)中的data属性为什么不能是一个对象?

以下引官网原文:当一个组件被定义,data 必须声明为返回一个初始数据对象的函数,因为组件可能被用来创建多个实例。如果 data 仍然是一个纯粹的对象,则所有的实例将共享引用同一个数据对象!通过提供 data 函数&#xff…

echarts vue 柱状图实例_VUE中使用Echarts绘制柱状图

在main.js中引入echartsimport echarts from ‘echarts‘Vue.prototype.$echarts echarts在相应的vue中导入echartsimport echarts from ‘echarts‘;实现柱状图显示mounted: function () {// 基于准备好的dom,初始化echarts实例let myChart echarts.init(documen…

从计算机体系结构方面思考深度学习

今年 1 月,谷歌人工智能负责人 Jeff Dean(当时还是谷歌大脑负责人)与 2017 年图灵奖得主、体系结构巨擘 David Patterson(当时获奖结果尚未公布)联合发表了题为《计算机体系结构黄金时代:赋能机器学习革命》…

使用Apollo通过WebSocket通过STOMP轻松进行消息传递

在我以前的文章中,我介绍了几个有趣的用例,这些用例使用著名的消息代理HornetQ和ActiveMQ通过Websockects实现STOMP消息传递。 但是我没有介绍的是Apollo,因为我个人认为它的API是冗长的,并且不像Java开发人员那样表现力强。 尽管…

h5渲染性能一瞥

内容来源:2018 年 6 月 30 日,饿了么前端主管向勇在“饿了么技术沙龙・第27弹 【前端专场】”进行《h5渲染性能一瞥》演讲分享。IT 大咖说(微信id:itdakashuo)作为独家视频合作方,经主办方和讲者审阅授权发…

爬虫系列之requests

爬取百度内容: 1 import requests2 url "https://www.baidu.com"3 4 if __name__ __main__:5 try:6 kv {user-agent: Mozilla/5.0}7 r requests.get(url, headerskv)8 r.raise_for_status() #返回状态值,如果…