Python Sqlalchemy基础使用

Python Sqlalchemy基础使用

  • Python Sqlalchemy基础使用
    • 基本使用
      • 创建Session
      • 创建ORM对象
      • 查询
      • 插入
    • 进阶操作
      • 插入存在时更新
      • 执行SQL

Python Sqlalchemy基础使用

这里记录一下,在编写python代码过程中使用Sqlalchemy的封装和基本使用方法。

(持续完善ing)

基本使用

创建Session

这里采用读取yaml配置文件+调用函数的方式进行创建:

from util.database.sql_execute import create_session
from util.file.config_read import CONFIGdef get_db_config():"""获取数据库配置Returns: 数据库配置(dict)"""return CONFIG['database']['business-5']# 创建数据库会话
DBSession = create_session(get_db_config())session = DBSession()

yaml配置:

# 数据库配置
database:business-245:dbms: 'mysql'driver: 'pymysql'ip: '10.135.149.245'port: '3306'username: 'xxxx'password: 'xxxx'database: 'xxxx'business-119:dbms: 'mssql'driver: 'pymssql'ip: '10.135.30.119'port: '1433'username: 'xxxx'password: 'xxxx'database: 'xxxx'business-5:dbms: 'mssql'driver: 'pymssql'ip: '10.135.149.5'port: '1433'username: 'xxx'password: 'xxxx'database: 'xxx'

封装:

from urllib.parse import quote_plus as urlquotefrom sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.dialects.mysql import insert# 创建对象的基类:
Base = declarative_base()def create_session(params: dict):"""创建数据库会话Examples:params = {'dbms': 'mysql','driver': 'pymysql','ip': '192.168.30.193','port': '3306','username': 'root','password': 'xxxx','database': 'disaster_census'}Args:params: 请求参数说明 (dbms: 数据库类型, driver: 数据库驱动, username: 数据库用户名, password: 数据库密码, ip: IP地址, port: 端口号, database: 数据库)Returns: 数据库会话"""# 构建url# engine = create_engine("数据库类型+数据库驱动://数据库用户名:数据库密码@IP地址:端口号/数据库?编码...", 其它参数)url = f"{params['dbms']}+{params['driver']}://{params['username']}:{urlquote(params['password'])}@{params['ip']}:{params['port']}/{params['database']}"# 创建DBSession类型:return create_session_url(url)def create_session_url(url: str):"""创建数据库会话Args:url: "数据库类型+数据库驱动://数据库用户名:数据库密码@IP地址:端口号/数据库?编码...", 例:mysql+pymysql://root:password@localhost:3306/testReturns:数据库会话"""# 初始化数据库连接:# engine = create_engine("数据库类型+数据库驱动://# 数据库用户名:数据库密码@# IP地址:端口号/数据库?编码...", 其它参数)engine = create_engine(url)# 创建DBSession类型:db_session = sessionmaker(bind=engine)return db_session
import yaml
import osfrom util.file.path import get_root_pathdef get_config_path():"""获取项目配置文件路径, 为项目根路径下的config目录Returns: 项目配置文件路径"""return get_root_path() + '/config'def get_profile():"""读取profile配置,例: prod、dev等。(从项目根目录下./config/config.yaml文件中, 获取'profile: dev' 中的配置信息)Returns: profile信息,不存在为''"""profile = ''config_path = get_config_path()filename = 'config.yaml'profile_path = f'{config_path}/{filename}'if os.path.exists(profile_path):print(f"读取配置文件 '{filename}' 成功!")with open(profile_path, 'r') as f:yaml_config = yaml.load(f.read(), Loader=yaml.FullLoader)profile = yaml_config['profile']else:print(f"读取profile配置文件失败, '{filename}' 不存在!")return profiledef get_config_yaml():"""获取yaml配置文件 (约定目录为项目根目录,文件名为 ['config-{profile}.yml', 'config-{profile}.yaml', ])Returns: dict对象"""# 1.获取项目路径(以当前文件为参考)和配置文件名称config_path = get_config_path()profile = get_profile()filenames = [f'config-{profile}.yml', f'config-{profile}.yaml', ]config = {}print(f'开始读取配置文件,path = {config_path}')# 2.依次读取yaml配置文件(相同key时,后面覆盖前面)for i in range(len(filenames)):# 2.1.获取配置文件相对路径filename = filenames[i]path = config_path + '/' + filename# 读取配置文件,并打印日志if os.path.exists(path):print(f"读取配置文件 '{filename}' 成功!")with open(path, 'r') as f:yaml_config = yaml.load(f.read(), Loader=yaml.FullLoader)config.update(yaml_config)  # 更新会导致第一层直接覆盖,后续需要进行深拷贝else:print(f"读取配置文件失败, '{filename}' 不存在!")return configCONFIG = get_config_yaml()

创建ORM对象

import datetimefrom sqlalchemy import Column, String, Integer, DateTime, DECIMAL, BigInteger, and_
from util.database.sql_execute import Base# 定义 TyphoonInfo(台风轨迹)对象:
class TyphoonInfo(Base):# 表的名字:__tablename__ = 'typhoon_info'# 表的结构:id = Column(BigInteger)sort = Column(Integer)code_global = Column(Integer)track_count = Column(Integer)cyclone_no = Column(Integer)code_cn = Column(Integer)cyclone_end = Column(Integer)typhoon_name_en = Column(String(255))typhoon_name_cn = Column(String(255))observe_time = Column(DateTime)start_time = Column(DateTime)end_time = Column(DateTime)zj_across = Column(Integer, default=0)create_time = Column(DateTime, default=datetime.datetime.now())create_by = Column(String(32), default='-1')modify_time = Column(DateTime)modify_by = Column(String(32))__mapper_args__ = {"primary_key": [id]}def __repr__(self):attributes = ', '.join(f"{k}={v!r}" for k, v in vars(self).items())return f"{self.__class__.__name__}({attributes})"

查询

创建好session后,在需要的地方导入并执行:

from src.config.db_business_5 import session as session5def select_typhoon_info_year(year):start_time = datetime.datetime(year, 1, 1, 0, 0)end_time = datetime.datetime(year, 12, 31, 23, 59)typhoon_list = session5.query(TyphoonInfo) \.filter(or_(and_(TyphoonInfo.start_time >= start_time, TyphoonInfo.start_time <= end_time, TyphoonInfo.zj_across == 1),and_(TyphoonInfo.end_time >= start_time, TyphoonInfo.end_time <= end_time, TyphoonInfo.zj_across == 1)))return typhoon_list

插入

普通插入(也可以使用后面进阶操作的存在时更新):

def insert_db(track_list: list):session.add_all(tracks)session.commit()return

进阶操作

插入存在时更新

封装:

def upsert(session, table_cls, records: list, chunk_size=5000, commit_on_chunk=True, except_cols_on_update=[]):"""Examples: upsert(session, SurfStationTyphoonHistory, insert_list, except_cols_on_update=["typhoon_id", "station_id"])Args:session:table_cls: Bean class对象records: 入库列表chunk_size: 批量大小commit_on_chunk:except_cols_on_update: uk字段数组,如:[]Returns:"""records_tmp = []# 收集字段名columns = []for key in vars(records[0]):if not key.startswith("_"):columns.append(key)# 转字典类型for r in records:obj = {}for c in columns:obj[c] = getattr(r, c)records_tmp.append(obj)update_keys = [key for key in records_tmp[0].keys() if(key not in except_cols_on_update) and not key.startswith("_")]for i in range(0, len(records_tmp), chunk_size):chunk = records_tmp[i:i + chunk_size]insert_stmt = insert(table_cls).values(chunk)update_columns = {x.name: x for x in insert_stmt.inserted if x.name in update_keys}upsert_stmt = insert_stmt.on_duplicate_key_update(**update_columns)session.execute(upsert_stmt)if commit_on_chunk:session.commit()

插入:

from src.config.db_business_245 import session as session245
from util.database.sql_execute import upsertdef insert_stats_surf_station_typhoon_history(insert_list: list):upsert(session245, SurfStationTyphoonHistory, insert_list, except_cols_on_update=["typhoon_id", "station_id"])return

执行SQL

if __name__ == '__main__':year = 2024start_time = '2023-01-01 00:00:00'end_time = '2023-01-01 23:59:00'result_list = session5.execute(text(u"select 区站号,日期时间,一小时雨量,最大风速,极大风速,最大风速对应时间,极大风速对应时间 from zdz.CAWSData_2023 where 日期时间 between :start_time and :end_time"), params={'start_time': start_time, 'end_time': end_time})for row in result_list:station_ic_c = row[0]observe_time = row[1]pre_1h = row[2]win_max = row[3]win_max_inst = row[4]win_max_ot = row[5]win_max_inst_ow = row[6]# print(row)print()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/34310.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营Day48|188.买卖股票的最佳时机IV、309.最佳买卖股票时间含冷冻期、714.买卖股票的最佳时机含手续费

买卖股票的最佳时机IV . - 力扣&#xff08;LeetCode&#xff09; 至多可以购买k次&#xff0c;相比买卖股票的最佳时机III至多购买2次&#xff0c;区别在于次数不确定。 每买卖一次&#xff0c;dp数组的第二维度加2&#xff0c;dp数组的第二维度为2k1&#xff08;0-2k&…

TDengine 3.2.3.0 集成英特尔 AVX512!快来看看为你增添了哪些助力

在当今的 IoT 和智能制造领域&#xff0c;海量时序数据持续产生&#xff0c;对于这些数据的实时存储、高效查询和分析已经成为时序数据库&#xff08;Time Series Database&#xff0c;TSDB&#xff09;的核心竞争力。作为一款高性能的时序数据库&#xff0c;TDengine 不仅采用…

Pandas数据分析:掌握agg()方法的高级应用“

今天一个知识点&#xff1a; agg() 方法的 func 参数可以传入单个函数或函数列表。 agg() 方法在Pandas库中用于对DataFrame或Series的列进行聚合操作。func 参数可以传入单个&#xff08;值得注意的是&#xff0c;这里数据是一个单个而不是多个的内容&#xff0c;自己在这个…

【全开源】沃德会务会议管理系统(FastAdmin+ThinkPHP+Uniapp)

沃德会务会议管理系统一款基于FastAdminThinkPHPUniapp开发的会议管理系统&#xff0c;对会议流程、开支、数量、标准、供应商提供一种标准化的管理方法。以达到量化成本节约&#xff0c;风险缓解和服务质量提升的目的。适用于大型论坛、峰会、学术会议、政府大会、合作伙伴大会…

docker 安装达梦8

背景 X86-64架构使用Docker安装dm8_20240422_x86_rh6_64_rq_std_8.1.3.100_pack2.tar 1.下载Docker安装包 达梦官网下载dm8_20240422_x86_rh6_64_rq_std_8.1.3.100_pack2.tar安装包 快速下载通道&#xff1a; 达梦镜像包 2.将安装包上传至服务器&#xff0c;并加载镜像 拷…

结合人工智能的在线教育系统:开发与实践

人工智能&#xff08;AI&#xff09;正在革新各行各业&#xff0c;教育领域也不例外。结合AI技术的在线教育系统能够提供个性化的学习体验、智能化的教学辅助和高效的数据分析&#xff0c;从而大大提升教育质量和学习效果。本文将探讨结合AI技术的在线教育系统的开发与实践&…

如何调整C#中数组的大小

前言 数组存储多个相同类型的一种非常常用的数据结构。它长度是固定&#xff0c;也就是数组一旦创建大小就固定了。C# 数组不支持动态长度。那在C#中是否有方法可以调整数组大小呢&#xff1f;本文将通过示例介绍一种调整一维数组大小的方法。 方法 数组实例是从 System.Arr…

Type.GetTypeFromProgID 调用com组件

Type.GetTypeFromProgID 方法用于通过程序标识符&#xff08;ProgID&#xff09;获取 COM 类型。ProgID 是一个字符串标识符&#xff0c;用于标识 COM 组件。它通常在注册表中配置&#xff0c;并指向一个具体的 COM 类的 CLSID&#xff08;类标识符&#xff09;。 什么是 Prog…

【数据结构与算法】顺序查找、折半查找、分块查找

文章目录 顺序查找实现对有序表的顺序查找 二分查找&#xff08;折半查找&#xff09;实现二分查找判定树 分块查找&#xff08;索引顺序查找&#xff09;最理想的分块情况 顺序查找 顺序查找&#xff0c;又叫线性查找。适用于线性表。它的核心思路是从线性表的一端开始&#…

Unity的ScrollView滚动视图复用

发现问题 在游戏开发中有一个常见的需求&#xff0c;就是需要在屏幕显示多个&#xff08;多达上百&#xff09;显示item&#xff0c;然后用户用手指滚动视图可以选择需要查看的item。 现在的情况是在100个data的时候&#xff0c;Unity引擎是直接创建出对应的100个显示item。 …

13个行业数据分析指标体系如何建设100问

提供针对13个行业的数据分析指标体系的全面指南&#xff0c;涵盖各行业的关键指标和分析维度&#xff0c;帮助读者深入了解和构建有效的指标体系。以下是文章的主要内容&#xff1a; 电商行业数据指标体系&#xff1a;包括客户价值、商品、网站流量、整体运营、市场营销活动、市…

若依-前后端分离项目学习

第一天&#xff08;6.24&#xff09; 具体参考视频 b站 楠哥教你学Java 【【开源项目学习】若依前后端分离版&#xff0c;通俗易懂&#xff0c;快速上手】 https://www.bilibili.com/video/BV1HT4y1d7oA/?share_sourcecopy_web&vd_sourcecd9334b72b49da3614a4257…

C++如何实现继承和多态

继承 继承是指一个类&#xff08;子类&#xff09;从另一个类&#xff08;父类&#xff09;继承属性和方法。C支持单继承和多继承。 #include <iostream>// 基类&#xff08;父类&#xff09; class Animal { public:// 基类中的方法void eat() {std::cout << &q…

Elasticsearch Scroll 报错entity content is too long

2024-06-24 15:22:01:568 ERROR [task-31] (ScrollFetcherProduceAction.java:129) 访问ES出错org.apache.http.ContentTooLongException: entity content is too long [112750110] for the configured buffer limit [104857600]at org.elasticsearch.client.HeapBufferedAsync…

一下出来4个面试官,这是要舌战群儒啊

老张昨天下午请假了&#xff0c;我猜他就是面试去了。果不其然&#xff0c;今天来了&#xff0c;我问老张&#xff1a;昨天面试如何&#xff1f;老张很惊讶的问&#xff1a;你怎么知道我面试去了&#xff1f;我迫不及待的说&#xff1a;赶紧说说昨天面试的场景&#xff0c;给我…

智慧安防/边缘计算EasyCVR视频汇聚网关:EasySearch无法探测到服务器如何处理?

安防监控EasyCVR智能边缘网关/视频汇聚网关/视频网关属于软硬一体的边缘计算硬件&#xff0c;可提供多协议&#xff08;RTSP/RTMP/国标GB28181/GAT1400/海康Ehome/大华/海康/宇视等SDK&#xff09;的设备接入、音视频采集、视频转码、处理、分发等服务&#xff0c;系统具备实时…

Redis-事务-watch-unwatch

文章目录 1、监视key2、提交事务 1、监视key 打开两个窗口&#xff0c;第一个窗口先监视key&#xff0c;然后开始事务&#xff0c;然后再打开第二个窗口&#xff0c;修改balance为0 2、提交事务 此时事务被打断

playwright vscode 插件源码解析

Playwright vscode插件主要功能 Playwright是微软开发的一款主要用于UI自动化测试的工具&#xff0c;在vscode中上安装playwright vscode插件&#xff0c;可以运行&#xff0c;录制UI自动化测试。 playwright vscode插件主要包括两块功能&#xff0c;功能一是在Test Explorer中…

探索 Java 死锁:常见原因与解决方案

什么是死锁&#xff1f; 死锁是一种特殊的情况&#xff0c;发生在两个或多个线程彼此等待对方持有的资源&#xff0c;从而陷入无限等待的状态。具体而言&#xff0c;死锁通常涉及以下四个必要条件&#xff1a; 互斥条件&#xff1a;至少有一个资源被一个线程独占。持有并等待…

解决Microsoft Edge浏览器无法使用英文翻译功能

一、问题描述 原来我们使用的Microsoft Edge浏览器是可以对英文界面选择翻译为中文的&#xff1b;但是最近该浏览器更新过后右上角的翻译图标找不到了&#xff0c;无法翻译英文界面内容。 二、解决方法 2.1、打开浏览器的设置界面 2.2、选择语言 2.3、将首选语言下除中文外的…