依赖
- jpype1=1.5.0
- jaydebeapi=1.2.3
- DmJdbcDriver18.jar
启动JVM
def start_jvm():jvm_path = jpype.getDefaultJVMPath()jar_path = ":".join([".....jar", # 需要python调用的其他jar包".../DmJdbcDriver18.jar" # 去达梦官网下载])Djava = "-Djava.class.path=" + jar_pathif not jpype.isJVMStarted():jpype.startJVM(jvm_path, Djava, convertStrings=True) # 需要convertStrings,否则返回变量是java类型的,如java.lang.Integer,没法用python处理if not jpype.isThreadAttachedToJVM():jpype.attachThreadToJVM()
需要注意,在一个python主程序中,一个进程中只能调用一次 startJVM,否则报错:OSError: JVM cannot be restarted
建立连接
def get_db_conn():host = password = username = port = database = jdbc_driver_path = ".../DmJdbcDriver18.jar"jdbc_class_name = "dm.jdbc.driver.DmDriver"url = f'jdbc:dm://{host}:{port}/{database}'conn = jaydebeapi.connect(jdbc_class_name, url, [username, password], jdbc_driver_path)return conn
这个连接是 DB-API 格式,遵循DB-API共同的使用形式:
cur = conn.cursor()
try:cur.execute(sql)conn.commit()
except Exception as e:conn.rollback()
finally:conn.close()
连接池
import jaydebeapi
import jpype
from threading import Semaphore
import loggingclass JConnectionPool:def __init__(self, jdbc_driver_path, jdbc_class_name, jdbc_url, username, password, max_connections=20):self.jdbc_driver_path = jdbc_driver_pathself.jdbc_class_name = jdbc_class_nameself.jdbc_url = jdbc_urlself.conn_properties = {"user": username,"password": password}self.max_connections = max_connectionsself.semaphore = Semaphore(max_connections) # 通过信号量的形式控制连接的数量,也支持Queue方式实现self.connections = []self._initialize_pool()def _initialize_pool(self):if not jpype.isJVMStarted():raise Exception("JVM not started. run `jpype.startJVM` first.")for _ in range(self.max_connections):try:conn = self._create_connection()self.connections.append(conn)except Exception as e:logging.error(f"Failed to initialize connection: {str(e)}")# 验证连接的合法性def _is_connection_valid(self, conn):try:cursor = conn.cursor()cursor.execute("SELECT 1")cursor.close()return Trueexcept Exception:return False# 重新建立连接def _create_connection(self):return jaydebeapi.connect(self.jdbc_class_name, self.jdbc_url, self.conn_properties)# 获取连接,可配置超时时间def get_connection(self, timeout=None):if not self.semaphore.acquire(timeout=timeout): # 同一时刻只允许self.max_connections个线程调用get_connectionraise Exception("Timeout: Failed to acquire connection from the pool.")try:for conn in self.connections:if self._is_connection_valid(conn):return connelse:new_conn = self._create_connection()self.connections.append(new_conn)return new_connexcept Exception as e:self.semaphore.release()raise Exception(f"Failed to get connection: {str(e)}")# 将连接放回连接池def close_connection(self, conn):try:self.connections.append(conn)except Exception as e:logging.error(f"Failed to close connection: {str(e)}")finally:self.semaphore.release()# 关闭连接池def close_pool(self):for conn in self.connections:try:conn.close()except Exception as e:logging.error(f"Failed to close connection: {str(e)}")self.connections = []