- Flask是一款MVC框架,主要是从模型、视图、模板三个方面对Flask框架有一个全面的认识,
- 通过完成作者-读书功能,先来熟悉Flask框架的完整使用步骤。
- 操作步骤为:
- 1.创建项目
- 2.配置数据库
- 3.定义模型类
- 4.定义视图并配置URL
- 5.定义模板
- 前面说了创建项目的流程,今天主要从配置到用户,主要根据思维导图来说,先看思维导图
-
主要有以下几个方面:
1.配置文件:
设置:config.py
主文件:ihome.py
七牛云(图片上传):qinniuyun_sdk.py
短信发送:ytx_send.py
管理文件:manager.py
状态文件:status_code.py
模型设置:models.py
2.视图:user_views.py
主要根据思维导图,代码加注释的方法来说明:
1.配置文件:
设置:config.py、数据库、七牛云配置
# coding=utf-8
import hashlibimport redis
class Config:DEBUG=FalseSQLALCHEMY_DATABASE_URI='mysql://root:mysql@127.0.0.1:3306/ihome_bj14'SQLALCHEMY_TRACK_MODIFICATIONS = True
#PIL pillow#redis配置REDIS_HOST = "127.0.0.1"REDIS_PORT = 6379#session配置SECRET_KEY = "iHome"#将session存储到redis中SESSION_TYPE = "redis"SESSION_USE_SIGNER = TrueSESSION_REDIS = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT)PERMANENT_SESSION_LIFETIME = 60*60*24*14#秒# 七牛云的访问服务器QINIU_URL = 'http://ozyr1iz9u.bkt.clouddn.com/'# # 权限信息# TOKEN = hashlib.md5('ihome_bj14').hexdigest()class DevelopConfig(Config):DEBUG = True
class ProductConfig(Config):pass
主文件:ihome.py
#coding=utf-8#1.创建app对象
from manager import create_app
from config import DevelopConfig
app=create_app(DevelopConfig)#2.初始化数据库
from models import db
db.init_app(app)#3.创建管理对象
from flask_script import Manager
manager=Manager(app)#4.添加迁移命令
from flask_migrate import Migrate,MigrateCommand
Migrate(app,db)
manager.add_command('db',MigrateCommand)#5.注册蓝图
from html_views import html_blueprint
app.register_blueprint(html_blueprint)from api_v1.user_views import user_blueprint
app.register_blueprint(user_blueprint,url_prefix='/api/v1/user')from api_v1.house_views import house_blueprint
app.register_blueprint(house_blueprint,url_prefix='/api/v1/house')from api_v1.order_views import order_blueprint
app.register_blueprint(order_blueprint,url_predix='/api/v1/order')# # 钩子,是否包含token
# from flask import request,jsonify,current_app
# from status_code import *
# @app.before_request
# def check_token():
# if request.path.startswith('/api'):
# if 'token' not in request.args or request.args.get('token')!=current_app.config['TOKEN']:
# return jsonify(code=RET.REQERR)if __name__ == '__main__':manager.run()
七牛云(图片上传):qinniuyun_sdk.py
#coding=utf-8from qiniu import Auth,put_data
import logging
from flask import jsonify
from status_code import RETdef put_qiniu(f1):access_key = '6vvGPJ08BsHRjN7RRGVU3sEd38502x1TS0_i0x7s'secret_key = 'wzgWlHpflABi9DGn9TIb8IBRwqsZTwNJ--KZhosy'# 空间名称bucket_name = 'itpython'try:#构建鉴权对象q = Auth(access_key,secret_key)#生成上传Tokentoken = q.upload_token(bucket_name)#上传文件数据,ret是字典,键是hash/key,值是新文件名,info是response对象ret,info = put_data(token,None,f1.read())return ret.get('key')except:logging.ERROR(u'访问七牛云出错')return jsonify(code=RET.SERVERERR)
短信发送:ytx_send.py
# coding=utf-8
from CCPRestSDK import REST
# import ConfigParser# 主帐号
accountSid = '8a216da85fe1c856015fe83104160cc';# 主帐号Token
accountToken = '5a1dc45a953a41cebb82b5654662568';# 应用Id
appId = '8a216da85fe1c856015fe8310471033';# 请求地址,格式如下,不需要写http://
serverIP = 'app.cloopen.com';# 请求端口
serverPort = '8883';# REST版本号
softVersion = '2013-12-26';# 发送模板短信
# @param to 手机号码
# @param datas 内容数据 格式为数组 例如:{'12','34'},如不需替换请填 ''
# @param $tempId 模板Id
'''
to: 短信接收手机号码集合,用英文逗号分开,如 '13810001000,13810011001',最多一次发送200个。
datas:内容数据,需定义成数组方式,如模板中有两个参数,定义方式为array['Marry','Alon']。
templateId: 模板Id,如使用测试模板,模板id为"1",如使用自己创建的模板,则使用自己创建的短信模板id即可。'''def sendTemplateSMS(to, datas, tempId):# 初始化REST SDKrest = REST(serverIP, serverPort, softVersion)rest.setAccount(accountSid, accountToken)rest.setAppId(appId)result = rest.sendTemplateSMS(to, datas, tempId)return result.get('statusCode')
管理文件:manager.py
# coding=utf-8
from flask import Flask
from redis import StrictRedis
from werkzeug.routing import BaseConverter
import os
from flask_session import SessionBASE_DIR=os.path.dirname(os.path.abspath(__file__))class HTMLConverter(BaseConverter):regex = '.*'def create_app(config):app=Flask(__name__)app.config.from_object(config)app.url_map.converters['html']=HTMLConverter#session初始化,将session存储到redis中Session(app)# 创建redis存储对象redis_store = StrictRedis(host=config.REDIS_HOST, port=config.REDIS_PORT)app.redis = redis_store# 日志处理import loggingfrom logging.handlers import RotatingFileHandlerlogging.basicConfig(level=logging.DEBUG)file_log_handler = RotatingFileHandler(os.path.join(BASE_DIR, "logs/ihome.log"), maxBytes=1024 * 1024 * 100,backupCount=10)formatter = logging.Formatter('%(levelname)s %(filename)s:%(lineno)d %(message)s')file_log_handler.setFormatter(formatter)logging.getLogger().addHandler(file_log_handler)return app
状态文件:status_code.py
# coding=utf-8
class RET:OK = "0"DBERR = "4001"NODATA = "4002"DATAEXIST = "4003"DATAERR = "4004"SESSIONERR = "4101"LOGINERR = "4102"PARAMERR = "4103"USERERR = "4104"ROLEERR = "4105"PWDERR = "4106"REQERR = "4201"IPERR = "4202"THIRDERR = "4301"IOERR = "4302"SERVERERR = "4500"UNKOWNERR = "4501"ret_map = {RET.OK: u"成功",RET.DBERR: u"数据库查询错误",RET.NODATA: u"无数据",RET.DATAEXIST: u"数据已存在",RET.DATAERR: u"数据错误",RET.SESSIONERR: u"用户未登录",RET.LOGINERR: u"用户登录失败",RET.PARAMERR: u"参数错误",RET.USERERR: u"用户不存在或未激活",RET.ROLEERR: u"用户身份错误",RET.PWDERR: u"密码错误",RET.REQERR: u"非法请求或请求次数受限",RET.IPERR: u"IP受限",RET.THIRDERR: u"第三方系统错误",RET.IOERR: u"文件读写错误",RET.SERVERERR: u"内部错误",RET.UNKOWNERR: u"未知错误",
}
模型设置:models.py主要根据网页来寻找设置对象
# coding=utf-8
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
from werkzeug.security import generate_password_hash,check_password_hash
from flask import current_appdb=SQLAlchemy()class BaseModel(object):create_time=db.Column(db.DATETIME,default=datetime.now())update_time=db.Column(db.DATETIME,default=datetime.now(),onupdate=datetime.now())def add_update(self):db.session.add(self)db.session.commit()def delete(self):db.session.delete(self)db.session.commit()class User(BaseModel,db.Model):__tablename__='ihome_user'id=db.Column(db.INTEGER,primary_key=True)phone=db.Column(db.String(11),unique=True)pwd_hash=db.Column(db.String(200))name=db.Column(db.String(30),unique=True)avatar=db.Column(db.String(100))id_name=db.Column(db.String(30))id_card=db.Column(db.String(18),unique=True)houses=db.relationship('House',backref='user')orders=db.relationship('Order',backref='user')#读@propertydef password(self):return ''#写@password.setterdef password(self,pwd):self.pwd_hash=generate_password_hash(pwd)#对比def check_pwd(self,pwd):return check_password_hash(self.pwd_hash,pwd)def to_basic_dict(self):return {'id':self.id,'avatar':current_app.config['QINIU_URL']+self.avatar if self.avatar else '','name':self.name,'phone':self.phone}def to_auth_dict(self):return {'id_name':self.id_name,'id_card':self.id_card}ihome_house_facility = db.Table("ihome_house_facility",db.Column("house_id", db.Integer, db.ForeignKey("ihome_house.id"), primary_key=True), # 房屋编号db.Column("facility_id", db.Integer, db.ForeignKey("ihome_facility.id"), primary_key=True) # 设施编号
)class House(BaseModel, db.Model):"""房屋信息"""__tablename__ = "ihome_house"id = db.Column(db.Integer, primary_key=True) # 房屋编号# 房屋主人的用户编号user_id = db.Column(db.Integer, db.ForeignKey("ihome_user.id"), nullable=False)# 归属地的区域编号area_id = db.Column(db.Integer, db.ForeignKey("ihome_area.id"), nullable=False)title = db.Column(db.String(64), nullable=False) # 标题price = db.Column(db.Integer, default=0) # 单价,单位:分address = db.Column(db.String(512), default="") # 地址room_count = db.Column(db.Integer, default=1) # 房间数目acreage = db.Column(db.Integer, default=0) # 房屋面积unit = db.Column(db.String(32), default="") # 房屋单元, 如几室几厅capacity = db.Column(db.Integer, default=1) # 房屋容纳的人数beds = db.Column(db.String(64), default="") # 房屋床铺的配置deposit = db.Column(db.Integer, default=0) # 房屋押金min_days = db.Column(db.Integer, default=1) # 最少入住天数max_days = db.Column(db.Integer, default=0) # 最多入住天数,0表示不限制order_count = db.Column(db.Integer, default=0) # 预订完成的该房屋的订单数index_image_url = db.Column(db.String(256), default="") # 房屋主图片的路径# 房屋的设施facilities = db.relationship("Facility", secondary=ihome_house_facility)images = db.relationship("HouseImage") # 房屋的图片orders=db.relationship('Order',backref='house')def to_dict(self):return {'id':self.id,'title':self.title,'image':current_app.config['QINIU_URL']+self.index_image_url if self.index_image_url else '','area':self.area.name,'price':self.price,'create_time':self.create_time.strftime('%Y-%m-%d %H:%M:%S'),'avatar':current_app.config['QINIU_URL']+self.user.avatar if self.user.avatar else '','room':self.room_count,'order_count':self.order_count,'address':self.address}def to_full_dict(self):return {'id':self.id,'user_avatar':current_app.config['QINIU_URL']+self.user.avatar if self.user.avatar else '','user_name':self.user.name,'price':self.price,'address':self.area.name+self.address,'room_count':self.room_count,'acreage':self.acreage,'unit':self.unit,'capacity':self.capacity,'beds':self.beds,'deposit':self.deposit,'min_days':self.min_days,'max_days':self.max_days,'order_count':self.order_count,'images':[current_app.config['QINIU_URL']+image.url for image in self.images],'facilities':[facility.to_dict() for facility in self.facilities],}class HouseImage(BaseModel, db.Model):"""房屋图片"""__tablename__ = "ihome_house_image"id = db.Column(db.Integer, primary_key=True)# 房屋编号house_id = db.Column(db.Integer, db.ForeignKey("ihome_house.id"), nullable=False)url = db.Column(db.String(256), nullable=False) # 图片的路径class Facility(BaseModel, db.Model):"""设施信息"""__tablename__ = "ihome_facility"id = db.Column(db.Integer, primary_key=True) # 设施编号name = db.Column(db.String(32), nullable=False) # 设施名字css = db.Column(db.String(30), nullable=False) # 设施展示的图标def to_dict(self):return {'id':self.id,'name':self.name,'css':self.css}def to_house_dict(self):return {'id':self.id}class Area(BaseModel, db.Model):"""城区"""__tablename__ = "ihome_area"id = db.Column(db.Integer, primary_key=True) # 区域编号name = db.Column(db.String(32), nullable=False) # 区域名字houses = db.relationship("House", backref="area") # 区域的房屋def to_dict(self):return {'id':self.id,'name':self.name}class Order(BaseModel,db.Model):__tablename__ = "ihome_order"id = db.Column(db.Integer, primary_key=True)user_id = db.Column(db.Integer, db.ForeignKey("ihome_user.id"), nullable=False)house_id = db.Column(db.Integer, db.ForeignKey("ihome_house.id"), nullable=False)begin_date = db.Column(db.DateTime, nullable=False)end_date = db.Column(db.DateTime, nullable=False)days = db.Column(db.Integer, nullable=False)house_price = db.Column(db.Integer, nullable=False)amount = db.Column(db.Integer, nullable=False)status = db.Column(db.Enum("WAIT_ACCEPT", # 待接单,"WAIT_PAYMENT", # 待支付"PAID", # 已支付"WAIT_COMMENT", # 待评价"COMPLETE", # 已完成"CANCELED", # 已取消"REJECTED" # 已拒单),default="WAIT_ACCEPT", index=True)comment = db.Column(db.Text)def to_dict(self):return {'order_id':self.id,'house_title':self.house.title,'image':current_app.config['QINIU_URL']+self.house.index_image_url if self.house.index_image_url else '','create_date':self.create_time.strftime('%Y-%m-%d'),'begin_date':self.begin_date.strftime('%Y-%m-%d'),'end_date':self.end_date.strftime('%Y-%m-%d'),'amount':self.amount,'days':self.days,'status':self.status,'comment':self.comment}
模型设定好之后,执行数据库迁移2.视图:user_views.py
#coding=utf-8
import random
import reimport logging
from flask import Blueprint, jsonify
from flask import current_app
from flask import make_response
from flask import request
from flask import session
from qiniu_sdk import put_qiniufrom captcha.captcha import captcha
from models import User
from status_code import RET, ret_map
from ytx_sdk.ytx_send import sendTemplateSMS
from my_decorators import is_loginuser_blueprint=Blueprint('user',__name__)#验证码
@user_blueprint.route('/yzm')
def yzm():name,text,image=captcha.generate_captcha()session['image_yzm']=textresponse=make_response(image)response.headers['Content-Type']='image/jpeg'return response#短信发送设置
@user_blueprint.route('/send_sms')
def send_sms():#接收请求的数据dict=request.argsmobile=dict.get('mobile')imageCode=dict.get('imageCode')#验证参数是否存在if not all([mobile,imageCode]):return jsonify(code=RET.PARAMERR,msg=ret_map[RET.PARAMERR])#验证手机号格式是否正确if not re.match(r'^1[345789]\d{9}$',mobile):return jsonify(code=RET.PARAMERR,msg=u'手机号格式不正确')#验证手机号是否存在if User.query.filter_by(phone=mobile).count():return jsonify(code=RET.PARAMERR,msg=u'手机号已存在')#验证图片验证码if imageCode!=session['image_yzm']:return jsonify(code=RET.PARAMERR,msg=u'图片验证码错误')#通过云通讯函数进行短信发送sms_code=random.randint(1000,9999)session['sms_yzm']=sms_codeprint(sms_code)result='000000'# result=sendTemplateSMS(mobile,[sms_code,'5'],1)#根据云通讯返回的结果进行相应if result=='000000':return jsonify(code=RET.OK,msg=ret_map[RET.OK])else:return jsonify(code=RET.UNKOWNERR,msg=u'短信发送失败')#用户注册
@user_blueprint.route('/',methods=["POST"])
def user_register():#接收参数dict=request.formmobile=dict.get('mobile')imagecode=dict.get('imagecode')phonecode=dict.get('phonecode')password=dict.get('password')password2=dict.get('password2')#验证参数是否存在if not all([mobile,imagecode,phonecode,password,password2]):return jsonify(code=RET.PARAMERR,msg=ret_map[RET.PARAMERR])#验证图片验证码if imagecode!=session['image_yzm']:return jsonify(code=RET.PARAMERR,msg=u'图片验证码错误')#验证短信验证码if int(phonecode)!=session['sms_yzm']:return jsonify(code=RET.PARAMERR,msg=u'短信验证码错误')#验证手机号格式是否正确if not re.match(r'^1[345789]\d{9}$',mobile):return jsonify(code=RET.PARAMERR,msg=u'手机格式错误')#验证手机号是否存在if User.query.filter_by(phone=mobile).count():return jsonify(code=RET.PARAMERR,msg=u'手机号码存在')#保存用户对象user=User()user.phone=mobileuser.name=mobileuser.password=passwordtry:user.add_update()return jsonify(code=RET.OK,msg=ret_map[RET.OK])except:logging.ERROR(u'用户注册更新数据库失败,手机号:%s,密码:%s'%(mobile,password))return jsonify(code=RET.DBERR,msg=ret_map[RET.DBERR])@is_login
@user_blueprint.route('/',methods=['GET'])
def user_my():#获取当前登录的用户user_id=session['user_id']#查询当前用户的头像/用户名/手机号/并返回user=User.query.get(user_id)return jsonify(user=user.to_basic_dict())@is_login
#个人信息获取用户名
@user_blueprint.route('/auth',methods=['GET'])
def user_auth():#获取当前登录用户的编号user_id=session['user_id']#根据编号查询当前用户user=User.query.get(user_id)#返回用户的真实姓名,身份证号return jsonify(user.to_auth_dict())@is_login
#上传头像
@user_blueprint.route('/',methods=['PUT'])
def user_profile():dict=request.formif 'avatar1' in dict:try:#获取头像文件f1=request.files['avatar']# print(f1)# print(type(f1))# from werkzeug.datastructures import FileStorage#mime-type:国际规范,表示文件的类型,如text/html,text/xml,image/png,image/jpeg..if not re.match('image/.*',f1.mimetype):return jsonify(code=RET.PARAMERR)except:return jsonify(code=RET.PARAMERR)# 上传到七牛云# access_key = 'H999S3riCJGPiJOity1GsyWufw3IyoMB6goojo5e'# secret_key = 'uOZfRdFtljIw7b8jr6iTG-cC6wY_-N19466PXUAb'# # 空间名称# bucket_name = 'itcast20171104'url=put_qiniu(f1)#如果未出错#保存用户的头像信息try:user=User.query.get(session['user_id'])user.avatar=urluser.add_update()except:logging.ERROR(u'数据库访问失败')return jsonify(code=RET.DBERR)# 则返回图片信息return jsonify(code=RET.OK,url=current_app.config['QINIU_URL'] + url)elif 'name' in dict:#修改用户名name=dict.get('name')#判断用户名是否存在if User.query.filter_by(name=name).count():return jsonify(code=RET.DATAEXIST)else:user=User.query.get(session['user_id'])user.name=nameuser.add_update()return jsonify(code=RET.OK)else:return jsonify(code=RET.PARAMERR,msg=ret_map[RET.PARAMERR])@is_login
#实名认证
@user_blueprint.route('/auth',methods=['PUT'])
def user_auth_set():#接受参数dict=request.formid_name=dict.get('id_name')id_card=dict.get('id_card')#验证参数合法性if not all([id_name,id_card]):return jsonify(code=RET.PARAMERR,msg=ret_map[RET.SESSIONERR])#验证身份合法性if not re.match(r'^[1-9]\d{5}(19|20)\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\d{3}[0-9Xx]$', id_card):return jsonify(code=RET.PARAMERR, msg=ret_map[RET.PARAMERR])#判断身份证号是否存在#修改数据对象try:user=User.query.get(session['user_id'])except:logging.ERROR(u'查询用户失败')return jsonify(code=RET.DBERR)try:user.id_card=id_carduser.id_name=id_nameuser.add_update()except:logging.ERROR(u'修改用户姓名/身份证号失败')return jsonify(code=RET.DBERR)#返回数据return jsonify(code=RET.OK)#用户注册
@user_blueprint.route('/session',methods=['POST'])
def user_login():#接收参数dict=request.formmobile=dict.get('mobile')password=dict.get('password')#验证非空if not all([mobile,password]):return jsonify(code=RET.PARAMERR,msg=ret_map[RET.PARAMERR])#验证手机号格式是否正确if not re.match(r'^1[345789]\d{9}$',mobile):return jsonify(code=RET.PARAMERR,msg=u'手机号格式错误')#数据处理try:user=User.query.filter_by(phone=mobile).first()except:logging.ERROR('用户登录--数据库出错')return jsonify(code=RET.DBERR,msg=ret_map[RET.DBERR])#判断手机号是否存在if user:#判断密码是否正确if user.check_pwd(password):session['user_id']=user.idreturn jsonify(code=RET.OK,msg=u'ok')else:return jsonify(code=RET.PARAMERR,msg=u'密码不正确')else:return jsonify(code=RET.PARAMERR,mag=u'手机号不存在')#用户登录
@user_blueprint.route('/session',methods=['GET'])
def user_is_login():if 'user_id' in session:user=User.query.filter_by(id=session['user_id']).first()return jsonify(code=RET.OK,name=user.name)else:return jsonify(code=RET.DATAERR)#用户退出
@user_blueprint.route('/session',methods=['DELETE'])
def user_logout():session.clear()return jsonify(code=RET.OK)