【 一 】flask-ache
它为 Flask 应用程序提供了缓存支持。缓存是 Web 应用程序中非常常见的做法,用于存储频繁访问但不太可能经常更改的数据,以减少对数据库或其他慢速存储系统的访问,从而提高应用程序的性能和响应速度。
Flask-Caching
被配置为使用 SimpleCache
作为缓存后端。但是,SimpleCache
实际上并不适用于生产环境,因为它仅将缓存数据存储在内存中,并且不会跨多个服务器实例共享缓存数据。在生产环境中,你可能会使用更强大的缓存后端,如 Redis、Memcached 或数据库缓存。
- 设置缓存:
在index
路由处理函数中,你使用cache.set('name', 'xxx')
将键'name'
与值'xxx'
存储在缓存中。由于你设置了CACHE_DEFAULT_TIMEOUT
为 300 秒,这个缓存条目将在大约 5 分钟后过期。 - 从缓存中获取数据:
在get
路由处理函数中,你使用cache.get('name')
从缓存中获取与键'name'
关联的值。如果缓存中存在该键,则返回相应的值;否则,返回None
。
Flask-Caching
的主要优点包括:
- 性能提升:通过减少对数据库或其他慢速存储系统的访问,缓存可以显著提高应用程序的性能。
- 减少数据库负载:对于频繁访问但不太可能经常更改的数据,缓存可以显著减少数据库的负载。
- 易于配置和使用:
Flask-Caching
提供了多种缓存后端选项,并且易于配置和使用。
from flask import Flask
from flask_caching import Cache,SimpleCacheconfig = {"DEBUG": True, # some Flask specific configs"CACHE_TYPE": "SimpleCache", # Flask-Caching related configs ,可以缓存到redis"CACHE_DEFAULT_TIMEOUT": 300
}
app = Flask(__name__)
app.config.from_mapping(config)
cache = Cache(app)@app.route('/')
def index():cache.set('name', '8888')return 'index'@app.route('/get')
def get():res=cache.get('name')return resif __name__ == '__main__':app.run()
# 1 跨域 flask-cors
# 2 jwt flask-jwt
# 3 后台管理admin flask-admin
# 4 前后端分离resful flask-resful
【 二 】信号
【 1 】Flask 信号机制的基本概念
- 信号:在 Flask 中,信号是在应用程序中发生的某个事件,例如请求到达、请求处理完成或应用程序错误等。这些事件可以被视为应用程序生命周期中的关键点,开发人员可以在这些点上执行特定的操作。
- 订阅-发布模式:Flask 的信号机制采用了订阅-发布模式。开发人员可以定义特定事件的处理函数(即订阅者),并在事件发生时执行这些函数(即发布事件)。这种机制使得开发人员能够在不修改核心应用逻辑的情况下,为应用程序添加额外的功能或行为。
【 2 】Flask 信号机制的工作原理
- 定义信号:在 Flask 中,信号是通过 Blinker 库实现的。开发人员可以使用 Blinker 库或 Flask 提供的
flask.signals
模块来定义信号。信号通常被定义为一个命名空间(Namespace)下的属性,以便在应用程序中进行管理和引用。 - 发送信号:当应用程序中的某个事件发生时,开发人员可以发送一个信号来通知所有订阅了该信号的函数。发送信号需要指定信号的名称以及要传递给订阅者的任何参数。
- 接收信号:开发人员可以使用装饰器(如
@signal.connect
)来定义处理特定信号的函数。这些函数将在信号被发送时自动执行,并可以接收从发送者传递过来的参数。
【 3 】信号的使用
- Flask 提供了多个内置的信号,如请求开始、请求结束等,开发者也可以定义自己的信号。
- 开发者可以在应用程序中注册一个或多个处理函数,这些函数会在信号被触发时执行。这些处理函数可以执行任何操作,如记录日志、发送邮件、更新数据库等。
【 4 】什么是信号
# 1 Flask框架中的信号基于blinker,其主要就是让开发者可以在flask请求过程中定制一些用户行为
# 2 信号是典型的 观察者模式-触发某个事执行【模板准备渲染】-绑定信号:可以绑定多个只要模板准备渲染--》就会执行这几个绑定的新--》函数# 3 面向切面编程(AOP)--》一种方案-整个程序正常运行,但是我们可以把一部分代码,插入到某个位置执行-钩子函数:只要写了,程序走到哪,就会执行,没写,就不会执行-序列化类的校验# 4 通过信号可以做什么事?-在框架整个执行过程中,插入一些代码执行比如:记录某个页面的访问量比如:每次渲染 login.html --->都记录日志比如:程序出异常---》记录日志比如:用户表中有个用户创建--》给这个用户发点短信比如:用户下了订单---》发个邮件通知,让它尽快付款比如:轮播图表只要发生变化,就删缓存:django中内置信号
【 5 】flask中内置信号的使用
###1 flask中内置信号
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发###2 绑定内置信号,当程序执行到信号位置,就执行我们的函数### 3 信号和请求扩展的关系-有的信号可以完成之前在请求扩展中完成的事-但他们机制不一样-信号更丰富
from flask import Flask, render_template, signalsapp = Flask(__name__)
app.debug = True###### 内置信号使用---》当模板渲染前[index.html]--》记录日志
# 1 写一个函数
def func1(*args, **kwargs):print('模板渲染了')print(args)print(kwargs.get('template').name)if 'index.html' == kwargs.get('template').name:print('记日志了')# from jinja2.environment import Template# 2 跟内置信号绑定
signals.before_render_template.connect(func1)# 3 等待触发(自动)@app.route('/<string:name>')
def index(name):return render_template('index.html', name=name)@app.route('/login')
def login():return render_template('login.html')if __name__ == '__main__':app.run()
【 6 】flask自定义信号
# 步骤
# 0 定义一个自定义信号
# 1 写一个函数# 2 跟内置信号绑定# 3 等待触发(手动)-->只要blog_tag 插入一条记录,就触发# pip install blinker
from dbutils.pooled_db import PooledDB
import pymysql
POOL = PooledDB(creator=pymysql, # 使用链接数据库的模块maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建maxcached=5, # 链接池中最多闲置的链接,0和None不限制maxshared=3,# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]ping=0,# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = alwayshost='127.0.0.1',port=3306,user='root',password='123123',database='school',charset='utf8'
)
from flask import Flask, request
import pymysql
from Pool import POOLapp = Flask(__name__)
app.debug = True# 1 写一个函数,用于处理"info"的插入
def handle_info_insert(name, *args, **kwargs):print('处理info插入')if name == 'info':print('记录日志,info增加了')print(args)print(kwargs)# 3 等待触发(手动)-->只要info插入一条记录,就触发def insert_data(sql, *args):# "模拟"发送信号,直接调用处理函数handle_info_insert('info', *args)conn = POOL.connection()cursor = conn.cursor(pymysql.cursors.DictCursor)cursor.execute(sql, args) # 直接使用 args 元组作为 execute 的参数conn.commit()@app.route('/info', methods=['GET'])
def create_tag():id = request.args.get('id', type=int)name = request.args.get('name')money = request.args.get('money')sql = "INSERT INTO info (id, name, money) VALUES (%s, %s, %s)"print(sql)insert_data(sql, id,name, money)return '创建info数据成功!!!'if __name__ == '__main__':app.run()# http://127.0.0.1:5000/info?id=5&name=redis&money=588
【 7 】django中信号的使用
## 1 内置信号
Model signalspre_init # django的modal执行其构造方法前,自动触发post_init # django的modal执行其构造方法后,自动触发pre_save # django的modal对象保存前,自动触发post_save # django的modal对象保存后,自动触发pre_delete # django的modal对象删除前,自动触发post_delete # django的modal对象删除后,自动触发m2m_changed # django的modal中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发class_prepared # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发
Management signalspre_migrate # 执行migrate命令前,自动触发post_migrate # 执行migrate命令后,自动触发
Request/response signalsrequest_started # 请求到来前,自动触发request_finished # 请求结束后,自动触发got_request_exception # 请求异常后,自动触发
Test signalssetting_changed # 使用test测试修改配置文件时,自动触发template_rendered # 使用test测试渲染模板时,自动触发
Database Wrappersconnection_created # 创建数据库连接时,自动触发####### 内置信号使用##############1 写一个函数2 跟内置信号绑定3 等待触发(自动的)## 1 写个函数
#放到__init__里
from django.db.models.signals import pre_save
import logging
def callBack(sender, **kwargs):# 过滤banner表 :kwargs就有表名print('对象保存了')# celery异步# 2 绑定
post_save.connect(callBack)# 3 绑定方式二,使用装饰器
from django.db.models.signals import pre_save
from django.dispatch import receiver
@receiver(pre_save)
def my_callback(sender, **kwargs):print("对象创建成功")print(sender)print(kwargs)#### 自定义信号######
# 1 定义信号(一般创建一个py文件)(toppings,size 是接受的参数)
import django.dispatch
pizza_done = django.dispatch.Signal(providing_args=["toppings", "size"])#2 写个函数注册信号
def callback(sender, **kwargs):print("callback")print(sender,kwargs)
pizza_done.connect(callback)# 3 触发信号
from 路径 import pizza_done
pizza_done.send(sender='seven',toppings=123, size=456)
【 8 】用信号的好处
# 代码侵入性低---》解耦
【 9 】 信号和信号量
# 信号:signal -flask,django中得 观察者模式 --》信号机制# 信号量:Semaphore-并发编程中概念在Python中,信号量(Semaphore)主要用来控制多个线程或进程对共享资源的访问。信号量本质上是一种计数器的锁,它维护一个许可(permit)数量,每次 acquire() 函数被调用时,如果还有剩余的许可,则减少一个,并允许执行;如果没有剩余许可,则阻塞当前线程直到其他线程释放信号量