Python 助力 DBA:高效批量管理数据库服务器的多线程解决方案-多库查询汇总工具实现

批量数据库服务器连接测试与数据汇总:Python实现方案

作为数据库服务器运维人员,我们经常需要面对大量服务器的连接测试和数据汇总工作。本文将介绍一个使用Python实现的高效解决方案,可以帮助我们快速完成这些任务。

需求概述

  1. 从配置文件中读取要测试的数据库服务器IP地址列表。
  2. 批量测试数据库服务器的连接情况。
  3. 在所有可连接的服务器上执行相同的SQL查询。
  4. 将查询结果汇总到一个单独的数据库中,并包含对应服务器的IP地址。
  5. 自动创建结果表,表名按日期随机生成。
  6. 提供详细的日志输出,包括实时的处理进度。

实现方案

我们使用Python来实现这个方案,主要利用了以下库和技术:

  • pyodbc: 用于数据库连接和操作
  • configparser: 读取配置文件
  • concurrent.futures: 实现并发处理
  • logging: 日志记录
  • 多线程技术:提高处理效率

代码实现

以下是完整的Python代码实现:

import pyodbc
import logging
import configparser
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from datetime import datetime
import random
import string# 配置日志
def setup_logger():"""设置日志记录器,同时输出到文件和控制台"""logger = logging.getLogger()logger.setLevel(logging.INFO)# 文件处理器file_handler = logging.FileHandler('db_query_aggregation.log')file_handler.setLevel(logging.INFO)file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')file_handler.setFormatter(file_formatter)# 控制台处理器console_handler = logging.StreamHandler()console_handler.setLevel(logging.INFO)console_formatter = logging.Formatter('%(message)s')console_handler.setFormatter(console_formatter)logger.addHandler(file_handler)logger.addHandler(console_handler)return loggerlogger = setup_logger()def read_config(config_file):"""读取配置文件:param config_file: 配置文件路径:return: 包含配置信息的字典"""try:config = configparser.ConfigParser()config.read(config_file)return {'ip_list_file': config['Files']['ip_list_file'],'source_db_username': config['SourceDB']['username'],'source_db_password': config['SourceDB']['password'],'target_db_info': dict(config['TargetDB']),'max_workers': int(config['Settings']['max_workers']),'query': config['Query']['sql']}except Exception as e:logger.error(f"读取配置文件时出错: {e}")raisedef read_ip_list(file_path):"""从文件中读取IP地址列表:param file_path: IP地址文件路径:return: IP地址列表"""try:with open(file_path, 'r') as file:return [line.strip() for line in file if line.strip()]except IOError as e:logger.error(f"无法读取IP地址文件: {e}")return []def create_connection(server, database, username, password):"""创建数据库连接:param server: 服务器地址:param database: 数据库名称:param username: 用户名:param password: 密码:return: 数据库连接对象,如果连接失败则返回None"""try:conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'return pyodbc.connect(conn_str, timeout=5)except pyodbc.Error as e:logger.error(f"连接到服务器 {server} 失败: {e}")return Nonedef execute_query(connection, query):"""执行SQL查询:param connection: 数据库连接对象:param query: SQL查询语句:return: 查询结果列表"""try:cursor = connection.cursor()cursor.execute(query)return cursor.fetchall()except pyodbc.Error as e:logger.error(f"执行查询时出错: {e}")return []def process_server(ip, username, password, query):"""处理单个服务器的查询:param ip: 服务器IP地址:param username: 数据库用户名:param password: 数据库密码:param query: SQL查询语句:return: 元组 (IP地址, 查询结果)"""start_time = time.time()logger.info(f"开始处理服务器 {ip}")try:conn = create_connection(ip, 'master', username, password)if conn:results = execute_query(conn, query)conn.close()end_time = time.time()processing_time = end_time - start_timelogger.info(f"服务器 {ip} 处理完成. 获取 {len(results)} 行数据. 耗时 {processing_time:.2f} 秒")return ip, resultsexcept Exception as e:logger.error(f"处理服务器 {ip} 时发生错误: {e}")logger.info(f"服务器 {ip} 处理失败")return ip, []def create_target_table(connection, table_name, columns):"""在目标数据库中创建表:param connection: 目标数据库连接对象:param table_name: 要创建的表名:param columns: 列定义列表"""try:cursor = connection.cursor()create_table_query = f"CREATE TABLE {table_name} (ServerIP VARCHAR(15), {', '.join(columns)})"cursor.execute(create_table_query)connection.commit()logger.info(f"成功创建表 {table_name}")except pyodbc.Error as e:logger.error(f"创建目标表时出错: {e}")raisedef insert_data(connection, table_name, data):"""将数据插入目标数据库:param connection: 目标数据库连接对象:param table_name: 目标表名:param data: 要插入的数据列表:return: 插入的行数"""try:cursor = connection.cursor()placeholders = ', '.join(['?' for _ in range(len(data[0]))])insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"cursor.fast_executemany = Truecursor.executemany(insert_query, data)connection.commit()return cursor.rowcountexcept pyodbc.Error as e:logger.error(f"插入数据时出错: {e}")connection.rollback()return 0def generate_table_name():"""生成随机表名:return: 生成的表名"""date_str = datetime.now().strftime("%Y%m%d")random_str = ''.join(random.choices(string.ascii_lowercase, k=5))return f"QueryResults_{date_str}_{random_str}"def main():"""主函数,协调整个数据查询和汇总过程"""try:# 读取配置config = read_config('config.ini')ip_list = read_ip_list(config['ip_list_file'])if not ip_list:logger.error("IP地址列表为空,程序终止")returnlogger.info(f"开始处理 {len(ip_list)} 个服务器")# 并发查询所有服务器results = []with ThreadPoolExecutor(max_workers=config['max_workers']) as executor:future_to_ip = {executor.submit(process_server, ip, config['source_db_username'], config['source_db_password'], config['query']): ip for ip in ip_list}for future in as_completed(future_to_ip):ip, result = future.result()if result:results.extend([(ip,) + tuple(row) for row in result])if not results:logger.info("没有查询到数据,程序终止")return# 连接目标数据库target_conn = create_connection(**config['target_db_info'])if not target_conn:logger.error("无法连接到目标数据库,程序终止")return# 创建目标表并插入数据table_name = generate_table_name()columns = [f"Column{i} VARCHAR(100)" for i in range(len(results[0]) - 1)]create_target_table(target_conn, table_name, columns)rows_inserted = insert_data(target_conn, table_name, results)target_conn.close()logger.info(f"数据汇总完成。插入 {rows_inserted} 行到表 {table_name}")print(f"查询结果已插入表: {table_name}")except Exception as e:logger.critical(f"程序执行过程中发生严重错误: {e}")print(f"程序执行过程中发生错误,请查看日志文件获取详细信息。")if __name__ == "__main__":main()

代码说明

  1. 配置文件读取:使用 configparser 模块读取配置文件,包括数据库连接信息、查询语句等。

  2. 多线程处理:使用 ThreadPoolExecutor 并发执行查询,提高效率。

  3. 异常处理:每个关键操作都包含了异常处理,确保程序的稳定性。

  4. 模块化设计:将不同功能分解为独立的函数,提高代码的可读性和可维护性。

  5. 日志记录:使用 logging 模块记录详细的操作日志,同时输出到文件和控制台。

  6. 动态表创建:在目标数据库中动态创建表,表名包含日期和随机字符串。

  7. 数据汇总:将所有服务器的查询结果汇总到一个列表中,包括服务器IP地址。

  8. 批量数据插入:使用 executemany 批量插入数据到目标表。

使用说明

  1. 创建 config.ini 配置文件,包含以下内容:
[Files]
ip_list_file = server_ip_list.txt[SourceDB]
username = your_source_username
password = your_source_password[TargetDB]
server = your_target_server
database = your_target_database
username = your_target_username
password = your_target_password[Settings]
max_workers = 50[Query]
sql = SELECT column1, column2 FROM your_table
  1. 准备一个包含要查询的服务器IP地址的文本文件(如 server_ip_list.txt)。

  2. 运行脚本,它将并发查询所有服务器,汇总结果(包括服务器IP),并插入到目标数据库的新表中。

  3. 脚本执行完成后,会输出生成的表名。

结论

这个Python脚本提供了一个高效、灵活的解决方案,可以批量测试数据库服务器连接、执行查询并汇总结果。它具有以下优点:

  • 并发处理,大幅提高效率
  • 详细的日志记录,便于监控和调试
  • 灵活的配置,易于适应不同环境
  • 异常处理完善,提高程序稳定性
  • 结果包含服务器IP,便于追踪数据来源

对于需要管理大量数据库服务器的运维人员来说,这个脚本可以显著提高工作效率。您可以根据实际需求进一步调整和优化这个脚本,例如添加更多的错误处理、优化查询性能,或者扩展功能以支持更复杂的操作。

通过使用这个脚本,您可以轻松地对多个数据库服务器进行批量操作,并将结果汇总到一个中心位置,大大简化了数据库管理和监控的工作流程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/64706.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue预览和下载 pdf、ppt、word、excel文档,文件类型为链接或者base64格式或者文件流,

** 方法1&#xff1a;word、xls、ppt、pdf 这些文件&#xff0c; 如果预览的文件是链接可以直接打开&#xff0c;可用微软官方的预览地址 ** <iframe width"100%" :src"textVisibleURl " id"myFramePPT" style"border: none;backgroun…

python elasticsearch_dsl PIT Point in time API 查询

默认情况下&#xff0c;搜索请求针对目标索引的最新可见数据&#xff08;称为时间点&#xff09;执行。elasticsearchpit&#xff08;时间点&#xff09;是一种轻量级视图&#xff0c;可以查看数据在启动时的状态。在某些情况下&#xff0c;最好使用同一时间点执行多个搜索请求…

OB删除1.5亿数据耗费2小时

目录 回顾&#xff1a;mysql是怎么删除数据的&#xff1f; 删除方案 代码实现 执行结果 结论 本篇是实际操作 批量处理数据以及线程池线程数设置 记录学习 背景&#xff1a;有一张用户标签表&#xff0c;存储数据量达4个亿&#xff0c;使用OceanBase存储&#xff0c;由于…

【2025最新计算机毕业设计】基于SSM框架的宠物领养系统【提供源码+答辩PPT+文档+项目部署】

作者简介&#xff1a;✌CSDN新星计划导师、Java领域优质创作者、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流。✌ 主要内容&#xff1a;&#x1f31f;Java项目、Python项目、前端项目、PHP、ASP.NET、人工智能…

【专题】2024抖音电商母婴行业分析报告汇总PDF洞察(附原数据表)

原文链接&#xff1a;https://tecdat.cn/?p38651 在数字化浪潮的席卷下&#xff0c;抖音电商母婴行业正经历着深刻变革。当下&#xff0c;年轻一代父母崛起&#xff0c;特别是 24 至 30 岁以及 18 至 23 岁的群体成为抖音母婴行业兴趣人群的主力军。他们带来全新育儿理念&…

免费开源!推荐一款网页版数据库管理工具!

免费开源&#xff01;推荐一款网页版数据库管理工具&#xff01; DBGate 是一个开源的数据库管理工具&#xff0c;DBGate 的最大特点是可以 Web 访问&#xff01;&#xff0c;轻松实现一台机器部署&#xff0c;所有人使用&#xff01; 无论是 MySQL、PostgreSQL、SQLite 还是…

使用 UniApp 在微信小程序中实现 SSE 流式响应

概述 服务端发送事件(Server-Sent Events, SSE)是一种允许服务器向客户端推送实时更新的技术。SSE 提供了一种单向的通信通道,服务器可以持续地向客户端发送数据,而不需要客户端频繁发起请求。这对于需要实时更新的应用场景非常有用。 流式传输的特点是将数据逐步传输给客…

【Java项目】基于SpringBoot的【旅游管理系统 】

【Java项目】基于SpringBoot的【旅游管理系统 】 技术简介&#xff1a;本系统使用JAVA语言开发&#xff0c;采用B/S架构、Spring Boot框架、MYSQL数据库进行开发设计。 系统简介&#xff1a;&#xff08;1&#xff09;管理员功能&#xff1a;可以管理个人中心、用户管理、景区分…

音频接口:PDM TDM128 TDM256

一、 PDM接口 在麦克风&#xff08;Mic&#xff09;接口中&#xff0c;PDM&#xff08;Pulse Density Modulation&#xff0c;脉冲密度调制&#xff09;和I2S&#xff08;Inter-IC Sound&#xff0c;集成电路内置音频总线&#xff09;是两种常见的数字输出接口。 1、工作原理…

【系统】Windows11更新解决办法,一键暂停

最近的windows更新整的我是措不及防&#xff0c;干啥都要关注一下更新的问题&#xff0c;有的时候还关不掉&#xff0c;我的强迫症就来了&#xff0c;非得关了你不可&#xff01; 经过了九九八十一难的研究之后&#xff0c;终于找到了一个算是比较靠谱的暂停更新的方法&#x…

Canoe E2E校验自定义Checksum算法

文章目录 一、添加 DBC文件二、导入要仿真的ECU节点三、编写 CAPL脚本1. 创建 .can 文件2. 设置counter递增3. 设置 CRC 算法&#xff0c;以profile01 8-bit SAE J1850 CRC校验为例 四、开始仿真五、运行结果CRC在线校验 当E2E的 CRC算法非常规算法&#xff0c;则需要自己编写代…

穷举vs暴搜vs深搜vs回溯vs剪枝系列一>找出所有子集的异或总和再求和

题目&#xff1a; 解析&#xff1a; 代码&#xff1a; private int ret;//返回周结果private int path;//枚举一个元素就异或进去public int subsetXORSum(int[] nums) {dfs(nums, 0);return ret;} private void dfs(int[] nums, int pos){ret path;for(int i pos; i <…

洛谷【贪心算法】P1803 学习笔记

2024-12-20 - 第 41 篇 洛谷贪心算法题单 - 贪心算法 - 学习笔记 作者(Author): 郑龙浩 / 仟濹(CSND账号名) P1803 凌乱的yyy / 线段覆盖 题目背景 快 noip 了&#xff0c;yyy 很紧张&#xff01; 题目描述 现在各大 oj 上有 n n n 个比赛&#xff0c;每个比赛的开始、结…

python使用Flask框架创建一个简单的动态日历

0. 运行效果 运行代码&#xff0c;然后在浏览器中访问 http://127.0.0.1:5000/&#xff0c;将看到一个动态日历&#xff0c;能够通过点击按钮切换月份。 1. 安装 Flask 首先&#xff0c;确保你已经安装了Flask。如果没有&#xff0c;可以使用以下命令安装&#xff1a; pip i…

Python字典使用练习-----实现查找电话号

不记得字典语法知识的可以翻我主页^V^ 【要求】 定义一个电话簿&#xff0c;字典里头设置以下联系人&#xff1a; mayun:13309283335, zhaolong:18989227822, zhangmin:13382398921, Gorge:19833824743, Jordan:18807317878, Curry:15093488129, Wade:19282937665 现在输入…

windows openssl编译x64版libssl.lib,编译x64版本libcurl.lib,支持https,vs2015编译器

不要纠结&#xff0c;直接选择用perl编译&#xff01; 告诫想要用弄成vs编译版的&#xff0c;暂时先别给自己增加麻烦 告诫&#xff0c;以下执行的每一步&#xff0c;都不要纠结 先安装环境 nasm 64位版本 https://www.nasm.us/pub/nasm/releasebuilds/2.16.01/win64/nasm-…

dev类似于excel的数据编辑

其实这个不是我最后的结果&#xff0c;只是中间demo&#xff0c;因为我的场景数据量很大&#xff0c;2w左右&#xff0c;有数据合并&#xff0c;我更倾向于el-table是实现&#xff0c;但不想el-input一直显示&#xff0c;想用if-else 去做隐藏&#xff0c;但是用typetextarea发…

uniapp对接unipush 1.0 ios/android

简介 实现方法 是uniapp官网推荐的 unipush-v1 文档配置具体看 uni-app官网 配置好了之后 代码实现 前端代码 前端的主要任务是监听 监听到title content 创建消息推送 安卓 可以收到在线消息并且自动弹出消息 IOS 可以监听到在线消息但是需要手动推送 以下代码app初始…

【WRF-Urban】输入空间分布人为热排放数据的WRF运行全过程总结

目录 数据准备检查新增变量配置(如果有)WPS预处理修改namelist.wpsStep1: geogridStep2: ungribStep3: metgridWRF运行修改namelist.input调试namelist.input运行./real.exe运行./wrf.exe参考WRF模型的基本流程如下: 数据准备 空间分布热排放数据下载及制备可参见另一博客…

JavaSE——绘图入门

一、Java绘图坐标体系 下图说明了Java坐标系&#xff0c;坐标原地位于左上角&#xff0c;以像素为单位。在Java坐标系中&#xff0c;第一个是x坐标&#xff0c;表示当前位置为水平方向&#xff0c;距离坐标原点x个像素&#xff1b;第二个是y坐标&#xff0c;表示当前位置为垂直…