一、单例模式
单例模式分为四种:基于文件的单例模式,基于类方法的单例模式,基于__new__的单例模式,基于metaclass的单例模式
1. 基于类方法的单例模式
- 不支持多线程模式
import threadingclass Singleton(object):def __init__(self):passimport timetime.sleep(1)@classmethoddef instance(cls,*args,**kwargs):if not hasattr(Singleton,'_instance'):Singleton._instance = Singleton(*args,**kwargs)return Singleton._instancedef task(arg):obj = Singleton.instance()print(obj) for i in range(10):t = threading.Thread(target=task,args=[i,])t.start()
- 支持多线程模式,加锁(线程锁),保证线程安全
import time import threading class Singleton(object):_instance_lock = threading.Lock() # 线程锁def __init__(self):time.sleep(1)@classmethoddef instance(cls, *args, **kwargs):if not hasattr(Singleton, "_instance"): # 当有了_instance时,则直接返回,不用加锁。with Singleton._instance_lock: # 当一个线程进来就会加锁,只有该线程执行完毕,其他线程等待if not hasattr(Singleton, "_instance"):Singleton._instance = Singleton(*args, **kwargs)return Singleton._instancedef task(arg):obj = Singleton.instance()print(obj) for i in range(10):t = threading.Thread(target=task,args=[i,])t.start() # 之后的使用不用加锁,都直接使用_instance time.sleep(20) obj = Singleton.instance() print('#',obj)
2. 基于__new__的单例模式
类单例模式需要,调用类方法,实例化时会先调用__new__
import time import threadingclass Singleton(object):_instance_lock = threading.Lock()def __init__(self):# print('init',self)passdef __new__(cls, *args, **kwargs):if not hasattr(Singleton,'_instance'):with Singleton._instance_lock:if not hasattr(Singleton,'_instance'):Singleton._instance = object.__new__(cls,*args,**kwargs)return Singleton._instanceobj1 = Singleton() obj2 = Singleton() print(obj1) print(obj2)
3. 基于metaclass方法实现的单例模式
对象是类创建,创建对象的时候类的__init__方法自动执行,对象()执行类的 __call__ 方法
类是type创建,创建类时候type的__init__方法自动执行,类() 执行type的 __call__方法(类的__new__方法,类的__init__方法)
- metaclass原理
# 继承type,模仿type的功能 class SingletonType(type):def __init__(self,*args,**kwargs):super(SingletonType,self).__init__(*args,**kwargs)def __call__(cls, *args, **kwargs):obj = cls.__new__(cls,*args,**kwargs) # 1.1创建对象cls.__init__(obj,*args,**kwargs) # 1.2初始化对象return obj# 第0步:将代码加载到内存中,编译类,执行type的__init__方法(类是type的对象) class Foo(metaclass=SingletonType): # 指定metaclassdef __init__(self,name):self.name = namedef __new__(cls, *args, **kwargs):return object.__new__(cls,*args,**kwargs)# 第一步:执行type的__call__方法(类=type的对象) # 1.1 调用Foo类(type的对象)的__new__方法,用于创建对象。 # 1.2 调用Foo类(type的对象)的__init__方法,用于对象初始化。 obj = Foo()# 第二步:执行Foo的__call__方法 obj()
- metaclass实现单例模式
import threading class SingletonType(type):_instance_lock = threading.Lock()def __call__(cls, *args, **kwargs):if not hasattr(cls,'_instance'):with SingletonType._instance_lock:if not hasattr(cls,'_instance'):cls._instance = super(SingletonType,cls).__call__(*args,**kwargs)return cls._instance# 如需单例模式加上metaclass=SingletonType即可 class Foo(metaclass=SingletonType):def __init__(self,name):self.name = nameobj1 = Foo('name') obj2 = Foo('name') print(obj1) print(obj2)
4 .单例模式数据库连接池应用
# pool.py import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnectionclass SingletonDBPool(object):_instance_lock = threading.Lock()def __init__(self):# print('init',self) self.pool = PooledDB(creator=pymysql, # 使用链接数据库的模块maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制maxshared=3,# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]ping=0,# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = alwayshost='127.0.0.1',port=3306,user='root',password='123456',database='flask_test',charset='utf8')def __new__(cls, *args, **kwargs):if not hasattr(SingletonDBPool,'_instance'):with SingletonDBPool._instance_lock:if not hasattr(SingletonDBPool,'_instance'):SingletonDBPool._instance = object.__new__(cls,*args,**kwargs)return SingletonDBPool._instancedef connect(self):return self.pool.connection()# app.py from pool import SingletonDBPooldef run():pool = SingletonDBPoolconn = pool.connect()# 数据库连接con.close() # 并没有真正断开连接if __name__ == '__main__':run()
二、session
1. 源码解析
# 1. 执行Flask类的__call__ class RequestContext(object):def __init__(self,environ):self.environ = environdef push(self):# 3# 请求相关数据,加到local中: stack.push... _request_ctx_stack.push(self)# 获取cookie中的随机字符串,检验是否有,没有就生成# 根据随机字符串,获取服务端保存的session的# {# 'xxxxxxx': {...}# 'xxxxxxx': {...}# }# 新用户: {}# 老用户:{user:'xxx'}self.session = self.app.open_session(self.request)if self.session is None:self.session = self.app.make_null_session()class Flask:def process_response(self, response):# 8# 执行 after_request装饰器for handler in funcs:response = handler(response)# 将内存中的session持久化到:数据库、....if not self.session_interface.is_null_session(ctx.session):self.save_session(ctx.session, response)return responsedef finalize_request(self, rv, from_error_handler=False):# 7response = self.make_response(rv)try:response = self.process_response(response)request_finished.send(self, response=response)except Exception:if not from_error_handler:raiseself.logger.exception('Request finalizing failed with an ''error while handling an error')return responsedef full_dispatch_request(self):# 5# 触发只执行一次的装饰器函数,@before_first_request self.try_trigger_before_first_request_functions()try:# 触发Flask的信号,没用: pip3 install blinker request_started.send(self)# 执行特殊装饰器:before_request# 如果没有返回值,rv=None;有返回值 “嘻嘻嘻”rv = self.preprocess_request()if rv is None:# 触发执行视图函数rv = self.dispatch_request()except Exception as e:rv = self.handle_user_exception(e)# 6 对返回值进行封装return self.finalize_request(rv)def wsgi_app(self, environ, start_response):# 处理request,将请求添加到local中ctx = self.request_context(environ)# 2.处理request和session ctx.push()error = Nonetry:try:# 4 执行视图函数response = self.full_dispatch_request()except Exception as e:error = eresponse = self.handle_exception(e)except:error = sys.exc_info()[1]raisereturn response(environ, start_response)finally:if self.should_ignore_error(error):error = None# 9 ctx.auto_pop(error)def __call__(self, environ, start_response):"""Shortcut for :attr:`wsgi_app`."""# 1.xxxreturn self.wsgi_app(environ, start_response)
2. 自定义session
from flask import Flask,request,session app = Flask(__name__) app.secret_key = 'sdfsdfsd' from flask.sessions import SessionInterface,SessionMixin import uuid import json from flask.sessions import SessionInterface from flask.sessions import SessionMixin from itsdangerous import Signer, BadSignature, want_bytesclass MySession(dict, SessionMixin):def __init__(self, initial=None, sid=None):self.sid = sidself.initial = initialsuper(MySession, self).__init__(initial or ())def __setitem__(self, key, value):super(MySession, self).__setitem__(key, value)def __getitem__(self, item):return super(MySession, self).__getitem__(item)def __delitem__(self, key):super(MySession, self).__delitem__(key)class MySessionInterface(SessionInterface):session_class = MySessioncontainer = {# 'asdfasdfasdfas':{'k1':'v1','k2':'v2'}# 'asdfasdfasdfas':"{'k1':'v1','k2':'v2'}" }def __init__(self):pass# import redis# self.redis = redis.Redis()def _generate_sid(self):return str(uuid.uuid4())def _get_signer(self, app):if not app.secret_key:return Nonereturn Signer(app.secret_key, salt='flask-session',key_derivation='hmac')def open_session(self, app, request):"""程序刚启动时执行,需要返回一个session对象"""sid = request.cookies.get(app.session_cookie_name)if not sid:# 生成随机字符串,并将随机字符串添加到 session对象中sid = self._generate_sid()return self.session_class(sid=sid)signer = self._get_signer(app)try:sid_as_bytes = signer.unsign(sid)sid = sid_as_bytes.decode()except BadSignature:sid = self._generate_sid()return self.session_class(sid=sid)# session保存在redis中# val = self.redis.get(sid)# session保存在内存中val = self.container.get(sid)if val is not None:try:data = json.loads(val)return self.session_class(data, sid=sid)except:return self.session_class(sid=sid)return self.session_class(sid=sid)def save_session(self, app, session, response):"""程序结束前执行,可以保存session中所有的值如:保存到resit写入到用户cookie"""domain = self.get_cookie_domain(app)path = self.get_cookie_path(app)httponly = self.get_cookie_httponly(app)secure = self.get_cookie_secure(app)expires = self.get_expiration_time(app, session)val = json.dumps(dict(session))# session保存在redis中# self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime)# session保存在内存中 self.container.setdefault(session.sid, val)session_id = self._get_signer(app).sign(want_bytes(session.sid))response.set_cookie(app.session_cookie_name, session_id,expires=expires, httponly=httponly,domain=domain, path=path, secure=secure)app.session_interface = MySessionInterface() # app.session_interface = Foo() # app.session_interface # app.make_null_session() @app.route('/index') def index():print('网站的所有session',MySessionInterface.container)print(session)session['k1'] = 'v1'session['k2'] = 'v2'del session['k1']# 在内存中操作字典....# session['k1'] = 'v1'# session['k2'] = 'v2'# del session['k1']return "xx"if __name__ == '__main__':app.__call__app.run()