ORM框架SQLAlchemy
目录
- ORM框架SQLAlchemy
- 介绍
- 安装
- 架构
- 连接数据库
- 1. PostgreSQL
- 2. MySQL
- 3. Oracle
- 4. Microsoft SQL Server
- 5. SQLite
- 创建连接池
- 原生Python操作数据库
- SQLAlchemy的ORM操作
- 创建表
- 外键字段的创建
- 一对多
- 多对多
- ORM操作增删改查
- 查询
- 添加
- 删除
- 修改
介绍
sqlalchemy是Python的SQL工具和ORM框架,可以用Python代码直接操作关系型数据库(例如:MySQL、PostgreSQL、Oracle)
官方文档
安装
pip install sqlalchemy
架构
- Engine:框架的引擎
- Connection Pooling :数据库连接池
- Dialect:选择连接数据库的DB API种类
- SQL Exprression Language:SQL表达式语言
连接数据库
from sqlalchemy import create_engine
1. PostgreSQL
# 使用默认驱动
postgres_default_engine = create_engine("postgresql://scott:tiger@localhost/mydatabase")
# 使用 psycopg2 驱动
postgres_psycopg2_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/mydatabase")
# 使用 pg8000 驱动
postgres_pg8000_engine = create_engine("postgresql+pg8000://scott:tiger@localhost/mydatabase")
2. MySQL
# 使用默认驱动
mysql_default_engine = create_engine("mysql://scott:tiger@localhost/foo")
# 使用 mysqlclient(MySQL-Python 的维护分支)
mysql_mysqlclient_engine = create_engine("mysql+mysqldb://scott:tiger@localhost/foo")
# 使用 PyMySQL
mysql_pymysql_engine = create_engine("mysql+pymysql://scott:tiger@localhost/foo")
3. Oracle
# 使用默认驱动(不推荐,建议使用 cx_oracle)
oracle_default_engine = create_engine("oracle://scott:tiger@127.0.0.1:1521/sidname")
# 使用 cx_oracle 驱动
oracle_cx_oracle_engine = create_engine("oracle+cx_oracle://scott:tiger@tnsname")
4. Microsoft SQL Server
# 使用 pyodbc 驱动
mssql_pyodbc_engine = create_engine("mssql+pyodbc://scott:tiger@mydsn")
# 使用 pymssql 驱动
mssql_pymssql_engine = create_engine("mssql+pymssql://scott:tiger@hostname:port/dbname")
5. SQLite
# Unix/Mac - 使用绝对路径
sqlite_unix_engine = create_engine("sqlite:absolute/path/to/foo.db")
# Windows - 使用绝对路径
sqlite_windows_engine = create_engine("sqlite:///C:\\path\\to\\foo.db")
# Windows - 使用原始字符串和绝对路径
sqlite_windows_raw_engine = create_engine(r"sqlite:///C:\path\to\foo.db")
创建连接池
这里使用pymysql进行连接
from sqlalchemy import create_engine# 连接到localhost的flaskdemo库中,用户root密码1234
engine = create_engine("mysql+pymysql://root:1234@localhost/flaskdemo", max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 池中没有线程最多等待的时间,否则报错pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置))
原生Python操作数据库
from sqlalchemy import create_engineengine = create_engine("mysql+pymysql://root:1234@localhost/flaskdemo", max_overflow=0, pool_size=5, pool_timeout=30,pool_recycle=-1)def task(engine):conn = engine.raw_connection()cursor = conn.cursor()cursor.execute(# 查询users表中所有数据"select * from users")result = cursor.fetchall()print(result)cursor.close()conn.close()task(engine)
SQLAlchemy的ORM操作
创建表
import datetimefrom sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import DeclarativeBaseclass Base(DeclarativeBase):passclass User(Base):__tablename__ = 'user' # 表名id = Column(Integer, primary_key=True, autoincrement=True) # 自增主键字段idname = Column(String(32), nullable=False, index=True) # 最大长度32不可为空 普通索引类型的字段nameage = Column(Integer)create_time = Column(DateTime, default=datetime.datetime.now)if __name__ == '__main__':engine = create_engine("mysql+pymysql://root:7997@localhost/flaskdemo",max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 池中没有线程最多等待的时间,否则报错pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置))Base.metadata.create_all(engine) # 创建Base下的所有表# Base.metadata.drop_all(engine) # 删除表
外键字段的创建
一对多
创建用户表和兴趣表,用户表的hobby_id和兴趣表是一对多关系
import datetimefrom sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import DeclarativeBase, relationshipclass Base(DeclarativeBase):passclass User(Base):__tablename__ = 'user'id = Column(Integer, primary_key=True, autoincrement=True)name = Column(String(32), nullable=False, index=True)# 这里的hobby是表名而不是类名hobby_id = Column(Integer, ForeignKey('hobby.id'))# 用于正向查询:user.hobby,绑定级联删除hobby = relationship('Hobby', back_populates='user', cascade='all,delete')class Hobby(Base):__tablename__ = 'hobby'id = Column(Integer, primary_key=True, autoincrement=True)name = Column(String(32), nullable=False, index=True)# 用于反向查询:hobby.userusers = relationship('User', back_populates='hobby', foreign_keys=[User.hobby_id])if __name__ == '__main__':engine = create_engine("mysql+pymysql://root:7997@localhost/flaskdemo",max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 池中没有线程最多等待的时间,否则报错pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置))Base.metadata.create_all(engine)
多对多
import datetimefrom sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import DeclarativeBase, relationship, Sessionclass Base(DeclarativeBase):passuser_course = Table('user_course', Base.metadata,Column('user_id', Integer, ForeignKey('user.id')),Column('course_id', Integer, ForeignKey('course.id')))class User(Base):__tablename__ = 'user'id = Column(Integer, primary_key=True, autoincrement=True)name = Column(String(32), nullable=False, index=True)# 这里的hobby是表名而不是类名hobby_id = Column(Integer, ForeignKey('hobby.id'))# 正向查询,关联一对多hobby,多对多coursehobby = relationship('Hobby', back_populates='user', cascade='all,delete')course = relationship('Course', back_populates='user', secondary=user_course)class Hobby(Base):__tablename__ = 'hobby'id = Column(Integer, primary_key=True, autoincrement=True)name = Column(String(32), nullable=False, index=True)# 用于反向查询:hobby.useruser = relationship('User', back_populates='hobby', foreign_keys=[User.hobby_id])class Course(Base):__tablename__ = 'course'id = Column(Integer, primary_key=True, autoincrement=True)name = Column(String(32), nullable=False, index=True)# 用于反向查询:course.useruser = relationship('User', back_populates='course', secondary=user_course)if __name__ == '__main__':engine = create_engine("mysql+pymysql://root:7997@localhost/flaskdemo",max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 池中没有线程最多等待的时间,否则报错pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置))Base.metadata.create_all(engine) # 创建表
ORM操作增删改查
假设目前这三张表中有这些数据
查询
def find():session = Session(engine)res = session.query(User).filter_by(name='陈五').first()print(res.name)# 获取res多对多关联的所有course字段for i in res.course:print(i.name)res_all = session.query(User).all()for i in res_all:print(i.name)
添加
def add():session = Session(engine)user = User(name='陈五', hobby_id=2)# 多对多字段 单个数据添加user.course.append(Course(name='语文'))user.course.append(Course(name='数学'))# 多对多字段 多个数据添加courses = session.query(Course).where(Course.id > 1).all() # 获取所有id>1的course对象user1 = session.query(User).filter_by(name='彭于晏').first() # 获取彭于晏对象user1.course = coursessession.add(user)session.commit()
删除
def delete():session = Session(engine)# 无外键字段删除session.query(User).filter_by(name='张三').delete() # 需要关联关系上绑定级联删除# 多对多删除user = session.query(User).filter_by(name='彭于晏').first()course = session.query(Course).filter_by(name='英语').first()# 删除彭于晏和英语的外键绑定session.query(user_course).filter_by(user_id=user.id, course_id=course.id).delete() session.commit()session.commit()
修改
def change():session = Session(engine)# 无外键字段修改session.query(User).filter_by(id=2).update({'name': '彭于晏'})session.commit()
session.query(user_course).filter_by(user_id=user.id, course_id=course.id).delete()
session.commit()
session.commit()
#### 修改```python
def change():session = Session(engine)# 无外键字段修改session.query(User).filter_by(id=2).update({'name': '彭于晏'})session.commit()