from sqlalchemy import create_engine# 替换为你的MySQL数据库信息 username = 'root' password = '123456' host = 'localhost' # 例如:'localhost' 或 '127.0.0.1' port = '3306' # 通常是 3306 database = 'ee'# 创建连接引擎 engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}') # engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}', echo=True)
以上文本放置在config.py文件,设置MySQL信息
from wtforms import Form, BooleanField, StringField, PasswordField, validators from sqlalchemy import Column, String, Integer from sqlalchemy.orm import declarative_base from config import engineBase = declarative_base()class User(Base):__tablename__ = "users"id = Column('id', Integer, primary_key=True, )name = Column('name', String(50), unique=True, comment="姓名")age = Column('age', Integer, unique=True, comment='年龄')# 这里是自定义的调用该数据后的自动回复形式设置# def __repr__(self):# repr_id = self.id# repr_name = self.name# repr_age = self.age# return f"id:{repr_id}, name: {repr_name}, age: {repr_age}"if __name__ == '__main__':Base.metadata.create_all(engine)
上文放置在models文件夹下的userdate.py,用于创建数据模型
from flask import Blueprint, request from models.userdata import User import json from config import engine from sqlalchemy.orm import sessionmakerSession = sessionmaker(bind=engine) # 构建session对象 session = Session() user = Blueprint('user', __name__)
def user_data_zip(query_result):list1 = []for result in query_result:list_key = ['id', 'name', 'age']list_value = [result.id, result.name, result.age]zip1 = dict(zip(list_key, list_value))list1.append(zip1)return list1@user.route('/GetList/', methods=['get']) def GetList_list():query_result = session.query(User).all()list1 = user_data_zip(query_result)return json.dumps(list1)@user.route('/GetList/<id_num>', methods=['get']) def GetList_list_id(id_num):if id_num == 'all':query_result = session.query(User).all()else:query_result = session.query(User).filter(User.id == id_num).all()list1 = user_data_zip(query_result)return json.dumps(list1)@user.route('/PostList', methods=['post']) def PostList_list():# request.get_json用于json数据请求体中获取# request.form用于html中表单数据获取list_form = request.get_json()post_data = User(id=list_form['id'], name=list_form['name'], age=list_form['age'])session.add(post_data)session.commit()return list_form@user.route('/PutUser', methods=['post']) def PutUser_list():list_form = request.get_json()post_data = session.query(User).filter(User.id == list_form['id'])# 放入的值必须是不同的post_data.update({"name": list_form['name'], "age": list_form['age']})session.commit()return list_form@user.route('/DeleteUser', methods=['post']) def DeleteUser_list():list_form = request.get_json()post_data = session.query(User).filter(User.id == list_form['id'])post_data.delete()session.commit()return list_form
以上文本放置在user.py,用于获取JSON数据并对数据库进行增删改查
from flask import Flask from views.error import error # from gevent import pywsgi from views.user import user from models.userdata import User test = Flask(__name__)
# 注册蓝图 test.register_blueprint(user, url_prefix='/user')if __name__ == '__main__':test.run(debug=True)# server = pywsgi.WSGIServer(('', 80), test)# print('WSGI服务开启:127.0.0.1:80')# server.serve_forever()
以上文本放置在test.py,以此为启动文件,代替app.py默认,只需要把test改成app即可换回默认形式