python 如何将dataframe数据批量插入到clickhouse

1. 背景-问题描述

最近做一个资产数据清洗工作,结束后需要将数据批量插入到clickhouse;查找并尝试了好几种批量插入的方式均已失败告终;现就查到的dataframe类型数据批量插入到clickhouse记录和汇总于此。

2. 配置信息

python 3.12
clickhouse_driver 0.2.9
PyYAML 6.0.2
pandas 2.2.3
numpy 2.1.2

3. 单条数据插入

from clickhouse_driver import Client
client = Client(host='host',port='port',user='user',password='password',settings={'use_numpy': True}
)
# 创建表格
client.execute("CREATE TABLE IF NOT EXISTS db.test_table (id Int32, name String) ENGINE = MergeTree")
sql = "INSERT INTO db.test_table (id, name) VALUES (1, 'test_1')"
client.execute(sql)

db.test_table: db表示数据库,test_table表示表名称

4. 多条数据插入

# 多条数据插入
sql = "INSERT INTO db.test_table (id, name) VALUES (1, 'test_1'), (2, 'test_2')"
client.execute(sql)

5. dataframe数据全部插入

5.1 将需要插入的数据写入到sql中

# 取出dataframe列
import pandas as pd
import numpy as np
from clickhouse_driver import Clientclient = Client(host='host',port='port',user='user',password='password',settings={'use_numpy': True}
)# 创建一个示例DataFrame
data = {'id': [1, 2, 3],'name': ['name_1', 'name_2', 'name_3']
}
df = pd.DataFrame(data)
cols = df.columns.tolist()
sql0 = f"INSERT INTO db.test_table ({','.join(map(str, cols))}) VALUES"
# 将df下的values转化为tuple格式: [(1, 'name_1'), (2, 'name_2'),(3, 'name_3')]
tuples = [tuple(x) for x in df.to_numpy()]
sql = f"{sql0}" + ', '.join(map(str, tuples))
client.execute(sql)

5.2 使用insert_dataframe

cols = df.columns.tolist()
query = f"INSERT INTO {table} ({','.join(map(str, cols))}) VALUES "
client.insert_dataframe(query, df)

6. dataframe数据分批插入

def get_insert_sql(df: pd.DataFrame)->str:cols = df.columns.tolist()query = f"INSERT INTO {self.__table} ({','.join(map(str, cols))}) VALUES "tuples = [tuple(x) for x in df.to_numpy()]sql = f"{query}" + ', '.join(map(str, tuples))return sql# 定义每批次插入的行数batch_size = 1000# 分批次插入数据for start in range(0, len(df), batch_size):end = min(start + batch_size, len(df))sql = get_insert_sql(df[start:end])client.insert(sql)

经实验:下列方式会失败

tuples = [(1, 'test1'), (2, 'test2')]
client.execute('INSERT INTO db.test (id, name) VALUES', tuples, settings={'max_insert_block_size': 10},columnar=True)

在这里插入图片描述
正确的使用方式是:

cols = df.columns.tolist()
query = f"INSERT INTO {table} ({','.join(map(str, cols))}) VALUES "
# 分批插入,每次插入10条记录
client.insert_dataframe(query, df,settings={'max_insert_block_size': 10})

7. 完整代码

from clickhouse_driver import Client
import pandas as pd
import numpy as np
from datetime import datetime, timedeltaclass Error(Exception):pass
class Connection(Error):pass
class NotExist(Error):pass# ck client
class ClickHose:__slots__ = ("__ck",)__ck: Clientdef __init__(self):try:self.__ck = Client(host='ck_host',port=12345,user='ck_user',password='ck_password',settings={'use_numpy': True})except Exception as e:raise Connection(e)def get_conn(self) -> Client:return self.__ckdef query(self, sql: str):return self.__ck.execute(sql)def insert(self, sql: str):return self.__ck.execute(sql)def insert_dataframe(self, sql: str, df: pd.DataFrame, external_tables=None, query_id=None,settings=None):return self.__ck.insert_dataframe(sql, df, external_tables, query_id, settings)def close(self):self.__ck.disconnect()# ck表
class Test1CK:__slots__ = ("__ck","__table")__ck: ClickHose__table: strdef __init__(self, ck: ClickHose):self.__ck = ckself.__table = "db.test1"if not self.check_table():raise NotExist(f"not exist table: {self.__table}")def check_table(self)->bool:try:table_exists = self.__ck.query(f"EXISTS TABLE {self.__table}")[0][0]return table_existsexcept Exception as e:raise Error(e)def get_bill_by_cloud(self, cloud: str, start_time: int, end_time: int)->list:query = f"select * from {self.__table} where (cloud = '{cloud}') AND (time > {start_time}) AND (time < {end_time})"try:data = self.__ck.query(query)return dataexcept Exception as e:# logger.error(f"get_bill from table: {self.__table} error: {e}")def insert_batch(self, month: str, df: pd.DataFrame):if df is None or len(df) < 1:returntry:# 定义每批次插入的行数batch_size = 1000query = self.get_insert_sql(df)# 分批次插入数据for start in range(0, len(df), batch_size):end = min(start + batch_size, len(df))tuples = [tuple(x) for x in df[start:end].to_numpy()]sql = f"{query}" + ', '.join(map(str, tuples))self.__ck.insert(sql)#logger.info(f"insert month: {month} data success")except Exception as e:#logger.error(f"insert_batch from table: {self.__table}, sql: {sql} error: {e}")def insert_dataframe(self, month: str, df: pd.DataFrame):if df is None or len(df) < 1:returntry:# 定义每批次插入的行数batch_size = 1000cols = df.columns.tolist()query = f"INSERT INTO {self.__table} ({','.join(map(str, cols))}) VALUES "self.__ck.insert_dataframe(query, df, settings={'max_insert_block_size': batch_size})#logger.info(f"insert month: {month} data success")except Exception as e:#logger.error(f"insert_batch from table: {self.__table}, sql: {query} error: {e}")def get_insert_sql(self, df: pd.DataFrame) -> str:cols = df.columns.tolist()query = f"INSERT INTO {self.__table} ({','.join(map(str, cols))}) VALUES "#logger.debug(f"get_insert_sql: {query}")return queryif __name__ == '__main__':ck_db = ClickHose()try:test1_table = Test1CK(ck_db)# 创建一个示例DataFramedata = {'id': [1, 2, 3],'name': ['name_1', 'name_2', 'name_3']}df = pd.DataFrame(data)test1_table.insert_batch("3", df)except Exception as e:#logger.error(f"error {e}")

参考资料:
python如何将一个dataframe快速写入clickhouse
python批量插入clickhouse

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/59666.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DevOps业务价值流:架构设计最佳实践

系统设计阶段作为需求与研发之间的桥梁&#xff0c;在需求设计阶段的原型设计评审环节&#xff0c;尽管项目组人员可能未完全到齐&#xff0c;但关键角色必须到位&#xff0c;包括技术组长和测试组长。这一安排旨在同步推进两项核心任务&#xff1a;一是完成系统的架构设计&…

强化学习之课程学习法

作者名片 &#x1f935;‍♂️ 个人主页&#xff1a;抱抱宝 &#x1f604;微信公众号&#xff1a;宝宝数模AI ✍&#x1f3fb;作者简介&#xff1a;阿里云专家博主 | 持续分享机器学习、数学建模、数据分析、AI人工智能领域相关知识&#xff0c;和大家一起进步&#xff01; &am…

计算机网络:网络层 —— 网络地址转换 NAT

文章目录 网络地址转换 NAT 概述最基本的 NAT 方法NAT 转换表的作用 网络地址与端口号转换 NAPTNAT 和 NAPT 的缺陷 网络地址转换 NAT 概述 尽管因特网采用了无分类编址方法来减缓 IPv4 地址空间耗尽的速度&#xff0c;但由于因特网用户数量的急剧增长&#xff0c;特别是大量小…

leetcode 633. 平方数之和 中等

给定一个非负整数 c &#xff0c;你要判断是否存在两个整数 a 和 b&#xff0c;使得 a*ab*bc 。 示例 1&#xff1a; 输入&#xff1a;c 5 输出&#xff1a;true 解释&#xff1a;1 * 1 2 * 2 5示例 2&#xff1a; 输入&#xff1a;c 3 输出&#xff1a;false提示&#…

Qt第三课 ----------容器类控件

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; ​&#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382; &#x1f389;&#x1f389;&#x1f389…

运放进阶篇-多种波形可调信号发生器-产生方波-三角波-正弦波

引言&#xff1a;前几节我们已经说到硬件相关基础的电路&#xff0c;以及对于运放也讲到了初步的理解&#xff0c;特别是比较器的部分&#xff0c;但是放大器的部分我们对此并没有阐述&#xff0c;在这里通过实例进行理论结合实践的学习。而运放真正的核心&#xff0c;其实就是…

蓝桥杯c++算法学习【1】之枚举与模拟(卡片、回文日期、赢球票:::非常典型的比刷例题!!!)

别忘了请点个赞收藏关注支持一下博主喵&#xff01;&#xff01;&#xff01; 关注博主&#xff0c;更多蓝桥杯nice题目静待更新:) 枚举与模拟 一、卡片&#xff1a; 【问题描述】 …

使用Netty实现一个简单的聊天服务器

✅作者简介&#xff1a;热爱Java后端开发的一名学习者&#xff0c;大家可以跟我一起讨论各种问题喔。 &#x1f34e;个人主页&#xff1a;Hhzzy99 &#x1f34a;个人信条&#xff1a;坚持就是胜利&#xff01; &#x1f49e;当前专栏&#xff1a;Netty &#x1f96d;本文内容&a…

用discuz开发一款网站自动秒收录源码功能更强悍更安全

用discuz开发一款网站自动秒收录源码功能更强悍更安全 在当今数字化时代&#xff0c;网站的曝光率和访问量是衡量其成功与否的重要指标。为了提升网站的曝光率&#xff0c;自动秒收录技术应运而生。而Discuz平台作为知名的社区管理系统&#xff0c;为开发者提供了丰富的功能和…

绿色能源发展关键:优化风电运维体系

根据QYResearch调研团队最新发布的《全球风电运维市场报告2023-2029》显示&#xff0c;预计到2029年&#xff0c;全球风电运维市场的规模将攀升至307.8亿美元&#xff0c;并且在接下来的几年里&#xff0c;其年复合增长率&#xff08;CAGR&#xff09;将达到12.5%。 上述图表及…

ER图的规范画法

ER图相较其他结构化分析方法和面向对象分析方法的图结构来说&#xff0c;还是相对容易不少&#xff0c;不过各位也要注重细节的严谨性。 entity&#xff0c;实体&#xff1b;relationship&#xff0c;关系&#xff1b;相信各位的英语水平不难理解ER图的功能&#xff0c;椭圆代表…

【静态页面】尚品汇 1、设计稿分析及资源准备

目录 1. 准备工作2. 理解设计3. 规划项目结构 1. 准备工作 安装必要的工具&#xff1a;确保你的开发环境已经准备好&#xff0c;包括文本编辑器&#xff08;如 VSCode&#xff09;、浏览器等。获取设计文件&#xff1a;获取UI设计稿或者设计文件链接&#xff0c;并确保可以访问…

基于MPPT最大功率跟踪的光伏发电蓄电池控制系统simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 基于MPPT最大功率跟踪的光伏发电蓄电池控制系统simulink建模与仿真。本系统包括PV模块&#xff0c;电池模块&#xff0c;电池控制器模块&#xff0c;MPPT模块&#xff0c;PWM模…

nginx代理 proxy_pass

一、location 包含 location /api/ {proxy_pass http://127.0.0.1:85;} 二、location 不包含 location /api/ {proxy_pass http://127.0.0.1:85/;} 三、locaion 包含 location /api {proxy_pass http://127.0.0.1:85;}四、location 包含 location /api {proxy_pass http://127.…

MongoDB笔记01-概念与安装

文章目录 前言一、MongoDB相关概念1.1 业务应用场景具体的应用场景什么时候选择MongoDB 1.2 MongoDB简介1.3 体系结构1.4 数据模型1.5 MongoDB的特点 二、本地单机部署2.1 Windows系统中的安装启动第一步&#xff1a;下载安装包第二步&#xff1a;解压安装启动1.命令行参数方式…

IMS高压发生器维修高压电源维修XRG100/1000

IMS高压发生器的硬件组成&#xff1a; 高压控制发生器主要由高压发生器和高压控制器两部分组成。高压控制器是控制调节X射线管管电压和管电流的机构,高压发生器是管电压和管电流产生的执行机构,通过高压控制器对高压发生器进行控制调节,通过高压电缆将高压发生器与X射线管连接…

133.鸿蒙基础01

鸿蒙基础 1.自定义构建函数1. 构建函数-[Builder ](/Builder )2. 构建函数-传参传递(单向)3. 构建函数-传递参数(双向)4. 构建函数-传递参数练习5. 构建函数-[BuilderParam ](/BuilderParam ) 传递UI 2.组件状态共享1. 状态共享-父子单向2. 状态共享-父子双向3. 状态共享-后代组…

MybatisPlus入门(八)MybatisPlus-DQL编程控制

一、字段映射与表名映射 数据库表和实体类名称一样自动关联&#xff0c;数据库表和实体类有部分情况不一样。 问题一&#xff1a;表名与编码开发设计不同步&#xff0c;表名和实体类名称不一致。 解决办法&#xff1a; 在模型类上方&#xff0c;使用TableName注解&#xf…

RNN中的梯度消失与梯度爆炸问题

梯度消失与梯度爆炸问题 循环神经网络&#xff08;Recurrent Neural Network&#xff0c;RNN&#xff09;是一类具有短期记忆能力的神经网络&#xff0e;在循环神经网络中&#xff0c;神经元不但可以接受其他神经元的信息&#xff0c;也可以接受自身的信息&#xff0c;形成具有…

Trimble X12三维激光扫描仪正在改变游戏规则【上海沪敖3D】

Trimble X12 三维激光扫描仪凭借清晰、纯净的点云数据和亚毫米级的精度正在改变游戏规则。今天的案例我们将与您分享&#xff0c;X12是如何帮助专业测量咨询公司OR3D完成的一个模拟受损平转桥运动的项目。 由于习惯于以微米为单位工作&#xff0c;专业测量机构OR3D是一家要求…