Python 操作数据库:读取 Clickhouse 数据存入csv文件

import pandas as pd
from clickhouse_driver import Client
import timeit
import logging
import threading
from threading import Lock
from queue import Queue
from typing import List, Dict, Set
from contextlib import contextmanager
import os
import time# 配置参数
CONFIG = {'DB': {'host': 'xxx','database': 'xxx','user': 'xxxx','password': 'xxxx'},'BATCH_SIZE': 5000,'TOTAL_RECORDS': 1000000,'NUM_THREADS': 5,'OUTPUT_FILE': 'yyxs_ck2excel_v4.csv','MAX_RETRIES': 3,           # 最大重试次数'RETRY_DELAY': 5,           # 重试延迟(秒)'CONNECTION_TIMEOUT': 60    # 连接超时时间(秒)
}# 设置日志记录
logging.basicConfig(level=logging.INFO,format='%(asctime)s.%(msecs)d - %(name)s - %(levelname)s - %(message)s',datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)class DatabaseManager:_thread_local = threading.local()@classmethod@contextmanagerdef get_connection(cls):"""线程安全的数据库连接管理器"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try:    if not hasattr(cls._thread_local, "client"):cls._thread_local.client = Client(**CONFIG['DB'],connect_timeout=CONFIG['CONNECTION_TIMEOUT'])logger.info(f"Created new database connection for thread {threading.current_thread().name}")yield cls._thread_local.clientbreakexcept Exception as e:retry_count += 1logger.error(f"Database connection error (attempt {retry_count}): {str(e)}")if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raise@classmethoddef close_all_connections(cls):"""关闭当前线程的数据库连接"""if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")logger.info(f"Closed database connection for thread {threading.current_thread().name}")class DataProcessor:def __init__(self):self.columns = ["a", "b", "c", "d"]self.query = '''SELECTa,b,c,dFROMtable_nameORDER BYa,b,c,d '''self.file_lock = Lock()  # 添加文件写入锁self.total_rows = 0      # 添加行数统计self.processed_batches = set()  # 记录已成功处理的批次self.failed_batches = set()     # 记录失败的批次def fetch_data_batch(self, batch_size: int, start: int) -> List[tuple]:"""获取一批数据,带重试机制"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try:with DatabaseManager.get_connection() as client:query_with_limit = f"{self.query} LIMIT {batch_size} OFFSET {start}"result = client.execute(query_with_limit)logger.info(f"Fetched {len(result)} records starting from {start}.")return resultexcept Exception as e:retry_count += 1logger.error(f"Error fetching batch starting at {start} (attempt {retry_count}): {str(e)}")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raisedef save_to_csv(self, df: pd.DataFrame, file_name: str, batch_start: int):"""保存数据到CSV文件"""try:with self.file_lock:  # 使用锁保护文件写入file_exists = os.path.exists(file_name) and os.path.getsize(file_name) > 0df.to_csv(file_name, mode='a', header= not file_exists,index=False)self.total_rows += len(df)self.processed_batches.add(batch_start)logger.info(f"Appended {len(df)} records to {file_name}. Total rows: {self.total_rows}")except Exception as e:logger.error(f"Error saving to CSV: {str(e)}")raisedef process_batch(self, start: int, batch_size: int, output_file: str):"""处理单个批次的数据"""try:if start in self.processed_batches:logger.info(f"Batch {start} already processed, skipping.")return Trueresult_batch = self.fetch_data_batch(batch_size, start)df_batch = pd.DataFrame(result_batch, columns=self.columns)self.save_to_csv(df_batch, output_file, start)return Trueexcept Exception as e:logger.error(f"Error processing batch starting at {start}: {str(e)}")self.failed_batches.add(start)return Falsedef main_v1():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue()  # 用于重试失败的批次threads = []def worker():while True:try:start = queue.get()if start is None:breaksuccess = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)queue.task_done()except Exception as e:logger.error(f"Worker thread error: {str(e)}")finally:queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次while not retry_queue.empty():start = retry_queue.get()logger.info(f"Retrying failed batch starting at {start}")if processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file):logger.info(f"Successfully retried batch {start}")else:logger.error(f"Failed to process batch {start} after retries")# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()def main():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue()threads = []def worker():while True:try:start = queue.get()if start is None:  # 退出信号queue.task_done()breaktry:success = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)except Exception as e:logger.error(f"Error processing batch at offset {start}: {str(e)}")retry_queue.put(start)finally:queue.task_done()  # 只在这里调用一次except Exception as e:logger.error(f"Worker thread error: {str(e)}")# 不要在这里调用 queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列total_batches = (CONFIG['TOTAL_RECORDS'] + CONFIG['BATCH_SIZE'] - 1) // CONFIG['BATCH_SIZE']for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次retry_count = 0max_retries = 3while not retry_queue.empty() and retry_count < max_retries:retry_count += 1retry_size = retry_queue.qsize()logger.info(f"Retrying {retry_size} failed batches (attempt {retry_count})")# 将失败的批次重新放入主队列for _ in range(retry_size):start = retry_queue.get()queue.put(start)# 等待重试完成queue.join()# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Expected batches: {total_batches}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")# 验证数据完整性try:df_final = pd.read_csv(output_file)actual_rows = len(df_final)logger.info(f"Final CSV file contains {actual_rows} rows")if actual_rows != processor.total_rows:logger.warning(f"Row count mismatch: CSV has {actual_rows} rows, but processed {processor.total_rows} rows")# 检查是否有重复的表头duplicate_headers = df_final[df_final.iloc[:, 0] == df_final.columns[0]]if not duplicate_headers.empty:logger.warning(f"Found {len(duplicate_headers)} duplicate headers at rows: {duplicate_headers.index.tolist()}")# 清理重复表头df_final = df_final[df_final.iloc[:, 0] != df_final.columns[0]]df_final.to_csv(output_file, index=False)logger.info(f"Cleaned CSV file now contains {len(df_final)} rows")except Exception as e:logger.error(f"Error validating final CSV file: {str(e)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()if __name__ == "__main__":start_time = timeit.default_timer()try:main()elapsed_time = timeit.default_timer() - start_timelogger.info(f"数据提取和存储完成,耗时: {elapsed_time:.2f} 秒")except Exception as e:logger.error(f"程序执行失败: {str(e)}")raise
主要类
  • DatabaseManager

管理数据库连接的线程安全类

使用 threading.local() 确保每个线程有自己的连接

包含重试机制和连接管理功能

  • DataProcessor

处理数据的核心类

定义了数据列和查询语句

处理数据批次的获取和保存

跟踪处理状态和失败的批次

2. 工作流程

  • 初始化

创建空的输出文件

初始化线程池和任务队列

  • 数据处理

将总数据量分成多个批次

多个工作线程并行处理数据批次

每个批次:

  • 从数据库获取数据
  • 转换为 DataFrame
  • 保存到 CSV 文件
  • 错误处理

失败的批次会进入重试队列

最多重试 3 次

记录所有失败的批次

  • 数据验证

检查最终 CSV 文件的行数

检查和清理重复的表头

验证数据完整性

3. 特点

  • 线程安全

使用线程本地存储管理数据库连接

文件写入使用锁保护

  • 容错机制

数据库连接重试

批次处理重试

详细的日志记录

  • 性能优化

批量处理数据

多线程并行处理

使用队列管理任务

  • 监控和日志

详细的日志记录

处理进度跟踪

执行时间统计

这个程序适合处理大量数据的导出任务,具有良好的容错性和可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/59162.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SCP收容物211~215

注 &#xff1a;此文接SCP收容物201~210,本文只供开玩笑 ,与steve_gqq_MC合作 --------------------------------------------------------------------------------------------------------------------------------- 目录 scp-211 scp-212 scp-213 scp-214 scp-215 s…

带你读懂什么是AI Agent智能体

一、智能体的定义与特性 定义&#xff1a;智能体是一个使用大语言模型&#xff08;LLM&#xff09;来决定应用程序控制流的系统。然而&#xff0c;智能体的定义并不唯一&#xff0c;不同人有不同的看法。Langchain的创始人Harrison Chase从技术角度给出了定义&#xff0c;但更…

docker-compose在阿里云服务器上部署https所踩的各种坑(已成功部署)

前言 购买服务器&#xff0c;申请域名&#xff0c;申请证书&#xff0c;下载nginx证书&#xff0c;这些操作我就不说了&#xff0c;百度一大把&#xff0c;我只说一下部署中碰到的问题 问题 我们是docker-compose上部署的后台前台环境&#xff0c;配置https证书&#xff0c;…

Qt_day3_信号槽

目录 信号槽 1. 概念 2. 函数原型 3. 连接方式 3.1 自带信号 → 自带槽 3.2 自带信号 → 自定义槽 3.3 自定义信号 4. 信号槽传参 5. 对应关系 5.1 一对多 5.2 多对一 信号槽 1. 概念 之前的程序界面只能看&#xff0c;不能交互&#xff0c;信号槽可以让界面进行人机…

lua入门教程:math

在Lua中&#xff0c;math库是一个非常重要的内置库&#xff0c;它提供了许多用于数学计算的函数。这些函数可以处理各种数学运算&#xff0c;包括基本的算术运算、三角函数、对数函数、随机数生成等。结合你之前提到的Lua中的数字遵循IEEE 754双精度浮点标准&#xff0c;我们可…

UI架构解说

UI&#xff08;用户界面&#xff0c;User Interface&#xff09; 是指用户与软件或硬件系统进行交互的界面。 它是用户与系统之间的桥梁&#xff0c;允许用户通过视觉元素、交互组件和反馈机制来操作和控制应用程序或设备。 UI 设计的目标是提供直观、易用和愉悦的用户体验&a…

《ElementPlus 与 ElementUI 差异集合》Icon 图标 More 差异说明

参考 《element plus 使用 icon 图标(两种方式)》使用 icon 升级 Vue2 升级 Vue3 项目时&#xff0c;遇到命名时的实心与空心点差异&#xff01; ElementUI&#xff1a; 实心是 el-icon-more空心是 el-icon-more-outline ElementPlus&#xff1a; 实心是 el-icon-more-fill…

RWKV 通过几万 token 的 CoT 解决几乎 100% 的数独问题(采用 29M 参数的小模型)

RWKV 做 CoT 特别有优势&#xff0c;因为推理速度和显存占用与上下文无关。即使做几百万 token 的 CoT 也不会变慢或额外消耗显存。 RWKV 社区开发者 Jellyfish042 训练了一个用于解决数独谜题的专用 RWKV 模型 Sudoku-RWKV &#xff0c;项目的训练代码和数据制作脚本均已开源…

上海市计算机学会竞赛平台2024年11月月赛丙组

题目描述 在一个棋盘上&#xff0c;有两颗棋子&#xff0c;一颗棋子在第 aa 行第 bb 列&#xff0c;另一个颗棋子在第 xx 行第 yy 列。 每一步&#xff0c;可以选择一个棋子沿行方向移动一个单位&#xff0c;或沿列方向移动一个单位&#xff0c;或同时沿行方向及列方向各移动…

【异质图学习】异构图神经网络中的自监督学习:基于语义强度和特征相似度的对比学习

【异质图学习】异构图神经网络中的自监督学习&#xff1a;基于语义强度和特征相似度的对比学习 简介&#xff1a;本文探讨了异构图神经网络中自监督学习的应用&#xff0c;特别是基于语义强度和特征相似度的对比学习技术。通过对比学习&#xff0c;模型能够从无标签数据中学习…

postgres+timescaledb--离线安装,centos7.9

操作系统是centos7.9&#xff0c;使用的hper-V,安装的虚拟机环境&#xff0c;安装好操作系统之后&#xff0c;让系统不连外网后直接按下方操作安装。 方式1&#xff0c;使用压缩包&#xff0c;复杂一点。&#xff08;第一种方式暂时没有安装timescaledb&#xff09; 装备安装…

MyBatis xml 文件中 SQL 语句的小于号未转义导致报错

问题现象 在 MyBatis 的 xml 文件中添加了一个 SQL 语句 <select id"countXxx" resultType"int">select count(*) from t1 where count < 3 </select>启动 Spring Boot 应用程序后报错&#xff1a; Caused by: org.apache.ibatis.builde…

深入剖析输入URL按下回车,浏览器做了什么

DNS 查询 首先,是去寻找页面资源的位置。如果导航到https://example.com, 假设HTML页面被定位到IP地址为93.184.216.34 的服务器。如果以前没有访问过这个网站&#xff0c;就需要进行DNS查询。 浏览器向域名服务器发起DNS查询请求&#xff0c;最终得到一个IP地址。第一次请求…

Hutool-Java工具库

日期时间 1、DateUtil 获取当前时间 import cn.hutool.core.date.DateUtil;import java.util.Calendar; import java.util.Date; //当前时间字符串&#xff0c;格式&#xff1a;yyyy-MM-dd HH:mm:ssDate date DateUtil.date();Date date2 DateUtil.date(Calendar.getInstan…

ceph介绍和搭建

1 为什么要使用ceph存储 什么是对象存储&#xff1f; 对象存储并没有向文件系统那样划分为元数据区域和数据区域&#xff0c;而是按照不同的对象进行存储&#xff0c;而且每个对象内部维护着元数据和数据区域。因此每个对象都有自己独立的管理格式。 对象存储优点&#xff1a…

BootStrap复选框多选,页面初始化选中处理

以isExecuted字段为例数据库设置为varchar类型 新增页面 <div class"row"><div class"col-sm-6"><div class"form-group"><label class"col-sm-4 control-label">部门协调&#xff1a;</label><di…

这些场景不适合用Selenium自动化!看看你踩过哪些坑?

Selenium是自动化测试中的一大主力工具&#xff0c;其强大的网页UI自动化能力&#xff0c;让测试人员可以轻松模拟用户操作并验证系统行为。然而&#xff0c;Selenium并非万能&#xff0c;尤其是在某些特定场景下&#xff0c;可能并不适合用来自动化测试。本文将介绍Selenium不…

AI大模型在尽职调查中的应用场景与客户案例

应用场景 1. 企业IPO尽职调查中的文档处理与合规审查 在券商投行进行企业IPO尽职调查过程中&#xff0c;企业需要提交大量的财务报表、历史沿革文件、法律合同等资料。这些文件涉及多个部门&#xff0c;往往存在信息分散、合规性复杂、数据量庞大等问题。思通数科的AI能力平…

react-router-dom 库作用

react-router-dom是一个用于在 React 应用中实现路由功能的重要库 一、实现页面导航 1. 声明式路由定义 1.1 基本原理 使用react-router-dom可以在代码中直接定义一个路由规则&#xff0c;如从/home路径导航到Home组件。 1.2 代码示例 Router 路由根容器&#xff0c;Rout…

[C++]内联函数和nullptr

> &#x1f343; 本系列为初阶C的内容&#xff0c;如果感兴趣&#xff0c;欢迎订阅&#x1f6a9; > &#x1f38a;个人主页:[小编的个人主页])小编的个人主页 > &#x1f380; &#x1f389;欢迎大家点赞&#x1f44d;收藏⭐文章 > ✌️ &#x1f91e; &#x1…