信号
内置信号的使用
# 第一步:写一个函数
def test(app, **kwargs):print(app)print(type(kwargs))# 请求地址是根路径,才记录日志,其它都不记录print(kwargs['context']['request'].path)if kwargs['context']['request'].path == '/':print('记录日志了')# 第二步:跟内置信号绑定
# signals 中有很多内置信号
signals.before_render_template.connect(test)# 第三步:等待信号被触发(不需要咱们做)--->只要执行到内置信号位置,绑定的函数就会执行
有哪些内置信号
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
自定义信号
# 1 第一步:定义一个自定义 信号
# 2 第二步:写个函数
# 3 第三步:函数跟自己定义信号绑定
# 4 第四步:触发自定义信号---》我们做
from flask import Flask, session, render_template, signals
from flask.signals import _signals# pip3 install blinker
app = Flask(__name__)
app.debug = True
app.secret_key = 'asdfasdfasdf'#### 自定义信号---》session每次放一个值,我们就执行信号
# 1 定义信号
# 自定义信号
session_input = _signals.signal('session_input')# 2 写个函数
def test2(*args, **kwargs):print(args) # appprint(kwargs) # {session kk}print('session放值了')if kwargs.get('kk').get('name') == 'xx':print('记录日志')# 3 绑定信号
session_input.connect(test2)# 4 触发信号
@app.route('/')
def index():session['uu'] = '00'session_input.send(app, session=session, kk={'name': 'uu'})return render_template('index.html')@app.route('/home')
def home():return render_template('home.html')@app.route('/order')
def order():session['xx'] = 'xx'session_input.send(app, session=session, kk={'name': 'xx'})return "order"if __name__ == '__main__':app.run(port=8080)
信号的作用(信号量--》Semaphore)
# 对代码进行解耦 # 1 记录日志:只要是张三用户,访问index页面,就记录日志
# 2 只要用户表中,插入一条记录,就给用户发个短信通知User.object.create--->调用发短信方法--》找到10个地址--》改10个地方-如果有个内置信号---》只要表中增加记录,就会触发这个信号----》通过信号内判断这个表是不是User表,决定要不要发短信-flask中没有这个内置信号---》自定义
信号量
# acquire():消耗信号量
# release():释放信号量
import threading
import time
def run(n):semaphore.acquire() # 计数器获取锁time.sleep(5) # 程序休眠5秒print (n)semaphore.release() # 计数器释放锁if __name__ == '__main__':# 添加一个计数器,最大并发线程数量5(最多同时运行5个线程)semaphore = threading.Semaphore(5)for i in range(50):t = threading.Thread(target=run, args=(i, )) # 创建线程t.start()
django的信号
# 内置信号:#Model signalspre_init # django的modal执行其构造方法前,自动触发post_init # django的modal执行其构造方法后,自动触发pre_save # django的modal对象保存前,自动触发post_save # django的modal对象保存后,自动触发pre_delete # django的modal对象删除前,自动触发post_delete # django的modal对象删除后,自动触发m2m_changed # django的modal中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发class_prepared # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发
Management signalspre_migrate # 执行migrate命令前,自动触发post_migrate # 执行migrate命令后,自动触发
Request/response signalsrequest_started # 请求到来前,自动触发request_finished # 请求结束后,自动触发got_request_exception # 请求异常后,自动触发
Test signalssetting_changed # 使用test测试修改配置文件时,自动触发template_rendered # 使用test测试渲染模板时,自动触发
Database Wrappersconnection_created # 创建数据库连接时,自动触发# 内置信号使用(当user表创建用户,就给用户发个邮件)1 写个函数 #放到__init__里from django.db.models.signals import pre_saveimport loggingdef callBack(sender, **kwargs):logging.debug('%s创建了一个%s对象'%(sender._meta.model_name,kwargs.get('instance').title))2 绑定内置信号 pre_save.connect(callBack)3 等待触发# 内置信号from django.db.models.signals import pre_savefrom django.dispatch import receiver@receiver(pre_save)def my_callback(sender, **kwargs):print("对象创建成功")print(sender)print(kwargs)# 自定义信号:#1 定义信号(一般创建一个py文件)(toppings,size 是接受的参数)import django.dispatchpizza_done = django.dispatch.Signal(providing_args=["toppings", "size"])# 2 注册信号def callback(sender, **kwargs):print("callback")print(sender,kwargs)pizza_done.connect(callback)
# 3 触发信号from 路径 import pizza_donepizza_done.send(sender='seven',toppings=123, size=456)# 自定义信号干过什么?-做双写一致性的缓存更新
flask-script
# django中,有命令 python manage.py runserverpython manage.py makemigrations...自定制命令(django如何自定制命令)...-python manage.py init_db excel文件路径 指定表名# flask启动项目,像djagno一样,通过命令启动Flask==2.2.2
Flask_Script==2.0.3#借助于:flask-script 实现-安装:pip3.8 install flask-script-修改代码:from flask_script import Managermanager=Manager(app)manager.run()-用命令启动python38 manage.py runserver# 自定制命令#1 简单自定制命令@manager.commanddef custom(arg):# 命令的代码,比如:初始化数据库, 有个excel表格,使用命令导入到mysql中print(arg)#2 复杂一些的自定制命令@manager.option('-n', '--name', dest='name')@manager.option('-u', '--url', dest='url')def cmd(name, url):# python run.py cmd -n lqz -u xxx# python run.py cmd --name lqz --url uuuprint(name, url)# django 中如何自定制命令
sqlalchemy介绍和快速使用
# orm 框架----》django orm--》只能用在django中,不能独立使用
# python界的orm框架-peewee-sqlalchemy:企业级-djagno rom-Tortoise ORM-GINO# go 界orm框架-gorm 国人写的-Xorm# java界orm框架-ssh 框架springmvc structs Hibernate(java的orm框架)-ssh spring springmvc Hibernate-ssm Spring SpringMVC MyBatis (orm框架)-springboot :sb框架 ---》java工程师就是spring工程师-spring cloud# 分层:
Engine,框架的引擎
Connection Pooling ,数据库连接池
Dialect,选择连接数据库的DB API种类(sqlite,mysql...)
Schema/Types,架构和类型
SQL Exprression Language,SQL表达式语言#操作不同数据库
MySQL-Pythonmysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>pymysqlmysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]MySQL-Connectormysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>cx_Oracleoracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html# 了解orm不能创建数据库---》只能创建表,删除表---》sqlalchemy不能增加删除字段--》借助于第三方插件实现
sqlalchemy的原生操作
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine# 第一步:创建engine对象
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/cars?charset=utf8",max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 池中没有线程最多等待的时间,否则报错pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)# 第二步:通过engine获得链接
conn=engine.raw_connection()
cursor = conn.cursor()
cursor.execute("select * from news"
)
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()