- 监测数据采集物联网应用开发步骤(5.1)
包含4个类:数据库连接(com.zxy.db_Self.ConnectionPool_Self.py)、数据库操作类(com.zxy.db_Self.Db_Common_Self.py)、数据库管理类(com.zxy.db_Self.DBManager_Self.py)、数据库连接池类(com.zxy.db_Self.PooledConnection_Self.py)
数据库连接(com.zxy.db_Self.ConnectionPool_Self.py)
#! python3
# -*- coding: utf-8 -*-
'''
Created on 2023年08月28日
@author: zxyong 13738196011
'''import sqlite3,time,mysql.connector,threading
from com.zxy.z_debug import z_debug
from com.zxy.db_Self.PooledConnection_Self import PooledConnection_Self#监测数据采集物联网应用--数据库连接
class ConnectionPool_Self(z_debug):
    """Pool of database connections (SQLite or MySQL) shared by all instances.

    The driver is selected by a JDBC-style name: "org.sqlite.JDBC" opens
    ``sqlite3`` connections, "com.mysql.jdbc.Driver" opens
    ``mysql.connector`` connections. Each raw connection is wrapped in a
    ``PooledConnection_Self`` whose ``attBusy`` flag marks it as handed out.
    """

    attJdbcDriver = ""
    attDbUrl = ""
    attDbUsername = ""
    attDbPassword = ""
    attInitialConnections = 5
    attIncrementalConnections = 2
    attMaxConnections = 10
    # This list is never rebound on an instance, only appended to / removed
    # from, so every ConnectionPool_Self object works on the same pool.
    attPooledConnection_Selfs = []
    # One re-entrant lock for that shared list. It must be shared (a lock
    # created inside a method excludes nobody) and re-entrant (the methods
    # below call each other while holding it).
    _attPoolLock = threading.RLock()

    def __init__(self, inputJdbcDriver, inputDbUrl, inputDbUsername, inputDbPassword):
        """Store the connection settings and make sure the pool exists.

        For SQLite the pool is kept smaller (2 initial, 5 maximum).
        Errors raised while creating the pool are logged, not propagated.
        """
        self.attJdbcDriver = inputJdbcDriver
        if inputJdbcDriver == "org.sqlite.JDBC":
            self.attInitialConnections = 2
            self.attMaxConnections = 5
        self.attDbUrl = inputDbUrl
        self.attDbUsername = inputDbUsername
        self.attDbPassword = inputDbPassword
        try:
            self.createPool()
        except Exception as e:
            temMsg = repr(e) + "=>" + str(e.__traceback__.tb_lineno)
            if type(self) is type:
                self.debug_in(self, temMsg)
            else:
                self.debug_in(temMsg)

    def createPool(self):
        """Create the initial connections if the shared pool is still empty."""
        with self._attPoolLock:
            # Checked under the lock so two threads cannot both populate it.
            if self.attPooledConnection_Selfs:
                return
            self.createConnections(self.attInitialConnections)
            if type(self) is type:
                self.debug_in(self, "myself db create pool")
            else:
                self.debug_in("myself db create pool")

    def createConnections(self, inputNumConnections):
        """Add up to ``inputNumConnections`` connections to the pool.

        Stops (and logs) as soon as ``attMaxConnections`` is reached; a
        non-positive maximum means "no limit". Connections that cannot be
        opened are skipped so the pool never holds an unusable entry.
        """
        with self._attPoolLock:
            for _ in range(inputNumConnections):
                if (self.attMaxConnections > 0
                        and len(self.attPooledConnection_Selfs) >= self.attMaxConnections):
                    if type(self) is type:
                        self.debug_in(self, "myself db connections is max")
                    else:
                        self.debug_in("myself db connections is max")
                    break
                try:
                    temCon = self.newConnection()
                    if temCon is None:
                        # newConnection() already logged the reason (or the
                        # driver name is unsupported); nothing to pool.
                        continue
                    self.attPooledConnection_Selfs.append(PooledConnection_Self(temCon))
                except Exception as e:
                    temMsg = repr(e) + "=>" + str(e.__traceback__.tb_lineno)
                    if type(self) is type:
                        self.debug_in(self, temMsg)
                    else:
                        self.debug_in(temMsg)

    def newConnection(self):
        """Open one raw connection for the configured driver.

        Returns:
            A ``sqlite3`` or ``mysql.connector`` connection, or None when
            the driver name is not recognised or the MySQL connect fails
            (the failure is logged). SQLite errors propagate to the caller.
        """
        if self.attJdbcDriver == "org.sqlite.JDBC":
            # check_same_thread=False: pooled connections are used from
            # whichever thread borrows them.
            return sqlite3.connect(self.attDbUrl, check_same_thread=False)
        if self.attJdbcDriver == "com.mysql.jdbc.Driver":
            try:
                # attDbUrl is split on ":" as host:port:database.
                temParts = self.attDbUrl.split(":")
                return mysql.connector.Connect(
                    host=temParts[0],
                    user=self.attDbUsername,
                    db=temParts[2],
                    passwd=self.attDbPassword,
                    port=temParts[1],
                )
            except Exception as e:
                temMsg = repr(e) + "=>" + str(e.__traceback__.tb_lineno)
                if type(self) is type:
                    self.debug_in(self, temMsg)
                else:
                    self.debug_in(temMsg)
        return None

    def getConnection(self):
        """Borrow a pooled connection, waiting until one is free.

        Returns:
            A ``PooledConnection_Self`` marked busy, or None immediately
            when the pool holds no connections at all.
        """
        while True:
            with self._attPoolLock:
                if not self.attPooledConnection_Selfs:
                    return None
                temReturnResult = self.getFreeConnection()
            if temReturnResult is not None:
                return temReturnResult
            # Sleep outside the lock so other threads are not blocked
            # while this one waits for a connection to be returned.
            time.sleep(0.2)

    def getFreeConnection(self):
        """Return a free connection, growing the pool once if necessary.

        Returns None when nothing is free and the pool cannot grow.
        """
        with self._attPoolLock:
            temConn_self = self.findFreeConnection()
            if temConn_self is None:
                self.createConnections(self.attIncrementalConnections)
                temConn_self = self.findFreeConnection()
            return temConn_self

    def findFreeConnection(self):
        """Claim the first usable pooled connection and mark it busy.

        An entry whose raw connection is None is reconnected first; if that
        fails the entry is dropped from the pool.

        Returns:
            The claimed ``PooledConnection_Self``, or None when every entry
            is busy or the pool is empty.
        """
        with self._attPoolLock:
            # Iterate over a snapshot because dead entries are removed
            # from the real list inside the loop.
            for temPc in list(self.attPooledConnection_Selfs):
                if temPc.attBusy and temPc.attConnection is not None:
                    continue
                if temPc.attConnection is None:
                    try:
                        temPc.attConnection = self.newConnection()
                    except Exception as e:
                        temMsg = repr(e) + "=>" + str(e.__traceback__.tb_lineno)
                        if type(self) is type:
                            self.debug_in(self, temMsg)
                        else:
                            self.debug_in(temMsg)
                    if temPc.attConnection is None:
                        self.attPooledConnection_Selfs.remove(temPc)
                        continue
                temPc.attBusy = True
                return temPc
            return None

    def closeConnection(self, inputConn):
        """Close a raw connection, logging (not raising) any error."""
        try:
            if type(self) is type:
                self.debug_in(self, "the myself close db")
            else:
                self.debug_in("the myself close db")
            inputConn.close()
        except Exception as e:
            temMsg = repr(e) + "=>" + str(e.__traceback__.tb_lineno)
            if type(self) is type:
                self.debug_in(self, temMsg)
            else:
                self.debug_in(temMsg)

    def returnConnection(self, inputConn):
        """Mark the pooled entry that wraps ``inputConn`` as free again.

        Args:
            inputConn: the raw connection (``attConnection``) being returned.

        Returns:
            Always None. An empty pool is logged and otherwise ignored.
        """
        if not self.attPooledConnection_Selfs:
            if type(self) is type:
                self.debug_in(self, "myself db returnConnection!")
            else:
                self.debug_in("myself db returnConnection!")
            return None
        for temPConn in self.attPooledConnection_Selfs:
            if temPConn.attConnection == inputConn and temPConn.attBusy:
                temPConn.attBusy = False
                break
        return None
数据库操作类(com.zxy.db_Self.Db_Common_Self.py)
#! python3
# -*- coding: utf-8 -*-
'''
Created on 2023年08月28日
@author: zxyong 13738196011
'''from com.zxy.adminlog.UsAdmin_Log import UsAdmin_Log
from com.zxy.common import Com_Para
from com.zxy.db_Self.DBManager_Self import DBManager_Self
from com.zxy.z_debug import z_debug#监测数据采集物联网应用--数据库操作
class Db_Common_Self(z_debug):
    """Run SQL through a pooled connection obtained from DBManager_Self.

    Every public method borrows a connection, runs one operation, logs any
    exception (console via ``debug_in`` and file via ``UsAdmin_Log``) and
    always hands the connection back. Except for ``Common_Sql``, SQLite
    access is serialised with ``Com_Para.Dblock1``.
    """

    attSqlException = ""
    attRs_out = None
    attConn_a = None
    attColumnNames = []

    def __init__(self):
        pass

    @staticmethod
    def _is_sqlite(inputPooledConn):
        """Tell whether the pooled wrapper holds a sqlite3 connection."""
        return "sqlite3.Connection" in str(inputPooledConn.attConnection)

    @staticmethod
    def _give_back(inputDs, inputPooledConn, inputLocked):
        """Return the connection to the pool and drop the SQLite lock.

        Safe to call when the manager or connection was never obtained;
        the lock is released only if this call path actually acquired it.
        """
        try:
            if inputDs is not None and inputPooledConn is not None:
                inputDs.returnConnection(inputPooledConn.attConnection)
        finally:
            if inputLocked:
                Com_Para.Dblock1.release()

    def Common_SqlNoCommit(self, inputStrSql):
        """Run a query via ``executeQueryNoCommit`` and return its rows.

        Returns:
            Whatever ``executeQueryNoCommit`` returns, or None on failure.
        """
        temRs = None
        temDs = None
        temConn = None
        temLocked = False
        try:
            temDs = DBManager_Self()
            temConn = temDs.getConnection()
            self.attConn_a = temConn
            if self._is_sqlite(temConn):
                Com_Para.Dblock1.acquire()
                temLocked = True
            temRs = temConn.executeQueryNoCommit(inputStrSql)
        except Exception as e:
            temLog = ""
            if type(self) is type:
                temLog = self.debug_info(self) + inputStrSql + "==>" + repr(e)
                self.debug_in(self, inputStrSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            else:
                temLog = self.debug_info() + inputStrSql + "==>" + repr(e)
                self.debug_in(inputStrSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            uL = UsAdmin_Log(Com_Para.ApplicationPath, temLog)
            uL.WriteLog()
        finally:
            self._give_back(temDs, temConn, temLocked)
            self.Close_Conn()
        return temRs

    def Common_Sql(self, inputStrSql):
        """Run a query via ``executeQuery`` and return its rows.

        Also copies the cursor's column description to ``attColumnNames``.
        Unlike the other methods, this one does not take the SQLite lock.

        Returns:
            Whatever ``executeQuery`` returns, or None on failure.
        """
        temRs = None
        temDs = None
        temConn = None
        try:
            temDs = DBManager_Self()
            temConn = temDs.getConnection()
            self.attConn_a = temConn
            temRs = temConn.executeQuery(inputStrSql)
            self.attColumnNames = temConn.attColumnNames
        except Exception as e:
            temLog = ""
            if type(self) is type:
                temLog = self.debug_info(self) + inputStrSql + "==>" + repr(e)
                self.debug_in(self, inputStrSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            else:
                temLog = self.debug_info() + inputStrSql + "==>" + repr(e)
                self.debug_in(inputStrSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            uL = UsAdmin_Log(Com_Para.ApplicationPath, temLog)
            uL.WriteLog()
        finally:
            self._give_back(temDs, temConn, False)
            self.Close_Conn()
        return temRs

    def CommonExec_SqlRowID(self, inputStrSql):
        """Run a statement via ``executeUpdateRowID``.

        Returns:
            The value returned by ``executeUpdateRowID``, or -1 on failure.
        """
        temIResult = -1
        temDs = None
        temConn = None
        temLocked = False
        try:
            temDs = DBManager_Self()
            temConn = temDs.getConnection()
            self.attConn_a = temConn
            if self._is_sqlite(temConn):
                Com_Para.Dblock1.acquire()
                temLocked = True
            temIResult = temConn.executeUpdateRowID(inputStrSql)
        except Exception as e:
            temLog = ""
            if type(self) is type:
                temLog = self.debug_info(self) + inputStrSql + "==>" + repr(e)
                self.debug_in(self, inputStrSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            else:
                temLog = self.debug_info() + inputStrSql + "==>" + repr(e)
                self.debug_in(inputStrSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            uL = UsAdmin_Log(Com_Para.ApplicationPath, temLog)
            uL.WriteLog()
            temIResult = -1
        finally:
            self._give_back(temDs, temConn, temLocked)
            self.Close_Conn()
        return temIResult

    def CommonExec_Sql(self, inputStrSql):
        """Run a statement via ``executeUpdate``.

        Returns:
            The value returned by ``executeUpdate``, or -1 on failure.
        """
        temIResult = -1
        temDs = None
        temConn = None
        temLocked = False
        try:
            temDs = DBManager_Self()
            temConn = temDs.getConnection()
            self.attConn_a = temConn
            if self._is_sqlite(temConn):
                Com_Para.Dblock1.acquire()
                temLocked = True
            temIResult = temConn.executeUpdate(inputStrSql)
        except Exception as e:
            temLog = ""
            if type(self) is type:
                temLog = self.debug_info(self) + inputStrSql + "==>" + repr(e)
                self.debug_in(self, inputStrSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            else:
                temLog = self.debug_info() + inputStrSql + "==>" + repr(e)
                self.debug_in(inputStrSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            uL = UsAdmin_Log(Com_Para.ApplicationPath, temLog)
            uL.WriteLog()
            temIResult = -1
        finally:
            self._give_back(temDs, temConn, temLocked)
            self.Close_Conn()
        return temIResult

    def Common_Sql_Proc(self, inputProName, inputParameters, inputParamTypes, inputParamOutName, inputParamOutType, inputTrn):
        """Run a parameterised statement via ``ParamExecuteQuery``.

        Args:
            inputProName: stored procedure name (used here in log text).
            inputParameters: input parameter values.
            inputParamTypes: input parameter types (String, Int, float, Date).
            inputParamOutName: output parameter names.
            inputParamOutType: output parameter types.
            inputTrn: object providing ``attINF_TYPE`` and ``attINF_EN_SQL``.

        Returns:
            ``self.attRs_out``. On failure the error text is stored in
            ``attSqlException`` and ``attRs_out`` is left as it was.
        """
        temDs = None
        temConn = None
        temLocked = False
        try:
            temDs = DBManager_Self()
            temConn = temDs.getConnection()
            self.attConn_a = temConn
            if self._is_sqlite(temConn):
                Com_Para.Dblock1.acquire()
                temLocked = True
            if inputTrn.attINF_TYPE == "1":
                # NOTE(review): this call passes five arguments; confirm that
                # PooledConnection_Self.ParamExecuteQuery accepts a call
                # without the SQL text, otherwise this branch always fails.
                self.attRs_out = temConn.ParamExecuteQuery(inputProName, inputParameters, inputParamTypes, inputParamOutName, inputParamOutType)
            else:
                self.attRs_out = temConn.ParamExecuteQuery(inputProName, inputParameters, inputParamTypes, inputParamOutName, inputParamOutType, inputTrn.attINF_EN_SQL)
            self.attColumnNames = temConn.attColumnNames
        except Exception as e:
            self.attSqlException = "数据库操作出错请查看程序错误日志文件:" + inputProName + " " + repr(e)
            temLog = ""
            if type(self) is type:
                temLog = self.debug_info(self) + inputProName + "==>" + repr(e)
                self.debug_in(self, inputProName + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            else:
                temLog = self.debug_info() + inputProName + "==>" + repr(e)
                self.debug_in(inputProName + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            uL = UsAdmin_Log(Com_Para.ApplicationPath, temLog)
            uL.WriteLog()
        finally:
            self._give_back(temDs, temConn, temLocked)
            self.Close_Conn()
        return self.attRs_out

    def Close_Conn(self):
        """Placeholder hook; currently does nothing.

        Pooled connections are returned to the pool rather than closed.
        Tolerates ``attConn_a`` being None.
        """
        if self.attConn_a is not None and self.attConn_a.attConnection is not None:
            pass
数据库管理类(com.zxy.db_Self.DBManager_Self.py)
#! python3
# -*- coding: utf-8 -*-
'''
Created on 2023年08月28日
@author: zxyong 13738196011
'''from com.zxy.common import Com_Para
from com.zxy.common.DbConfigSelf import DbConfigSelf
from com.zxy.db_Self.ConnectionPool_Self import ConnectionPool_Self
from com.zxy.z_debug import z_debug#监测数据采集物联网应用--数据库管理
class DBManager_Self(z_debug):
    """Thin front end that builds a ConnectionPool_Self from Com_Para settings."""

    attConn = None
    attConnectionPool = None

    def __init__(self):
        """Load the database settings if needed and prepare the pool.

        When ``Com_Para.url`` is empty the settings are loaded through
        ``DbConfigSelf.GetDbConfigSelfNew()`` first. Errors raised by
        ``createPool()`` are logged, not propagated.
        """
        if Com_Para.url == "":
            DbConfigSelf.GetDbConfigSelfNew()
        self.attConnectionPool = ConnectionPool_Self(
            Com_Para.driverClassName, Com_Para.url, Com_Para.username, Com_Para.password
        )
        try:
            self.attConnectionPool.createPool()
        except Exception as e:
            temMsg = repr(e) + "=>" + str(e.__traceback__.tb_lineno)
            if type(self) is type:
                self.debug_in(self, temMsg)
            else:
                self.debug_in(temMsg)

    def getConnection(self):
        """Borrow a connection from the pool.

        Returns:
            The pooled connection, or None when the pool returns None or
            raises (the error is logged).
        """
        # Reset first so a failure never hands out the connection that an
        # earlier call obtained and has since returned.
        self.attConn = None
        try:
            self.attConn = self.attConnectionPool.getConnection()
        except Exception as e:
            temMsg = repr(e) + "=>" + str(e.__traceback__.tb_lineno)
            if type(self) is type:
                self.debug_in(self, temMsg)
            else:
                self.debug_in(temMsg)
        # Returned after the try block, not from a finally clause, so that
        # KeyboardInterrupt / SystemExit are not silently discarded.
        return self.attConn

    def returnConnection(self, inputConn):
        """Give ``inputConn`` back to the pool and return the pool's result."""
        return self.attConnectionPool.returnConnection(inputConn)

    @staticmethod
    def closeConnectionPoolTimeOut(self):
        """Ask the pool to close timed-out connections; errors are logged.

        This is a static method that takes its target explicitly, so it can
        be called with either an instance or the class itself as ``self``.
        NOTE(review): the ConnectionPool_Self listing in this document shows
        no closeConnectionPoolTimeOut method; confirm it exists.
        """
        try:
            self.attConnectionPool.closeConnectionPoolTimeOut()
        except Exception as e:
            temMsg = repr(e) + "=>" + str(e.__traceback__.tb_lineno)
            if type(self) is type:
                self.debug_in(self, temMsg)
            else:
                self.debug_in(temMsg)
数据库连接池类(com.zxy.db_Self.PooledConnection_Self.py)
#! python3
# -*- coding: utf-8 -*-
'''
Created on 2023年08月28日
@author: zxyong 13738196011
'''from urllib.parse import unquotefrom com.zxy.adminlog.UsAdmin_Log import UsAdmin_Log
from com.zxy.common import Com_Para
from com.zxy.common.Com_Fun import Com_Fun
from com.zxy.z_debug import z_debug#监测数据采集物联网应用--数据库连接池
class PooledConnection_Self(z_debug):
    """Wrapper around one raw DB connection plus its pool bookkeeping.

    ``attBusy`` marks the connection as handed out, ``attUpdtime`` records
    the time of the last use, ``attColumnNames`` holds the cursor
    description of the last query and ``attlastrowid`` the last inserted
    row id seen by ``ParamExecuteQuery``.
    """

    attUpdtime = 0
    attB_nocursor = True
    attConnection = None
    attBusy = False
    attColumnNames = []
    attlastrowid = -1

    def __init__(self, inputConn):
        """Wrap ``inputConn`` and stamp the current time."""
        self.attB_nocursor = True
        self.attConnection = inputConn
        self.attUpdtime = Com_Fun.getTimeLong()

    def executeQueryNoCommit(self, inputSql):
        """Execute ``inputSql`` and return all rows (None on failure).

        NOTE(review): despite the name, this method calls commit() after
        fetching; confirm whether that is intended.
        """
        temCursor = None
        temValues = None
        try:
            self.attUpdtime = Com_Fun.getTimeLong()
            temCursor = self.attConnection.cursor()
            temCursor.execute(inputSql)
            temValues = temCursor.fetchall()
            self.attColumnNames = temCursor.description
            self.attConnection.commit()
        except Exception as e:
            temLog = ""
            if type(self) is type:
                temLog = self.debug_info(self) + inputSql + "==>" + repr(e)
                self.debug_in(self, inputSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            else:
                temLog = self.debug_info() + inputSql + "==>" + repr(e)
                self.debug_in(inputSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            uL = UsAdmin_Log(Com_Para.ApplicationPath, temLog)
            uL.WriteLog()
        finally:
            if temCursor is not None:
                temCursor.close()
        return temValues

    def executeQuery(self, inputSql):
        """Execute ``inputSql`` and return all rows (None on failure).

        Commits only when the statement text starts with "insert into",
        "update " or "delete " (case-insensitive, no leading whitespace).
        """
        temCursor = None
        temValues = None
        try:
            self.attUpdtime = Com_Fun.getTimeLong()
            temCursor = self.attConnection.cursor()
            temCursor.execute(inputSql)
            temValues = temCursor.fetchall()
            self.attColumnNames = temCursor.description
            if inputSql.lower().startswith(("insert into", "update ", "delete ")):
                self.attConnection.commit()
        except Exception as e:
            temLog = ""
            if type(self) is type:
                temLog = self.debug_info(self) + inputSql + "==>" + repr(e)
                self.debug_in(self, inputSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            else:
                temLog = self.debug_info() + inputSql + "==>" + repr(e)
                self.debug_in(inputSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            uL = UsAdmin_Log(Com_Para.ApplicationPath, temLog)
            uL.WriteLog()
        finally:
            if temCursor is not None:
                temCursor.close()
        return temValues

    def executeUpdateRowID(self, inputSql):
        """Execute and commit ``inputSql``; return the cursor's lastrowid.

        Returns -1 when the statement fails before lastrowid is read.
        """
        temResult = -1
        temCursor = None
        try:
            self.attUpdtime = Com_Fun.getTimeLong()
            temCursor = self.attConnection.cursor()
            temCursor.execute(inputSql)
            temResult = temCursor.lastrowid
            self.attConnection.commit()
        except Exception as e:
            temLog = ""
            if type(self) is type:
                temLog = self.debug_info(self) + inputSql + "==>" + repr(e)
                self.debug_in(self, inputSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            else:
                temLog = self.debug_info() + inputSql + "==>" + repr(e)
                self.debug_in(inputSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            uL = UsAdmin_Log(Com_Para.ApplicationPath, temLog)
            uL.WriteLog()
        finally:
            # Close the cursor only if one was actually opened.
            if temCursor is not None:
                temCursor.close()
        return temResult

    def executeUpdate(self, inputSql):
        """Execute and commit ``inputSql``; return the cursor's rowcount.

        Returns -1 when the statement fails before rowcount is read.
        """
        temResult = -1
        temCursor = None
        try:
            self.attUpdtime = Com_Fun.getTimeLong()
            temCursor = self.attConnection.cursor()
            temCursor.execute(inputSql)
            temResult = temCursor.rowcount
            self.attConnection.commit()
        except Exception as e:
            temLog = ""
            if type(self) is type:
                temLog = self.debug_info(self) + inputSql + "==>" + repr(e)
                self.debug_in(self, inputSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            else:
                temLog = self.debug_info() + inputSql + "==>" + repr(e)
                self.debug_in(inputSql + "==>" + repr(e) + "=>" + str(e.__traceback__.tb_lineno))
            uL = UsAdmin_Log(Com_Para.ApplicationPath, temLog)
            uL.WriteLog()
        finally:
            # Close the cursor only if one was actually opened.
            if temCursor is not None:
                temCursor.close()
        return temResult

    def ParamExecuteQuery(self, inputProName, inputParameters, inputParamTypes, inputParamOutName, inputParamOutType, inputStrSql):
        """Convert the parameters by declared type, then run ``inputStrSql``.

        The values in ``inputParameters`` are converted in place according
        to ``inputParamTypes`` (INT, FLOAT, DATE, LIKESTRING; STRING and
        LIST are left as they are). The statement kind is taken from the
        leading keyword of the SQL text:

        * INSERT INTO / UPDATE / DELETE: the statement is executed, then a
          one-row status result (``s_result``, ``error_desc``) is fetched
          and the transaction is committed.
        * SELECT: the query rows are fetched.

        Args:
            inputProName: not used by this method.
            inputParameters: list of parameter values (mutated in place).
            inputParamTypes: type name for each parameter.
            inputParamOutName: output names; only its length is checked.
            inputParamOutType: output types; only its length is checked.
            inputStrSql: the SQL text to execute.

        Returns:
            The fetched rows, or None when the list lengths do not match or
            the statement kind is not recognised.

        Raises:
            Any exception from conversion or from the database driver; the
            cursor is closed before it propagates.
        """
        self.attUpdtime = Com_Fun.getTimeLong()
        temValues = None
        temCursor = self.attConnection.cursor()
        temFailSql = "select '0' as 's_result','失败' as 'error_desc'"
        try:
            if (len(inputParameters) != len(inputParamTypes)
                    or len(inputParamOutName) != len(inputParamOutType)):
                return None

            for i, temParamTypes in enumerate(inputParamTypes):
                if temParamTypes == "LIST":
                    # NOTE(review): this splits the type name, not the
                    # parameter value, so it always yields a single "?";
                    # "@\\?" is also matched literally by str.replace.
                    # Kept as-is; confirm the intended expansion.
                    temStr_V = ",".join("?" for _ in temParamTypes.split(","))
                    inputStrSql = inputStrSql.replace("@\\?", temStr_V, 1)
                temKind = temParamTypes.upper()
                if temKind == "INT":
                    inputParameters[i] = int(inputParameters[i])
                elif temKind == "FLOAT":
                    inputParameters[i] = float(inputParameters[i])
                elif temKind == "DATE":
                    inputParameters[i] = unquote(inputParameters[i].replace("+", " "), Com_Para.U_CODE)
                elif temKind == "LIKESTRING":
                    inputParameters[i] = unquote(inputParameters[i], Com_Para.U_CODE)
                # STRING and LIST values are passed through unchanged.

            temSqlHead = inputStrSql.upper().strip()
            if temSqlHead.startswith(("INSERT INTO", "UPDATE")):
                temCursor.execute(inputStrSql, inputParameters)
                self.attlastrowid = temCursor.lastrowid
                # Read rowcount from the cursor itself: the return value of
                # execute() is not defined by the DB-API.
                temAffected = temCursor.rowcount
                if temAffected != -1:
                    temCursor.execute("select '1' as 's_result','成功," + str(temAffected) + "' as 'error_desc'")
                else:
                    temCursor.execute(temFailSql)
                temValues = temCursor.fetchall()
                self.attColumnNames = temCursor.description
                self.attConnection.commit()
            elif temSqlHead.startswith("DELETE"):
                if ";" in temSqlHead:
                    # Several statements: run each without parameters.
                    # Blank fragments (e.g. after a trailing ";") are
                    # skipped so they cannot reset rowcount to -1.
                    for strSqls in inputStrSql.split(";"):
                        if strSqls.strip():
                            temCursor.execute(strSqls)
                else:
                    temCursor.execute(inputStrSql, inputParameters)
                if temCursor.rowcount != -1:
                    temCursor.execute("select '1' as 's_result','成功' as 'error_desc'")
                else:
                    temCursor.execute(temFailSql)
                temValues = temCursor.fetchall()
                self.attColumnNames = temCursor.description
                self.attConnection.commit()
            elif temSqlHead.startswith("SELECT"):
                temCursor.execute(inputStrSql, inputParameters)
                temValues = temCursor.fetchall()
                self.attColumnNames = temCursor.description
                self.attConnection.commit()
            return temValues
        finally:
            temCursor.close()
- 监测数据采集物联网应用开发步骤(5.3)