SQLAlchemy 使用封装实例

类封装

database.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-import sys
import json
import logging
from datetime import datetimefrom core.utils import classlock, parse_bool
from core.config import (MYSQL_HOST,MYSQL_PORT,MYSQL_USER,MYSQL_PASS,MYSQL_DATABASE,MYSQL_TIMEOUT
)from sqlalchemy import create_engine, Column, desc, not_, func
from sqlalchemy import Integer, String, Boolean, DateTime, Text, Enum     # Text存储大不固定长的字符串
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyErrorBase = declarative_base()log = logging.getLogger("log")SCHEMA_VERSION = "1.0.0"class User(Base):__tablename__ = "user"id = Column(Integer(), primary_key=True)file_size = Column(Integer(), nullable=False)      # nullable 不可为空md5 = Column(String(32), nullable=False)crc32 = Column(String(8), nullable=False)sha1 = Column(String(40), nullable=False)sha256 = Column(String(64), nullable=False)sha512 = Column(String(128), nullable=False)memory = Column(Boolean, nullable=False, default=False)ssdeep = Column(String(255), nullable=True)start_time = Column(DateTime(timezone=False), nullable=True, default=datetime.now)def __repr__(self):   # 查询返回的结果return "<User('{0}','{1}')>".format(self.id, self.sha256)def to_dict(self):"""将对象转换为dict.@return: dict"""d = {}for column in self.__table__.columns:d[column.name] = getattr(self, column.name)return ddef to_json(self):"""将对象转换为JSON.@return: JSON data"""return json.dumps(self.to_dict())class Version(Base):"""用于确定实际数据库架构发布的表."""__tablename__ = "version"version_num = Column(String(32), nullable=False, primary_key=True)class Database(object):"""分析队列数据库此类处理为内部队列创建数据库用户经营它还提供了一些与之交互的功能"""def __init__(self, schema_check=True, echo=False):"""@param dsn: 数据库连接字符串.@param schema_check: 禁用或启用数据库架构版本检查.@param echo: echo sql 查询."""self._lock = Noneself.schema_check = schema_checkself.echo = echodef connect(self, schema_check=None, dsn=None, create=True):"""连接到数据库后端."""if schema_check is not None:self.schema_check = schema_checkif not dsn:dsn = "mysql://{0}:{1}@{2}:{3}/{4}".format(MYSQL_USER, MYSQL_PASS, MYSQL_HOST, MYSQL_PORT, MYSQL_DATABASE)#dsn = "mysql://{username}:{password}@{hostname}:{port}/{database}"self._connect_database(dsn)# 禁用SQL日志记录。打开它进行调试.self.engine.echo = self.echo# 连接超时.self.engine.pool_timeout = MYSQL_TIMEOUT# 获取数据库会话.self.Session = sessionmaker(bind=self.engine)if create:self._create_tables()def _create_tables(self):"""创建所有数据库表等."""try:Base.metadata.create_all(self.engine)except SQLAlchemyError as e:raise ("无法创建或连接到数据库: %s" % e)# 处理架构版本控制.# TODO: it's a little bit dirty, needs refactoring.tmp_session = self.Session()if not tmp_session.query(Version).count():# 设置数据库架构版本.tmp_session.add(Version(version_num=SCHEMA_VERSION))try:tmp_session.commit()except SQLAlchemyError as e:raise ("无法设置架构版本: %s" % e)tmp_session.rollback()finally:tmp_session.close()else:# 检查数据库版本是否为预期版本.last = tmp_session.query(Version).first()tmp_session.close()if last.version_num != SCHEMA_VERSION and self.schema_check:log.warning("数据库架构版本不匹配:找到 %s,应为 %s.",last.version_num, SCHEMA_VERSION)log.error("(可选)进行备份,然后通过运行migrate应用最新的数据库迁移。")sys.exit(1)def __del__(self):"""断开连接池."""self.engine.dispose()def _connect_database(self, connection_string):"""连接到数据库.@param connection_string: 指定数据库的连接字符串"""try:if connection_string.startswith("sqlite"):# 使用“check_same_thread”在多个线程上禁用sqlite安全检查.self.engine = create_engine(connection_string, connect_args={"check_same_thread": False})elif connection_string.startswith("postgres"):# 禁用SSL模式以避免使用sqlalchemy和多进程时出现一些错误.# See: http://www.postgresql.org/docs/9.0/static/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS# TODO 检查这是否仍然相关。特别是假设我们不再使用多处理.self.engine = create_engine(connection_string, connect_args={"sslmode": "disable"})else:self.engine = create_engine(connection_string)except ImportError as e:lib = str(e).split()[-1].strip("'")if lib == "MySQLdb":log.error("缺少MySQL数据库驱动程序(在Linux上使用 `pip install mysql-python` 安装,或在Windows上使用 `pip-install mysqlclient`)")if lib == "psycopg2":log.error("缺少PostgreSQL数据库驱动程序 (使用 `pip install psycopg2`)")log.error("缺少未知的数据库驱动程序,无法导入 %s" % lib)sys.exit(-1)@classlockdef add_user(self, file_size, md5, crc32, sha1, sha256, sha512, memory, ssdeep=None):session = self.Session()# 将空字符串和None值转换为有效的int# if not timeout:#     timeout = 0# if not priority:#     priority = 1#try:memory = parse_bool(memory)except ValueError:memory = False## try:#     enforce_timeout = parse_bool(enforce_timeout)# except ValueError:#     enforce_timeout = Falseuser = User()user.file_size = file_sizeuser.md5 = md5user.crc32 = crc32user.sha1 = sha1user.sha256 = sha256user.sha512 = sha512user.memory = memoryuser.ssdeep = ssdeepsession.add(user)try:session.commit()except SQLAlchemyError as e:log.error("数据库添加 user 错误: {0}".format(e))return Falsefinally:session.close()return True@classlockdef select_user(self, id=None):session = self.Session()try:search = session.query(User)if id:search = search.filter_by(id=id)# 排序# search = search.order_by(id)# search = search.order_by(desc(User.id))  倒叙tasks = search.all()return tasksexcept SQLAlchemyError as e:log.error("数据库查看所有 user 错误: {0}".format(e))return []finally:session.close()@classlockdef update_user(self, id, new_file_size):session = self.Session()try:search = session.query(User).filter(User.id == id).first()search.file_size = new_file_size# session.query(User).filter(User.id == 1).update({'file_size': 'new_file_size'})session.commit()except SQLAlchemyError as e:log.error("数据库更新 user 错误: {0}".format(e))return Falsefinally:session.close()return True@classlockdef delete_user(self, id):session = self.Session()try:search = session.query(User).filter(User.id == id).first()if search:session.delete(search)session.commit()except SQLAlchemyError as e:log.error("数据库删除 user 错误: {0}".format(e))return Falsefinally:session.close()return True

调用运行

merage.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-import time
import loggingfrom core.database import Databaselog = logging.getLogger("log")class Merge(object):def __init__(self):db = Database()db.connect()res = db.add_user(32, "11", "22", "33", "44", "55", "off")if not res:print("添加错误")print(db.select_user())# res = db.update_user(1, 50)# if not res:#     print("更新错误")# res = db.delete_user(2)# if not res:#     print("删除错误")if __name__ == '__main__':merge = Merge()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/102796.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序wxml使用过滤器

微信小程序wxml使用过滤器 1. 新建wxs2. 引用和使用 如何在微信小程序wxml使用过滤器&#xff1f; 犹如Angular使用pipe管道这样子方便&#xff0c;用的最多就是时间格式化。 下面是实现时间格式化的方法和步骤&#xff1a; 1. 新建wxs 插入代码&#xff1a; /*** 管道过滤工…

互动设计:深入了解用户体验的关键

交互是人与计算机系统之间的互动过程。在计算机领域中&#xff0c;交互是人机交互技术的核心内容之一。交互设计是一种基于人类行为科学、心理学、人体工程学等领域的专业设计&#xff0c;目的是创造用户友好的、易于使用的计算机软件、网络、移动应用等。交互的本质在于用户的…

SpringBoot 接口 字节数组直接显示为图片

源码&#xff1a; import java.io.ByteArrayOutputStream; import javax.imageio.ImageIO; import org.springframework.web.bind.annotation.RequestMapping;/*** 获取二维码图像* 二维码支付** param price 金额* return 二维码图像* throws IOException IOException*/ Requ…

Android Native 开发 要点记录

Android Studio 中写 C 代码 android studio创建C项目_android studio native c-CSDN博客 项目配置参考 【CMake】CMakeLists.txt的超傻瓜手把手教程&#xff08;附实例源码&#xff09;_【cmake】cmakelists.txt的超傻瓜手把手教程(附实例源码)-CSDN博客 CMakeLists.txt 讲解…

NZ系列工具NZ05:VBA不打开工作簿获取其内容

我的教程一共九套及VBA汉英手册一部&#xff0c;分为初级、中级、高级三大部分。是对VBA的系统讲解&#xff0c;从简单的入门&#xff0c;到数据库&#xff0c;到字典&#xff0c;到高级的网抓及类的应用。大家在学习的过程中可能会存在困惑&#xff0c;这么多知识点该如何组织…

stm32mp157中断简单应用

设置按键中断&#xff0c;按键1按下&#xff0c;LED亮&#xff0c;再按一次&#xff0c;灭 按键2按下&#xff0c;蜂鸣器响。再按一次&#xff0c;不响 按键3按下&#xff0c;风扇转&#xff0c;再按一次&#xff0c;风扇停 main.c #include "gpio.h" #include &…

leetcode 打家劫舍篇

198. 打家劫舍 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报警。 给定一个代表每个…

Transformer预测 | Pytorch实现基于Transformer的锂电池寿命预测(NASA数据集)

文章目录 效果一览文章概述模型描述程序设计参考资料效果一览 文章概述 Pytorch实现基于Transformer 的锂电池寿命预测,环境为pytorch 1.8.0,pandas 0.24.2 随着充放电次数的增加,锂电池的性能逐渐下降。电池的性能可以用容量来表示,故寿命预测 (RUL) 可以定义如下: SOH(t…

Linux Shell 实现一键部署vmtools

VMware Tools 简介 VMware Tools 中包含一系列服务和模块&#xff0c;可在 VMware 产品中实现多种功能&#xff0c;从而使用户能够更好地管理客户机操作系统&#xff0c;以及与客户机操作系统进行无缝交互。 VMware Tools 具备以下功能&#xff1a; 将消息从主机操作系统传递…

TCP/IP(十五)拥塞控制

一 拥塞控制 ① 拥塞控制必要性 思考&#xff1a; 为什么要有拥塞控制呀,不是有流量控制了吗&#xff1f; ② 拥赛窗口 cwnd 什么是拥塞窗口? 和发送窗口有什么关系呢?明白&#xff1a; cwnd、swnd、rwnd 缩写 含义 ③ 如何知道当前网络是否出现了拥塞呢&#xff1f;…

不定积分(原函数)存在性定理、定积分存在性定理、变限积分存在性定理

1.不定积分(原函数)存在性定理、定积分存在性定理、变限积分存在性定理 笔记来源&#xff1a; 1.10个命题搞懂可积和原函数存在 2.考研变限积分概念超详细&#xff0c;超通俗讲解&#xff08;变限积分和原函数关系&#xff09; 声明&#xff1a;本文截图主要来自bili心一学长、…

Edge使用猴油脚本实战(实验室安全考试系统刷在线时长——网站永久自动刷新)

介绍 篡改猴 (Tampermonkey) 是拥有 超过 1000 万用户 的最流行的浏览器扩展之一。它允许用户自定义并增强您最喜爱的网页的功能。用户脚本是小型 JavaScript 程序&#xff0c;可用于向网页添加新功能或修改现有功能。使用 篡改猴&#xff0c;您可以轻松在任何网站上创建、管理…

Android平台实现lottie动画

1、lottie动画简介 Lottie 是一个应用十分广泛动画库&#xff0c;适用于Android、iOS、Web、ReactNative、Windows的库&#xff0c;它解析了用Bodymovin导出为json的Adobe After Effects动画&#xff0c;并在移动和网络上进行了原生渲染。它提供了一套完整得从AE到各个终端的…

【Java 进阶篇】JavaScript二元运算符详解

JavaScript是一门多用途的编程语言&#xff0c;它支持各种运算符&#xff0c;包括二元运算符。二元运算符用于执行两个操作数之间的操作&#xff0c;这两个操作数通常是变量、值或表达式。在本篇博客中&#xff0c;我们将详细探讨JavaScript的二元运算符&#xff0c;包括它们的…

记录:R语言生成热图(非相关性)

今天解决了一个困扰了我很久的问题&#xff0c;就是如何绘制不添加相关性的热图。一般绘制热图是使用corrplot包画相关性图&#xff0c;但是这样有一个前提&#xff0c;就是输入的数据集必须进行相关性分析。那么如果我不需要进行相关性分析&#xff0c;而是直接绘制能够反应数…

【快刊推荐】CCF-C类,2/3区SCIE,仅29天录用,16天见刊!

计算机领域 • 好刊推荐 01 期刊简介 影响因子&#xff1a;3.0-4.0 检索数据库&#xff1a;SCIE 在检 期刊分区&#xff1a;JCR2/3区&#xff0c;中科院4区 02 影响因子 影响因子呈现逐年稳步上升的趋势 03 期刊分区&#xff1a;JCR2/3区&#xff0c;中科院4区 04 预警…

Haskell网络编程:从数据采集到图片分析

概述 爬虫技术在当今信息时代中发挥着关键作用&#xff0c;用于从互联网上获取数据并进行分析。本文将介绍如何使用Haskell进行网络编程&#xff0c;从数据采集到图片分析&#xff0c;为你提供一个清晰的指南。我们将探讨如何使用亿牛云爬虫代理来确保高效、可靠的数据获取&am…

C++安装qt软件教程

目录 一、工具 二、安装步骤 1.1next 1.2安装目录 1.3安装环境设置选项 1.4Qt5.14.2 --> MinGW 7.3.0 64-bit 1.5 Qt5.14.2 --> 3D以下全选 1.6下一步 1.7下一步 1.8安装 三、什么是 Qt Qt 是一个跨平台的 C图形用户界面应用程序框架。 它为应用程序开发者提…

Python 单元测试设置

单元测试检查特定代码单元或模块是否按照开发人员的预期执行。 大多数时候&#xff0c;我们测试的代码单元是一个函数。 同样&#xff0c;我们可以测试所有功能。 作为最佳实践&#xff0c;至少在开发过程中&#xff0c;我们应该进行单元测试。 因此&#xff0c;在开发过程的…

java并发之AQS详解(待更)

一、为什么要用AQS同步框架&#xff1f; 开发者如果不了解JMM和多线程编程&#xff0c;就会写出很多线程不安全的程序&#xff0c;即使是经验丰富的程序员&#xff0c;并发编程也难免会出错。 而对于java程序员来说&#xff0c;并发编程就变得容易得多了&#xff0c;因为并发编…