python sqlite3 线程池封装

1. 封装 sqlite3

1.1. 依赖包引入

# -*- coding: utf-8 -*-
#import os
import sys
import datetime
import loggingimport sqlite3

1.2. 封装类

class SqliteTool(object):#def __init__(self, host, port, user, password, database):def __init__(self, host, database):self._host = host#self._port = port#self._user = user#self._password = passwordself._database = databaseself._pool = Noneself._maxconns = 6  # 连接池中最多有多少个连接print("__init__", self._database)

1.3. 连接池操作

    def init_pool(self):'''初始化连接池'''try:logging.info('Begin to create {0} postgresql pool on:{1}.\n'.format(self._host, datetime.datetime.now()))pool = []for _ in range(self._maxconns):# check_same_thread=False 支持多线程conn = sqlite3.connect(self._database, check_same_thread=False)pool.append(conn)self._pool = pool#print("init_pool", self._maxconns, len(self._pool), self._pool)logging.info('SUCCESS: create {0} postgresql pool success on {1}.\n'.format(self._host, datetime.datetime.now()))except Exception as e:logging.error('ERROR: create {0} postgresql pool failed on {1}.\n'.format(self._host, datetime.datetime.now()))self.close_pool()sys.exit('ERROR: create postgresql pool error caused by {0}'.format(str(e)))def close_pool(self):'''关闭 pool'''if self._pool != None:for conn in self._pool:conn.close()def get_conn(self):if not self._pool:self.init_pool()return self._pool.pop()def close_conn(self, conn):if self._pool:self._pool.append(conn)

1.4. 增删改查

1.4.1. 创建表

    # 创建数据表def create_table(self, sql: str):"""创建表:param sql: create sql语句:return: True表示创建表成功"""print("create_table", sql)result = Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)print("[create table success]")result = Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in create_table'.format(sql, str(e)))sys.exit('ERROR: create table from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result

1.4.2. 删除表

    # 删除数据表def drop_table(self, sql: str):"""删除表:param sql: drop sql语句:return: True表示删除成功"""result = Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)print("[drop table success]")result = Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in drop_table'.format(sql, str(e)))sys.exit('ERROR: drop table from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result

1.4.3. 插入数据

    def exec_insert(self, sql):'''执行插入'''result = Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result =  Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_insert'.format(sql, str(e)))sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return resultdef exec_insert_plus(self, table: str, params: dict):'''执行插入'''result = Falsetry:key_tup = tuple(params.keys())key_str = ",".join(key_tup)# different with psqlval_str = ",".join(("?",)*len(key_tup))sql_str = "insert into " + table + " (" + key_str + ") values (" + val_str + ")"#val_tup = tuple(params.values())val_tup = ()for item in params.values():if type(item) == list:val_tup += (json.dumps(item),)elif type(item) == str:val_tup += (item,)elif type(item) == int:val_tup += (item,)else:val_tup += (item,)#val_tup.append(str(item))#print("exec_insert_plus", sql_str, val_tup)conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql_str, val_tup)result =  Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_insert_plus'.format(table, str(e)))sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return resultdef exec_insert_many(self, table: str, params: List[Dict]):def dict_to_str(tab: str, param: Dict):key_tup = tuple(param.keys())key_str = ",".join(key_tup)# different with psqlval_str = ",".join(("?",)*len(key_tup))sql_str = "insert into " + tab + " (" + key_str + ") values (" + val_str + ")"return sql_strdef dict_to_tuple(param: Dict):val_tup = ()for item in param.values():if type(item) == list:val_tup += (json.dumps(item),)elif type(item) == str:val_tup += (item,)elif type(item) == int:val_tup += (item,)else:val_tup += (item,)#val_tup.append(str(item))return val_tup'''执行插入'''result = Falseif len(params) <= 0:return resulttry:sql_str = dict_to_str(table, params[0])val_lst = []for param in params:val_lst.append(dict_to_tuple(param))print("exec_insert_many", sql_str, val_lst)conn = self.get_conn()cursor = conn.cursor()cursor.executemany(sql_str, val_lst)result =  Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_insert'.format(sql_str, str(e)))sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result

1.4.4. 删除数据

	# sql = "DELETE from users where user_id='83f7d86b594e4b26a7196ab761afcc7c';"def exec_delete(self, sql):'''执行查询'''result = Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result =  Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_delete'.format(sql, str(e)))sys.exit('ERROR: delete data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result

1.4.5. 更新数据

	# 修改单个值# update tasks set status='running' where task_id='0791216839b04d5c88846817f78280cc';# 修改多个值# update tasks set status='running',score='10' where task_id='0791216839b04d5c88846817f78280cc';def exec_update(self, sql):'''执行更新'''result = Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result =  Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_update'.format(sql, str(e)))sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result

1.4.6. 查询数据

    # select * from users where user_name='hello';def exec_select(self, sql):'''执行查询'''try:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result = cursor.fetchall()#result = cursor.fetchone()except Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_select'.format(sql, str(e)))sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))finally:cursor.close()#conn.close()#print("exec_select", len(self._pool), self._pool)print("init_pool", self._maxconns, len(self._pool), self._pool)self.close_conn(conn)return result

1.4.7. 测试

    def test_select(self, sql):result = self.exec_select(sql)print("test_select", result)return result

2. 操作使用实例

# for test
from typing import Optional, List, Dict, Union
from pydantic import BaseModel, Fieldclass ......if __name__ == '__main__':dbhost = ""dbdatabase = "./test.db"db = SqliteTool(dbhost, dbdatabase)class TaskInDB(BaseModel):task_id: strdisabled: intdef create_tests_table(db):sql_str = "create table if not exists tests("sql_str += "task_id char(32) primary key,"sql_str += "disabled int not null"sql_str += ");"return db.create_table(sql_str)def drop_tests_table(db):sql_str = "drop table if exists tests;"return db.drop_table(sql_str)def get_tests_indb(db, tasks: Union[List[str], str, None] = None):if tasks == None:sql_str = "select * from tests;"if type(tasks) == list:print("list", tasks)key_str = ",".join(tasks)sql_str = "select * from tests where task_id in (" + key_str + ");"elif type(tasks) == str:print("str", tasks)sql_str = "select * from tests where task_id ='"+tasks+"';"elif type(tasks) == None:print("none")ret_tasks = db.exec_select(sql_str)return ret_tasksdef create_tests_indb(db, tasks: List[TaskInDB]):#return db.exec_insert_plus("tests", task_indb.model_dump())#return db.exec_insert_plus("tests", task_indb.dict())params = []for task in tasks:print("create_tests_indb", task)#params.append(task.model_dump())params.append(task.dict())return db.exec_insert_many("tests", params)#  重建数据库if not drop_tests_table(db) or not create_tests_table(db):print("ERROR")# 创建两条记录task_indb1 = TaskInDB(task_id="11111111", disabled=1)#create_tests_indb(db, task_indb)task_indb2 = TaskInDB(task_id="22222222", disabled=0)#create_tests_indb(db, task_indb)task_indb3 = TaskInDB(task_id="33333333", disabled=0)#create_tests_indb(db, task_indb)create_tests_indb(db, [task_indb1, task_indb2, task_indb3])# 查询记录#key_tup = tuple(TaskInDB.model_fields.keys())key_tup = tuple(TaskInDB.__fields__.keys())#key_str = ",".join(key_tup)# allret_tasks = get_tests_indb(db)#  str#ret_tasks = get_tests_indb(db, tasks="11111111")# list#ret_tasks = get_tests_indb(db, tasks=["11111111", "22222222"])for ret_task in ret_tasks:print(ret_task)task_indb = TaskInDB(**{key: ret_task[i] for i,key in enumerate(key_tup)})print(task_indb)print("OK")

3. ProgrammingError 单线程报错

python在执行数据库查询语句出现如下异常:
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 65984 and this is thread id 61236.
错误原因:
因为在python操作某个数据库的数据的线程必须和创建数据库的线程是同一个线程,不然就会出现异常。简而言之,应该就是create语句和select语句应该都在同一个线程下执行才可以。
所以,要想解决这个问题有如下两种方法:
1.(这个方法有点麻烦,不建议,不过可以根据个人项目实际情况来决定)
把创建数据库对象的代码剪切到当前线程里来,同时把之前的已经创建成功的数据库删掉重新执行创建语句
2. (推荐使用)
把当前线程里的connect语句改为这种形式:conn=sqlite3.connect(“stocks.db”,check_same_thread = False)
即加上:check_same_thread = False
check_same_thread=False的作用: python sqlite3的线程模式默认串行, 如果需要再多线程中用一个句柄,需要加此参数,否则将出现上述错误。
这样在执行操纵数据库的语句时就不再检查线程是否相同的问题了。
参考:https://blog.csdn.net/AshleyXM/article/details/104812879

同时也需要加互斥锁, 否则在 execute 时偶现(多线程同时execute)地会报如下错误:
cannot commit - no transaction is active
参考:https://blog.csdn.net/weixin_43380311/article/details/120221910

4. 添加互斥锁

要在 Python 中锁定 sqlite3 数据库,可以使用 threading 模块中的 Lock 对象。下面是一个示例代码:

import sqlite3
import threading# 创建一个锁对象
lock = threading.Lock()# 定义一个函数来执行数据库操作
def execute_query(query):# 获取锁lock.acquire()try:# 连接到sqlite3数据库conn = sqlite3.connect('your_database.db')cursor = conn.cursor()# 执行查询操作cursor.execute(query)# 提交事务conn.commit()# 关闭数据库连接conn.close()finally:# 释放锁lock.release()# 调用函数执行数据库查询
execute_query('SELECT * FROM your_table')

在上面的示例中,我们首先创建了一个 Lock 对象。然后,在执行数据库操作之前,我们使用 lock.acquire() 获取锁,确保只有一个线程可以执行数据库查询。在执行完数据库操作后,我们使用 lock.release() 释放锁,以允许其他线程获取锁并执行数据库操作。
这种方式可以确保在并发访问sqlite3数据库时,每次只有一个线程可以执行数据库操作,避免了数据竞争和不一致性的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/646642.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Typora + PicGo + GitHub搭建图床

Typora PicGo GitHub搭建图床 1. Typora下载破解 这一步自行百度 2. PicGo下载 PicGo is Here | PicGo 自行下载安即可 3. GitHub仓库设置 gitHub注册略过&#xff0c;如果不能访问请科学上网 创建仓库 生成访问token 点击右上角头像 -> setting -> 点击左边最…

Hotspot源码解析-第25章-类的初始化

第25章-类的初始化 这一章主要是讲类的初始化操作&#xff0c;后续类加载章节中也会用到这一章的知识&#xff0c;只不过&#xff0c;这里就讲&#xff0c;是因为虚拟在初始化过程中&#xff0c;需要对基础类&#xff0c;比如System/Thread等类进行初始化操作&#xff0c;所以…

第三季《乐队风暴》全国总决赛圆满落幕

2024年1月21日&#xff0c;由广东珠江、盛娱星汇海选联合主办的第三季《乐队风暴》全国海选歌手赛道全国总决赛在广州罗格镇MUSIC LIVE&#xff08;太古仓店&#xff09;正式打响&#xff0c;第三季《乐队风暴》全国海选开启以来共有超8000人报名渴望登上绚丽舞台&#xff0c;从…

二叉搜索树、二叉排序树(查找、插入和删除)——Java版本

1. 概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树: 若它的左子树不为空&#xff0c;则左子树上所有节点的值都小于根节点的值若它的右子树不为空&#xff0c;则右子树上所有节点的值都大于根节点的值它的左右子树也分别…

Rsync服务

一、Rsync概述 rsync英文称为 remote synchronizetion&#xff0c;rsync具有可使本地和远程两台主机之间的数据快速复制同步镜像、远程备份的功能&#xff0c;功能类似于ssh带的scp命令&#xff0c;优于scp命令的功能&#xff0c;scp每次都是全量拷贝&#xff0c;而rsync可以增…

Rust Web小项目

Rust 第26节 Web小项目 监听TCP链接 use std::net::TcpListener;fn main() {let listener TcpListener::bind("127.0.0.1:7887").unwrap(); //监听7887端口&#xff0c;成功后&#xff0c;就创建一个linstenerfor stream in listener.incoming() { // listener.…

2024年mongodb自建三节点副本集详细教程

环境说明 系统centos7.9 自建服务器或云服务器&#xff0c;硬件要求不低于2核2G内存&#xff0c;20G硬盘&#xff0c;文件系统默认是ext4即可。 生产环境最好单独一个磁盘存放数据库&#xff0c;方便数据备份和还原&#xff0c;避免干扰到其他磁盘的运作。 mongodb 4.4.27 …

HTML-表格

表格 1.基本结构 一个完整的表格由&#xff1a;表格标题、表格头部、表格主体、表格脚注&#xff0c;四部分组成 表格涉及到的标签&#xff1a; table&#xff1a;表格 caption&#xff1a;标题 thead&#xff1a;表格头部 tbody&#xff1a;表格主体 tfoot&#xff1a;表格注…

android:persistent和android:priority的区别,对进程优先级有什么影响?

前言&#xff1a;写的apk因为系统busy给我kill了&#xff0c;(adj 900): kill all background&#xff0c;在AndroidManifest.xml添加android:persistent"true"后&#xff0c;被甲方要求不能这样做&#xff0c;还是得从adj改&#xff0c;把 priority改成1000 android…

可Pin to Pin兼容DRV8837的国产H桥电机驱动芯片,具大电流,短gnd,短电源保护功能

在国产牙刷&#xff0c;电子锁设计中&#xff0c;以前方案很多采用TI的DRV8837做直流电机驱动&#xff0c;随着中美贸易战和牙刷&#xff0c;电子锁等产品价格平民化普及&#xff0c;很多大厂在做国产化替代设计方案&#xff0c;GLOBALCHIP 的电机驱动芯片GC8837&#xff0c;价…

解读Android进程优先级ADJ算法

本文基于原生Android 9.0源码来解读进程优先级原理,基于篇幅考虑会精炼部分代码 一、概述 1.1 进程 Android框架对进程创建与管理进行了封装,对于APP开发者只需知道Android四大组件的使用。当Activity, Service, ContentProvider, BroadcastReceiver任一组件启动时,当其所…

YOLOv8改进 | Conv篇 | 2024.1月最新成果可变形卷积DCNv4(适用检测、Seg、分类、Pose、OBB)

一、本文介绍 本文给大家带来的改进机制是2024-1月的最新成果DCNv4,其是DCNv3的升级版本,效果可以说是在目前的卷积中名列前茅了,同时该卷积具有轻量化的效果!一个DCNv4参数量下降越15Wparameters左右,。它主要通过两个方面对前一版本DCNv3进行改进:首先,它移除了空间聚…

Python 流静态文件过滤、端口过滤、同域过滤(host过滤)、代理拦截

目录 静态文件过滤 需求 代码 端口过滤 需求 代码 同域过滤&#xff08;host过滤&#xff09; 需求 代码 静态文件过滤 需求 流量中的url包含大量静态文件请求信息&#xff0c;过滤掉 代码 def __is_static(self, flow: http.HTTPFlow) -> bool:static_ext [.j…

探讨Go语言中的HTTP代理模式:看Go如何玩转网络中转站

在互联网的海洋中&#xff0c;HTTP代理服务器像一座灯塔&#xff0c;为我们的网络冲浪提供了指引。而当Go语言遇上HTTP代理&#xff0c;会碰撞出怎样的火花呢&#xff1f;今天&#xff0c;让我们一起探讨Go语言中的HTTP代理模式&#xff0c;看看它如何玩转这个网络中转站&#…

三:C语言-输入与输出

三&#xff1a;输入与输出 一&#xff1a;输出 1.printf()&#xff1a; ​ 将参数文本输出到屏幕上&#xff0c;它名字里的 f 代表 format&#xff08;格式化&#xff09;&#xff0c;表示可以定制输出文本的格式 ​ printf()不会在行尾自动添加换行符&#xff0c;待运行结…

IDEA(十)2022版本 Services中服务窗口不显示端口号解决

目录 一、问题描述二、问题分析三、解决方案3.1 设置启动参数【生效】3.2 方法二&#xff1a;设置环境变量【不生效】3.3 方法三&#xff1a;删除缓存【不生效】 四、补充&#xff1a;如何手动控制端口显示 一、问题描述 我们在使用 IDEA 的过程中&#xff0c;会发现在 Servic…

Hive之set参数大全-11

设置 Map Join 操作中优化哈希表的工作集大小&#xff08;working set size&#xff09; hive.mapjoin.optimized.hashtable.wbsize 是 Apache Hive 中的一个配置属性&#xff0c;用于设置 Map Join 操作中优化哈希表的工作集大小&#xff08;working set size&#xff09;。 …

Dockerfile:如何写一个Dockerfile文件?

如何写一个Dockerfile文件&#xff1f; &#x1f6a8;推荐参考&#xff1a;Dockerfile&#xff1a;如何写一个Dockerfile文件&#xff1f; 现在的项目肯定都离不开docker&#xff0c;只要是流水线部署就会涉及Dockerfile文件&#xff0c;那么如何写一个正确的编写一个Dockerfil…

dpdk网络转发环境的搭建

文章目录 前言ip命令的使用配置dpdk-basicfwd需要的网络结构测试dpdk-basicfwddpdk-basicfwd代码分析附录basicfwd在tcp转发时的失败抓包信息DPDK的相关设置 前言 上手dpdk有两难。其一为环境搭建。被绑定之后的网卡没有IP&#xff0c;我如何给它发送数据呢&#xff1f;当然&a…

[leetcode] 18. 四数之和

文章目录 题目描述解题方法排序 双指针java代码 相似题目 题目描述 给你一个由 n 个整数组成的数组 nums &#xff0c;和一个目标值 target 。请你找出并返回满足下述全部条件且不重复的四元组 [nums[a], nums[b], nums[c], nums[d]] &#xff08;若两个四元组元素一一对应&a…